@@ -289,13 +289,10 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
289
289
}
290
290
}
291
291
292
- /* We need to send the requested 'snap_start_seqno_' as the snapshot
292
+ /* We may need to send the requested 'snap_start_seqno_' as the snapshot
293
293
start when we are sending the first snapshot because the first
294
294
snapshot could be resumption of a previous snapshot */
295
- if (!firstMarkerSent) {
296
- startSeqno = std::min (snap_start_seqno_, startSeqno);
297
- firstMarkerSent = true ;
298
- }
295
+ startSeqno = adjustStartIfFirstSnapshot (startSeqno);
299
296
300
297
VBucketPtr vb = engine->getVBucket (vb_);
301
298
if (!vb) {
@@ -1279,55 +1276,53 @@ void ActiveStream::processItems(
1279
1276
outstandingItemsResult.highCompletedSeqno ,
1280
1277
visibleSeqno,
1281
1278
highNonVisibleSeqno);
1282
- } else if (!firstMarkerSent && isSeqnoAdvancedEnabled () &&
1283
- lastReadSeqno < snap_end_seqno_) {
1284
- // MB-47009: This first snapshot has been completely filtered away.
1285
- // The remaining items must not of been for this client. We must
1286
- // still send a snapshot marker so that the client is moved to their
1287
- // end seqno - so a snapshot + seqno advance is needed.
1288
- firstMarkerSent = true ;
1289
-
1279
+ } else if (isSeqnoAdvancedEnabled ()) {
1290
1280
// Note that we cannot enter this case if supportSyncReplication()
1291
1281
// returns true (see isSeqnoAdvancedEnabled). This means that we
1292
1282
// do not need to set the HCS/MVS or timestamp parameters of the
1293
1283
// snapshot marker. MB-47877 tracks enabling sync-writes+filtering
1294
- pushToReadyQ (std::make_unique<SnapshotMarker>(opaque_,
1295
- vb_,
1296
- snap_start_seqno_,
1297
- snap_end_seqno_,
1298
- MARKER_FLAG_MEMORY,
1299
- std::nullopt ,
1300
- std::nullopt ,
1301
- std::nullopt ,
1302
- sid));
1303
-
1304
- lastSentSnapEndSeqno.store (snap_end_seqno_,
1305
- std::memory_order_relaxed);
1306
- nextSnapshotIsCheckpoint = false ;
1307
-
1308
- queueSeqnoAdvanced ();
1309
- } else if (isSeqnoAdvancedEnabled () &&
1310
- isSeqnoGapAtEndOfSnapshot (curChkSeqno)) {
1311
- auto vb = engine->getVBucket (getVBucket ());
1312
- if (vb) {
1313
- if (vb->getState () == vbucket_state_replica) {
1314
- /*
1315
- * If this is a collection stream and we're not sending any
1316
- * mutations from memory and we haven't queued a snapshot
1317
- * shot and we're a replica. Then our snapshot covers
1318
- * backfill and in memory. So we have one snapshot marker
1319
- * for both items on disk and in memory. Thus, we need to
1320
- * send a SeqnoAdvanced to push the consumer's seqno to the
1321
- * end of the snapshot. This is need when no items for the
1322
- * collection we're streaming are present in memory.
1323
- */
1324
- queueSeqnoAdvanced ();
1284
+ if (!firstMarkerSent && lastReadSeqno < snap_end_seqno_) {
1285
+ // MB-47009: This first snapshot has been completely filtered
1286
+ // away. The remaining items must not of been for this client.
1287
+ // We must still send a snapshot marker so that the client is
1288
+ // moved to their end seqno - so a snapshot + seqno advance is
1289
+ // needed.
1290
+ sendSnapshotAndSeqnoAdvanced (
1291
+ outstandingItemsResult.checkpointType ,
1292
+ snap_start_seqno_,
1293
+ snap_end_seqno_);
1294
+ firstMarkerSent = true ;
1295
+ } else if (isSeqnoGapAtEndOfSnapshot (curChkSeqno)) {
1296
+ auto vb = engine->getVBucket (getVBucket ());
1297
+ if (vb) {
1298
+ if (vb->getState () == vbucket_state_replica) {
1299
+ /*
1300
+ * If this is a collection stream and we're not sending
1301
+ * any mutations from memory and we haven't queued a
1302
+ * snapshot and we're a replica. Then our snapshot
1303
+ * covers backfill and in memory. So we have one
1304
+ * snapshot marker for both items on disk and in memory.
1305
+ * Thus, we need to send a SeqnoAdvanced to push the
1306
+ * consumer's seqno to the end of the snapshot. This is
1307
+ * needed when no items for the collection we're
1308
+ * streaming are present in memory.
1309
+ */
1310
+ queueSeqnoAdvanced ();
1311
+ }
1312
+ } else {
1313
+ log (spdlog::level::level_enum::warn,
1314
+ " {} processItems() for vbucket which does not "
1315
+ " exist" ,
1316
+ logPrefix);
1325
1317
}
1326
- } else {
1327
- log (spdlog::level::level_enum::warn,
1328
- " {} processItems() for vbucket which does not "
1329
- " exist" ,
1330
- logPrefix);
1318
+ } else if (highNonVisibleSeqno &&
1319
+ curChkSeqno >= highNonVisibleSeqno.value ()) {
1320
+ // MB-48368: Nothing directly available for the stream, but a
1321
+ // non-visible item was available - bring the client up-to-date
1322
+ sendSnapshotAndSeqnoAdvanced (
1323
+ outstandingItemsResult.checkpointType ,
1324
+ highNonVisibleSeqno.value (),
1325
+ highNonVisibleSeqno.value ());
1331
1326
}
1332
1327
}
1333
1328
}
@@ -2223,3 +2218,35 @@ bool ActiveStream::isSeqnoGapAtEndOfSnapshot(uint64_t streamSeqno) const {
2223
2218
return (lastSentSnapEndSeqno.load () > lastReadSeqno.load ()) &&
2224
2219
lastSentSnapEndSeqno.load () == streamSeqno;
2225
2220
}
2221
+
2222
+ void ActiveStream::sendSnapshotAndSeqnoAdvanced (CheckpointType checkpointType,
2223
+ uint64_t start,
2224
+ uint64_t end) {
2225
+ start = adjustStartIfFirstSnapshot (start);
2226
+
2227
+ const auto isCkptTypeDisk = checkpointType == CheckpointType::Disk;
2228
+ uint32_t flags = isCkptTypeDisk ? MARKER_FLAG_DISK : MARKER_FLAG_MEMORY;
2229
+
2230
+ pushToReadyQ (std::make_unique<SnapshotMarker>(opaque_,
2231
+ vb_,
2232
+ start,
2233
+ end,
2234
+ flags,
2235
+ std::nullopt ,
2236
+ std::nullopt ,
2237
+ std::nullopt ,
2238
+ sid));
2239
+
2240
+ lastSentSnapEndSeqno.store (end, std::memory_order_relaxed);
2241
+ nextSnapshotIsCheckpoint = false ;
2242
+
2243
+ queueSeqnoAdvanced ();
2244
+ }
2245
+
2246
+ uint64_t ActiveStream::adjustStartIfFirstSnapshot (uint64_t start) {
2247
+ if (!firstMarkerSent) {
2248
+ firstMarkerSent = true ;
2249
+ return std::min (snap_start_seqno_, start);
2250
+ }
2251
+ return start;
2252
+ }
0 commit comments