Skip to content

Commit e664574

Browse files
committed
MB-54729: Enable history scan for CDC backfill
Replace the todo markers with code that now utilises the magma history API - this now means scanAllVersions for example is hooked into the magma history scanning API. Add new tests that validate multiple versions can be stored and returned. Also required are changes to unit tests to respect new expectation checks that occur in magma - primarily that flushing writes ordered batches - this is only a problem for tests which bypass the flusher and call KVStore directly. **** ISSUES **** ep-engine_ep_unit_tests does not pass: 1) Exception from magma MagmaKVStoreRollbackTest.Rollback hits the following exception GSL: Precondition failure: 'levelSize >= compactionState[level].history.Size' at /Users/jimwalker/Code/couchbase/neo/magma/lsm/lsm_tree.cc:895 2) Seg-fault in magma Seen in a number of tests, 1 example: CollectionsDcpEphemeralOrPersistent/CollectionsDcpParameterizedTest.DefaultCollectionDropped/persistent_magma_value_only Process 78731 stopped * thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] 72 } 73 74 Slice DocSequenceBuffer::GetKey() { -> 75 seqFmt.Set(sortedList[offset]->seqno); 76 return seqFmt.Encode(); 77 } 78 * thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) * frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] frame couchbase#1: 0x0000000101361e2e ep-engine_ep_unit_tests`magma::mvccIteratorAdaptor::GetKey(this=0x0000000118536c00) at mvcc.h:249:25 [opt] frame couchbase#2: 0x000000010132b688 ep-engine_ep_unit_tests`magma::IteratorWithFilter::filterKeys(this=0x0000000118128350) at iterator.cc:214:32 [opt] frame couchbase#3: 0x000000010132de5b ep-engine_ep_unit_tests`magma::KVReader::ReadKVs(this=0x00007ff7bfefd550) at common.cc:70:19 [opt] frame couchbase#4: 0x0000000101378f63 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, w=0x00007ff7bfefd890, itr=0x0000000118128350, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefd860)>) at lsm_tree.cc:719:15 [opt] frame couchbase#5: 0x0000000101376ee8 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, appendMode=<unavailable>, itr=0x0000000118128350, sizeEstimate=<unavailable>, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefdb60)>) at lsm_tree.cc:682:17 [opt] frame couchbase#6: 0x00000001013761b2 ep-engine_ep_unit_tests`magma::LSMTree::writeMemtable(this=0x000000011855a820, memtable=0x000000011854c7a0) at lsm_tree.cc:449:21 [opt] frame #7: 0x000000010137753f ep-engine_ep_unit_tests`magma::LSMTree::doMemtableFlushWork(this=0x000000011855a820) at lsm_tree.cc:531:18 [opt] frame #8: 0x000000010139fe62 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] magma::LSMTree::newFlush(this=<unavailable>)::$_16::operator()() const at lsm_tree.cc:993:34 [opt] frame #9: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] decltype(__f=<unavailable>)::$_16&>(fp)()) std::__1::__invoke<magma::LSMTree::newFlush()::$_16&>(magma::LSMTree::newFlush()::$_16&) at type_traits:3918:1 [opt] frame #10: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::tuple<magma::Status, magma::CheckpointTransaction> std::__1::__invoke_void_return_wrapper<std::__1::tuple<magma::Status, magma::CheckpointTransaction>, false>::__call<magma::LSMTree::newFlush(__args=<unavailable>)::$_16&>(magma::LSMTree::newFlush()::$_16&) at invoke.h:30:16 [opt] frame #11: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:178:16 [opt] frame #12: 0x000000010139fe59 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:352:12 [opt] frame #13: 0x00000001012f72af ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::__function::__value_func<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt] frame #14: 0x00000001012f7296 ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::function<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=0x0000000118131560)() const at function.h:1182:12 [opt] frame #15: 0x00000001012f7292 ep-engine_ep_unit_tests`magma::FlushWork::Execute(this=0x0000000118131560) at flush_work.cc:61:29 [opt] frame #16: 0x0000000101389d5e ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x00007ff7bfefe1c0)::$_38::operator()() at kvstore.cc:515:27 [opt] frame #17: 0x0000000101388fac ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x000000010442a420, wal=<unavailable>, offset=(SegID = 1, SegOffset = 4096), flushMode=<unavailable>, blockMode=Blocking) at kvstore.cc:582:16 [opt] frame #18: 0x0000000101389a5a ep-engine_ep_unit_tests`magma::KVStore::FlushMemTables(this=<unavailable>, wal=<unavailable>, flushMode=<unavailable>, blockMode=<unavailable>) at kvstore.cc:387:12 [opt] frame #19: 0x00000001012fd9ba ep-engine_ep_unit_tests`magma::Magma::Impl::syncKVStore(this=0x000000011814f000, kvID=<unavailable>, checkpoint=true) at db.cc:1352:21 [opt] frame #20: 0x000000010132678a ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=0x00007ff7bfefe400)>)::$_7::operator()() const at db.cc:880:23 [opt] frame #21: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=<unavailable>)>)::$_8::operator()() const at db.cc:891:21 [opt] frame #22: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] decltype(__f=<unavailable>)>)::$_8&>(fp)()) std::__1::__invoke<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at type_traits:3918:1 [opt] frame #23: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] void std::__1::__invoke_void_return_wrapper<void, true>::__call<magma::Magma::Impl::CompactKVStore(__args=<unavailable>)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at invoke.h:61:9 [opt] frame #24: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:178:16 [opt] frame #25: 0x0000000101326764 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:352:12 [opt] frame #26: 0x0000000101303138 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::__function::__value_func<void ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt] frame #27: 0x000000010130312d ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::function<void ()>::operator(this=0x00007ff7bfefe4b0)() const at function.h:1182:12 [opt] frame #28: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:92:9 [opt] frame #29: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:91:14 [opt] frame #30: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(this=<unavailable>, kvID=<unavailable>, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefe550)>) at db.cc:895:1 [opt] frame #31: 0x000000010130336c ep-engine_ep_unit_tests`magma::Magma::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=<unavailable>, makeCallback=<unavailable>)>) at db.cc:901:18 [opt] frame #32: 0x000000010004fd3d ep-engine_ep_unit_tests`MagmaMemoryTrackingProxy::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefea00)>) at magma-memory-tracking-proxy.cc:190:19 [opt] frame #33: 0x00000001000a9eeb ep-engine_ep_unit_tests`MagmaKVStore::compactDBInternal(this=<unavailable>, vbLock=0x00007ff7bfefeda0, ctx=std::__1::shared_ptr<CompactionContext>::element_type @ 0x00000001184acc20 strong=3 weak=1) at magma-kvstore.cc:2590:29 [opt] frame #34: 0x00000001000a93ad ep-engine_ep_unit_tests`MagmaKVStore::compactDB(this=0x00000001067e6500, vbLock=0x00007ff7bfefeda0, ctx=nullptr) at magma-kvstore.cc:2445:12 [opt] frame #35: 0x00000001001d7eb0 ep-engine_ep_unit_tests`EPBucket::compactInternal(this=0x00000001067e6000, vb=0x00007ff7bfefed90, config=<unavailable>) at ep_bucket.cc:1398:25 [opt] frame #36: 0x00000001001d83f6 ep-engine_ep_unit_tests`EPBucket::doCompact(this=0x00000001067e6000, vbid=(vbid = 0), config=0x00007ff7bfefedf0, cookies=size=0) at ep_bucket.cc:1476:14 [opt] 3) Key sorting issue Magma now checks for sorted keys - it turns out KV flushing is violating that ordering. Need to know if KV should fix or is the magma check required?? Example: CollectionsDcpEphemeralOrPersistent/CollectionsLegacyDcpTest.default_collection_is_not_vbucket_highseqno_with_pending/persistent_nexus_couchstore_magma_value_only CRITICAL [(SynchronousEPEngine:default) magma_0]Fatal error: Found: preceding key(d2) > current key( _collection). If history is enabled, all keys in the batch must be sorted lexicographicall The problem is that the test flushes a prepare(default collection, key=d2) and create-collection(fruit) together. The flusher orders these... \0d2 \1create_fruit This is correct. But \0d2 is marked as a prepare, when flushed to disk it goes into a special namespace. This occurs in KVStore after the sorting. \0d2 becomes \2\0d2 And magma actually sees \2\0d2 \1create_fruit and we have violated the expects Change-Id: Ica9ea1b52c51f125c9e8839a0fca412834fc25f7
1 parent 087b981 commit e664574

File tree

16 files changed

+316
-87
lines changed

16 files changed

+316
-87
lines changed

engines/ep/src/kv_bucket.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3098,6 +3098,13 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const {
30983098

30993099
void KVBucket::setHistoryRetentionBytes(size_t bytes) {
31003100
historyRetentionBytes = bytes;
3101+
for (auto& i : vbMap.shards) {
3102+
KVShard* shard = i.get();
3103+
// The KVStore needs to know the per vbucket size
3104+
shard->getRWUnderlying()->setHistoryRetentionBytes(bytes /
3105+
vbMap.getSize());
3106+
}
3107+
31013108
}
31023109

31033110
size_t KVBucket::getHistoryRetentionBytes() const {

engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4542,3 +4542,10 @@ std::unique_ptr<TransactionContext> CouchKVStore::begin(
45424542
return std::make_unique<CouchKVStoreTransactionContext>(
45434543
*this, vbid, std::move(pcb));
45444544
}
4545+
4546+
void CouchKVStore::setHistoryRetentionBytes(size_t size) {
4547+
// no-op.
4548+
// Note: StorageProperties reports that history scan is not supported, so
4549+
// we accept this attempt to set size, but will fail if a scanAllVersions
4550+
// is attempted.
4551+
}

engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ class CouchKVStore : public KVStore
410410
std::unique_ptr<TransactionContext> begin(
411411
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
412412

413+
void setHistoryRetentionBytes(size_t size) override;
414+
413415
protected:
414416
/**
415417
* RAII holder for a couchstore LocalDoc object

engines/ep/src/kvstore/kvstore_iface.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,11 @@ class KVStoreIface {
763763
* @param vbid ID of the vbucket being created
764764
*/
765765
virtual void prepareToCreateImpl(Vbid vbid) = 0;
766+
767+
/**
768+
* Method to configure the amount of history a vbucket should retain.
769+
*/
770+
virtual void setHistoryRetentionBytes(size_t size) = 0;
766771
};
767772

768773
std::string to_string(KVStoreIface::ReadVBStateStatus status);

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const {
869869
StorageProperties::AutomaticDeduplication::No,
870870
StorageProperties::PrepareCounting::No,
871871
StorageProperties::CompactionStaleItemCallbacks::Yes,
872-
StorageProperties::HistoryRetentionAvailable::No);
872+
StorageProperties::HistoryRetentionAvailable::Yes);
873873
return rv;
874874
}
875875

@@ -1695,8 +1695,7 @@ std::unique_ptr<BySeqnoScanContext> MagmaKVStore::initBySeqnoScanContext(
16951695
getDroppedStatus.String());
16961696
}
16971697

1698-
// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
1699-
auto historyStartSeqno = 0;
1698+
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
17001699
if (logger->should_log(spdlog::level::info)) {
17011700
logger->info(
17021701
"MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}"
@@ -1809,8 +1808,7 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
18091808
return nullptr;
18101809
}
18111810

1812-
// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
1813-
auto historyStartSeqno = 0;
1811+
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
18141812
logger->info(
18151813
"MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} "
18161814
"KeyIterator:{}",
@@ -1830,13 +1828,16 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
18301828
historyStartSeqno);
18311829
}
18321830

1831+
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
1832+
return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot);
1833+
}
1834+
18331835
scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const {
1834-
// @todo use magma's mode
1835-
// return scan(ctx, magma::Magma::SeqIterator::Mode::History);
1836-
return scan(ctx);
1836+
return scan(ctx, magma::Magma::SeqIterator::Mode::History);
18371837
}
18381838

1839-
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
1839+
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx,
1840+
magma::Magma::SeqIterator::Mode mode) const {
18401841
if (ctx.lastReadSeqno == ctx.maxSeqno) {
18411842
logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}",
18421843
ctx.vbid,
@@ -1849,7 +1850,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
18491850
startSeqno = ctx.lastReadSeqno + 1;
18501851
}
18511852
auto& mctx = dynamic_cast<MagmaScanContext&>(ctx);
1852-
for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid();
1853+
for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode);
1854+
mctx.itr->Valid();
18531855
mctx.itr->Next()) {
18541856
Slice keySlice, metaSlice, valSlice;
18551857
uint64_t seqno;
@@ -3722,3 +3724,7 @@ std::pair<Status, uint64_t> MagmaKVStore::getOldestRollbackableHighSeqno(
37223724

37233725
return {status, seqno};
37243726
}
3727+
3728+
void MagmaKVStore::setHistoryRetentionBytes(size_t size) {
3729+
magma->SetHistoryRetentionSize(size);
3730+
}

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,8 @@ class MagmaKVStore : public KVStore {
568568
std::unique_ptr<TransactionContext> begin(
569569
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
570570

571+
void setHistoryRetentionBytes(size_t size) override;
572+
571573
// Magma uses a unique logger with a prefix of magma so that all logging
572574
// calls from the wrapper thru magma will be prefixed with magma.
573575
std::shared_ptr<BucketLogger> logger;
@@ -782,6 +784,9 @@ class MagmaKVStore : public KVStore {
782784
folly::assume_unreachable();
783785
}
784786

787+
scan_error_t scan(BySeqnoScanContext& ctx,
788+
magma::Magma::SeqIterator::Mode mode) const;
789+
785790
MagmaKVStoreConfig& configuration;
786791

787792
/**

engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3299,3 +3299,10 @@ Vbid::id_type NexusKVStore::getCacheSlot(Vbid vbid) const {
32993299
uint64_t NexusKVStore::getPurgeSeqno(Vbid vbid) const {
33003300
return purgeSeqno.at(getCacheSlot(vbid));
33013301
}
3302+
3303+
void NexusKVStore::setHistoryRetentionBytes(size_t size) {
3304+
// no-op.
3305+
// Note: StorageProperties reports that history scan is not supported, so
3306+
// we accept this attempt to set size, but will fail if a scanAllVersions
3307+
// is attempted.
3308+
}

engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ class NexusKVStore : public KVStoreIface {
143143
void delSystemEvent(TransactionContext& txnCtx,
144144
const queued_item item) override;
145145
void endTransaction(Vbid vbid) override;
146+
void setHistoryRetentionBytes(size_t size) override;
146147

147148
/**
148149
* Unit test only hook called before we compact the first KVStore. Public as

engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1973,3 +1973,10 @@ RocksDBKVStoreTransactionContext::RocksDBKVStoreTransactionContext(
19731973
: TransactionContext(kvstore, vbid, std::move(cb)),
19741974
pendingReqs(std::make_unique<RocksDBKVStore::PendingRequestQueue>()) {
19751975
}
1976+
1977+
void RocksDBKVStore::setHistoryRetentionBytes(size_t size) {
1978+
// no-op.
1979+
// Note: StorageProperties reports that history scan is not supported, so
1980+
// we accept this attempt to set size, but will fail if a scanAllVersions
1981+
// is attempted.
1982+
}

engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,8 @@ class RocksDBKVStore : public KVStore {
333333
std::unique_ptr<TransactionContext> begin(
334334
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
335335

336+
void setHistoryRetentionBytes(size_t size) override;
337+
336338
protected:
337339
// Write a batch of updates to the given database; measuring the time
338340
// taken and adding the timer to the commit histogram.

0 commit comments

Comments
 (0)