Skip to content

Commit b4d6e64

Browse files
committed
Make ChangeStreamOperationSpecification more resilient
Remove assumption that expected change stream documents are returned in a single batch. JAVA-3742
1 parent 0a86284 commit b4d6e64

File tree

2 files changed

+37
-29
lines changed

2 files changed

+37
-29
lines changed

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ import com.mongodb.internal.operation.AsyncWriteOperation
5252
import com.mongodb.internal.operation.InsertOperation
5353
import com.mongodb.internal.operation.ReadOperation
5454
import com.mongodb.internal.operation.WriteOperation
55-
import com.mongodb.internal.validator.NoOpFieldNameValidator
5655
import com.mongodb.internal.session.SessionContext
56+
import com.mongodb.internal.validator.NoOpFieldNameValidator
5757
import org.bson.BsonDocument
5858
import org.bson.Document
5959
import org.bson.FieldNameValidator
@@ -156,6 +156,16 @@ class OperationFunctionalSpecification extends Specification {
156156
results
157157
}
158158

159+
def next(cursor, boolean async, int minimumCount) {
160+
List<BsonDocument> retVal = []
161+
162+
while (retVal.size() < minimumCount) {
163+
retVal.addAll(next(cursor, async))
164+
}
165+
166+
retVal
167+
}
168+
159169
def next(cursor, boolean async) {
160170
if (async) {
161171
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()

driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import com.mongodb.MongoNamespace
2121
import com.mongodb.OperationFunctionalSpecification
2222
import com.mongodb.ReadConcern
2323
import com.mongodb.WriteConcern
24-
import com.mongodb.internal.async.SingleResultCallback
2524
import com.mongodb.client.model.CreateCollectionOptions
2625
import com.mongodb.client.model.changestream.ChangeStreamDocument
2726
import com.mongodb.client.model.changestream.FullDocument
2827
import com.mongodb.client.model.changestream.OperationType
2928
import com.mongodb.client.model.changestream.UpdateDescription
3029
import com.mongodb.client.test.CollectionHelper
3130
import com.mongodb.connection.ConnectionDescription
31+
import com.mongodb.internal.async.SingleResultCallback
3232
import com.mongodb.internal.binding.AsyncConnectionSource
3333
import com.mongodb.internal.binding.AsyncReadBinding
3434
import com.mongodb.internal.binding.ConnectionSource
@@ -154,7 +154,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
154154
def expected = insertDocuments(helper, [1, 2])
155155

156156
then:
157-
def next = nextAndClean(cursor, async)
157+
def next = nextAndClean(cursor, async, expected.size())
158158
next == expected
159159

160160
when:
@@ -163,7 +163,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
163163

164164
then:
165165
cursor.getBatchSize() == 5
166-
nextAndClean(cursor, async) == expected
166+
nextAndClean(cursor, async, expected.size()) == expected
167167

168168
then:
169169
if (async) {
@@ -192,7 +192,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
192192
when:
193193
def cursor = execute(operation, false)
194194
helper.insertDocuments(BsonDocument.parse('{ _id : 2, x : 2 }'))
195-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
195+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
196196

197197
then:
198198
next.getResumeToken() != null
@@ -219,7 +219,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
219219
when:
220220
def cursor = execute(operation, false)
221221
helper.updateOne(BsonDocument.parse('{ _id : 2}'), BsonDocument.parse('{ $set : {x : 3}, $unset : {y : 1}}'))
222-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
222+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
223223

224224
then:
225225
next.getResumeToken() != null
@@ -246,7 +246,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
246246
when:
247247
def cursor = execute(operation, false)
248248
helper.replaceOne(BsonDocument.parse('{ _id : 2}'), BsonDocument.parse('{ _id : 2, x : 3}'), false)
249-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
249+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
250250

251251
then:
252252
next.getResumeToken() != null
@@ -273,7 +273,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
273273
when:
274274
def cursor = execute(operation, false)
275275
helper.deleteOne(BsonDocument.parse('{ _id : 2}'))
276-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
276+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
277277

278278
then:
279279
next.getResumeToken() != null
@@ -300,7 +300,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
300300
when:
301301
def cursor = execute(operation, false)
302302
helper.drop()
303-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
303+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
304304

305305
then:
306306
next.getResumeToken() != null
@@ -328,7 +328,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
328328
when:
329329
def cursor = execute(operation, false)
330330
helper.drop()
331-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
331+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
332332

333333
then:
334334
next.getResumeToken() != null
@@ -357,7 +357,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
357357
when:
358358
def cursor = execute(operation, false)
359359
helper.dropDatabase('JavaDriverTest')
360-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
360+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
361361

362362
then:
363363
next.getResumeToken() != null
@@ -386,7 +386,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
386386
when:
387387
def cursor = execute(operation, false)
388388
helper.renameCollection(newNamespace)
389-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
389+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
390390

391391
then:
392392
next.getResumeToken() != null
@@ -441,7 +441,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
441441
def expected = insertDocuments(helper, [1, 2])
442442

443443
then:
444-
nextAndClean(cursor, async) == expected
444+
nextAndClean(cursor, async, expected.size()) == expected
445445

446446
then:
447447
tryNextAndClean(cursor, async) == null
@@ -450,7 +450,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
450450
expected = insertDocuments(helper, [3, 4])
451451

452452
then:
453-
nextAndClean(cursor, async) == expected
453+
nextAndClean(cursor, async, expected.size()) == expected
454454

455455
cleanup:
456456
cursor?.close()
@@ -472,15 +472,12 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
472472
def expected = insertDocuments(helper, [1, 2])
473473

474474
then:
475-
nextAndClean(cursor, async) == expected
475+
nextAndClean(cursor, async, expected.size()) == expected
476476

477477
when:
478478
helper.killCursor(helper.getNamespace(), cursor.getWrapped().getServerCursor())
479479
expected = insertDocuments(helper, [3, 4])
480-
def results = nextAndClean(cursor, async)
481-
if (results.size() < expected.size()) {
482-
results.addAll(nextAndClean(cursor, async))
483-
}
480+
def results = nextAndClean(cursor, async, expected.size())
484481

485482
then:
486483
results == expected
@@ -492,10 +489,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
492489
expected = insertDocuments(helper, [5, 6])
493490
helper.killCursor(helper.getNamespace(), cursor.getWrapped().getServerCursor())
494491

495-
results = nextAndClean(cursor, async)
496-
if (results.size() < expected.size()) {
497-
results.addAll(nextAndClean(cursor, async))
498-
}
492+
results = nextAndClean(cursor, async, expected.size())
499493

500494
then:
501495
results == expected
@@ -520,7 +514,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
520514

521515
when:
522516
def expected = insertDocuments(helper, [1, 2])
523-
def result = next(cursor, async)
517+
def result = next(cursor, async, 2)
524518

525519
then:
526520
result.size() == 2
@@ -531,7 +525,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
531525

532526
operation.startAtOperationTime(result.last().getTimestamp('clusterTime'))
533527
cursor = execute(operation, async)
534-
result = nextAndClean(cursor, async)
528+
result = nextAndClean(cursor, async, expected.tail.size())
535529

536530
then:
537531
result == expected.tail()
@@ -555,7 +549,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
555549

556550
when:
557551
def expected = insertDocuments(helper, [1, 2])
558-
def result = next(cursor, async)
552+
def result = next(cursor, async, 2)
559553

560554
then:
561555
result.size() == 2
@@ -566,7 +560,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
566560

567561
operation.resumeAfter(result.head().getDocument('_id')).startAtOperationTime(null)
568562
cursor = execute(operation, async)
569-
result = nextAndClean(cursor, async)
563+
result = nextAndClean(cursor, async, expected.tail().size())
570564

571565
then:
572566
result == expected.tail()
@@ -591,7 +585,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
591585

592586
when:
593587
def expected = insertDocuments(helper, [1, 2])
594-
def result = next(cursor, async)
588+
def result = next(cursor, async, 2)
595589

596590
then:
597591
result.size() == 2
@@ -601,7 +595,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
601595
waitForLastRelease(async ? getAsyncCluster() : getCluster())
602596

603597
cursor = execute(operation.startAfter(result.head().getDocument('_id')).startAtOperationTime(null), async)
604-
result = nextAndClean(cursor, async)
598+
result = nextAndClean(cursor, async, expected.tail().size())
605599

606600
then:
607601
result == expected.tail()
@@ -761,6 +755,10 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
761755
removeExtra(tryNext(cursor, async))
762756
}
763757

758+
def nextAndClean(cursor, boolean async, int minimumCount) {
759+
removeExtra(next(cursor, async, minimumCount))
760+
}
761+
764762
def nextAndClean(cursor, boolean async) {
765763
removeExtra(next(cursor, async))
766764
}

0 commit comments

Comments
 (0)