Skip to content

Commit 5c6b3ba

Browse files
BenHuddleston authored and daverigby committed
MB-35133: Move SyncWrite warmup to EPBucket
We want to re-use the SyncWrite warmup code when performing a rollback, as it is simpler than trying to write code to revert every possible state to the pre-rollback state. Move the loadPreparedSyncWrites code to EPBucket so that we can re-use it for rollback.

Change-Id: I89b66fe36ace1d873a26fd92a840bdcfdef00be4
Reviewed-on: http://review.couchbase.org/113025
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
1 parent a6312a6 commit 5c6b3ba

File tree

7 files changed

+161
-134
lines changed

7 files changed

+161
-134
lines changed

engines/ep/src/ep_bucket.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
#include "dcp/dcpconnmap.h"
4242

43+
#include <platform/timeutils.h>
44+
4345
#include <gsl.h>
4446

4547
/**
@@ -1319,6 +1321,130 @@ void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) {
13191321
}
13201322
}
13211323

1324+
// Perform an in-order scan of the seqno index.
// a) For each Prepared item found, add to a map of outstanding Prepares.
// b) For each Committed (via Mutation or Prepare) item, if there's an
//    outstanding Prepare then that prepare has already been Committed,
//    hence remove it from the map.
//
// At the end of the scan, all outstanding Prepared items (which did not
// have a Commit persisted to disk) will be registered with the Durability
// Monitor.
//
// NOTE(review): caller must hold the vBucket state write lock (vbStateLh);
// it is passed through to loadOutstandingPrepares, which presumably mutates
// the DurabilityMonitor guarded by that lock — confirm against EPVBucket.
void EPBucket::loadPreparedSyncWrites(
        folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) {
    /// Disk load callback for scan.
    struct LoadSyncWrites : public StatusCallback<GetValue> {
        LoadSyncWrites(EPVBucket& vb) : vb(vb) {
        }

        void callback(GetValue& val) override {
            if (val.item->isPending()) {
                // Pending item which was not aborted (deleted). Add to
                // outstanding Prepare map.
                outstandingPrepares.emplace(val.item->getKey(),
                                            std::move(val.item));
                return;
            }

            if (val.item->isCommitted()) {
                // Committed item. _If_ there's an outstanding prepared
                // SyncWrite, remove it (as it has already been committed).
                outstandingPrepares.erase(val.item->getKey());
                return;
            }
            // Items which are neither Pending nor Committed (e.g. aborted
            // prepares) fall through and are ignored.
        }

        EPVBucket& vb;

        /// Map of Document key -> outstanding (not yet Committed / Aborted)
        /// prepares.
        std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
                outstandingPrepares;
    };

    auto& epVb = dynamic_cast<EPVBucket&>(vb);
    // Timed purely for the EP_LOG_DEBUG summary below.
    const auto start = std::chrono::steady_clock::now();

    // @TODO MB-34017: We can optimise this by starting the scan at the
    // high_committed_seqno - all earlier prepares would have been committed
    // (or were aborted) and only scanning up to the high prepared seqno.
    uint64_t startSeqno = 0;

    // Get the kvStore. Using the RW store as the rollback code that will call
    // this function will modify vbucket_state that will only be reflected in
    // RW store. For warmup case, we don't allow writes at this point in time
    // anyway.
    auto* kvStore = getRWUnderlyingByShard(epVb.getShard()->getId());

    auto storageCB = std::make_shared<LoadSyncWrites>(epVb);

    // Don't expect to find anything already in the HashTable, so use
    // NoLookupCallback.
    auto cacheCB = std::make_shared<NoLookupCallback>();

    // Use ALL_ITEMS filter for the scan. NO_DELETES is insufficient
    // because (committed) SyncDeletes manifest as a prepared_sync_write
    // (doc on disk not deleted) followed by a commit_sync_write (which
    // *is* marked as deleted as that's the resulting state).
    // We need to see that Commit, hence ALL_ITEMS.
    const auto docFilter = DocumentFilter::ALL_ITEMS;
    const auto valFilter = getValueFilterForCompressionMode();

    auto* scanCtx = kvStore->initScanContext(
            storageCB, cacheCB, epVb.getId(), startSeqno, docFilter, valFilter);

    // Storage problems can lead to a null context, kvstore logs details
    if (!scanCtx) {
        EP_LOG_CRITICAL(
                "EPBucket::loadPreparedSyncWrites: scanCtx is null for {}",
                epVb.getId());
        // Cannot load any prepares; give up on this vBucket.
        return;
    }

    auto scanResult = kvStore->scan(scanCtx);
    Expects(scanResult == scan_success);

    kvStore->destroyScanContext(scanCtx);

    EP_LOG_DEBUG(
            "EPBucket::loadPreparedSyncWrites: Identified {} outstanding "
            "prepared SyncWrites for {} in {}",
            storageCB->outstandingPrepares.size(),
            epVb.getId(),
            cb::time2text(std::chrono::steady_clock::now() - start));

    // Insert all outstanding Prepares into the VBucket (HashTable &
    // DurabilityMonitor).
    std::vector<queued_item> prepares;
    for (auto& prepare : storageCB->outstandingPrepares) {
        prepares.emplace_back(std::move(prepare.second));
    }
    // Sequence must be sorted by seqno (ascending) for DurabilityMonitor.
    std::sort(
            prepares.begin(), prepares.end(), [](const auto& a, const auto& b) {
                return a->getBySeqno() < b->getBySeqno();
            });

    // Need the HPS/HCS so the DurabilityMonitor can be fully resumed
    auto vbState = kvStore->getVBucketState(epVb.getId());
    if (!vbState) {
        throw std::logic_error("EPBucket::loadPreparedSyncWrites: processing " +
                               epVb.getId().to_string() +
                               ", but found no vbucket_state");
    }

    epVb.loadOutstandingPrepares(vbStateLh, *vbState, std::move(prepares));
}
1438+
1439+
ValueFilter EPBucket::getValueFilterForCompressionMode() {
    // When the bucket applies any compression, request values in their
    // compressed (on-disk) form; otherwise ask the KVStore to hand back
    // decompressed values.
    const bool compressionEnabled =
            engine.getCompressionMode() != BucketCompressionMode::Off;
    return compressionEnabled ? ValueFilter::VALUES_COMPRESSED
                              : ValueFilter::VALUES_DECOMPRESSED;
}
1447+
13221448
void EPBucket::notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) {
13231449
if (notifyCtx.notifyFlusher) {
13241450
notifyFlusher(vbid);

engines/ep/src/ep_bucket.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ class EPBucket : public KVBucket {
137137

138138
void rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) override;
139139

140+
void loadPreparedSyncWrites(folly::SharedMutex::WriteHolder& vbStateLh,
141+
VBucket& vb) override;
142+
143+
/**
144+
* Returns the ValueFilter to use for KVStore scans, given the bucket
145+
* compression mode.
146+
*/
147+
ValueFilter getValueFilterForCompressionMode();
148+
140149
void notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) override;
141150

142151
virtual bool isGetAllKeysSupported() const override {

engines/ep/src/ep_vb.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -743,13 +743,10 @@ MutationStatus EPVBucket::insertFromWarmup(Item& itm,
743743
return ht.insertFromWarmup(itm, eject, keyMetaDataOnly, eviction);
744744
}
745745

746-
void EPVBucket::restoreOutstandingPreparesFromWarmup(
746+
void EPVBucket::loadOutstandingPrepares(
747+
const folly::SharedMutex::WriteHolder& vbStateLock,
747748
const vbucket_state& vbs,
748749
std::vector<queued_item>&& outstandingPrepares) {
749-
// About to change the durabilityMonitor object, which is guarded by
750-
// stateLock.
751-
folly::SharedMutex::WriteHolder wlh(getStateLock());
752-
753750
// First insert all prepares into the HashTable, updating their type
754751
// to PreparedMaybeVisible to ensure that the document cannot be read until
755752
// the Prepare is re-committed.

engines/ep/src/ep_vb.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,13 @@ class EPVBucket : public VBucket {
180180
* Populates the HashTable and the DurabilityMonitor with the given
181181
* set of queued_items.
182182
*
183+
* @param vbStateLock read lock on the vBucket state.
183184
* @param vbs The vbucket_state read during warmup
184185
* @param outstandingPrepares Sequence of prepared_sync_writes, sorted by
185186
* seqno in ascending order.
186187
*/
187-
void restoreOutstandingPreparesFromWarmup(
188+
void loadOutstandingPrepares(
189+
const folly::SharedMutex::WriteHolder& vbStateLock,
188190
const vbucket_state& vbs,
189191
std::vector<queued_item>&& outstandingPrepares);
190192

engines/ep/src/ephemeral_bucket.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ class EphemeralBucket : public KVBucket {
104104
// No op
105105
}
106106

107+
void loadPreparedSyncWrites(folly::SharedMutex::WriteHolder& vbStateLh,
                            VBucket& vb) override {
    // No op — Ephemeral buckets have no on-disk data, so there are no
    // persisted prepared SyncWrites to load.
}
111+
107112
void notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) override;
108113

109114
/**

engines/ep/src/kv_bucket_iface.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,15 @@ class KVBucketIface {
838838
virtual void rollbackUnpersistedItems(VBucket& vb,
839839
int64_t rollbackSeqno) = 0;
840840

841+
/**
842+
* Load the prepared SyncWrites from disk for the given vBucket.
843+
*
844+
* @param vbStateLh vBucket state lock
845+
* @param vb vBucket for which we will load SyncWrites
846+
*/
847+
virtual void loadPreparedSyncWrites(
848+
folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) = 0;
849+
841850
// During the warmup phase we might want to enable external traffic
842851
// at a given point in time.. The LoadStorageKvPairCallback will be
843852
// triggered whenever we want to check if we could enable traffic..

engines/ep/src/warmup.cc

Lines changed: 7 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,6 @@ void logWarmupStats(EPBucket& epstore) {
7676
megabytes_per_seconds);
7777
}
7878

79-
/**
80-
* Returns the ValueFilter to use for KVStore scans, given the bucket
81-
* compression mode.
82-
*/
83-
static ValueFilter getValueFilterForCompressionMode(
84-
const BucketCompressionMode& compressionMode);
85-
8679
//////////////////////////////////////////////////////////////////////////////
8780
// //
8881
// Helper class used to insert data into the epstore //
@@ -1220,119 +1213,17 @@ void Warmup::scheduleLoadPreparedSyncWrites() {
12201213
}
12211214

12221215
void Warmup::loadPreparedSyncWrites(uint16_t shardId) {
1223-
// Perform an in-order scan of the seqno index.
1224-
// a) For each Prepared item found, add to a map of outstanding Prepares.
1225-
// b) For each Committed (via Mutation or Prepare) item, if there's an
1226-
// outstanding Prepare then that prepare has already been Committed,
1227-
// hence remove it from the map.
1228-
//
1229-
// At the end of the scan, all outstanding Prepared items (which did not
1230-
// have a Commit persisted to disk) will be registered with the Durability
1231-
// Monitor.
1232-
1233-
/// Disk load callback for scan.
1234-
struct LoadSyncWrites : public StatusCallback<GetValue> {
1235-
LoadSyncWrites(EPVBucket& vb) : vb(vb) {
1236-
}
1237-
1238-
void callback(GetValue& val) override {
1239-
if (val.item->isPending()) {
1240-
// Pending item which was not aborted (deleted). Add to
1241-
// outstanding Prepare map.
1242-
outstandingPrepares.emplace(val.item->getKey(),
1243-
std::move(val.item));
1244-
return;
1245-
}
1246-
1247-
if (val.item->isCommitted()) {
1248-
// Committed item. _If_ there's an outstanding prepared
1249-
// SyncWrite, remove it (as it has already been committed).
1250-
outstandingPrepares.erase(val.item->getKey());
1251-
return;
1252-
}
1253-
}
1254-
1255-
EPVBucket& vb;
1256-
1257-
/// Map of Document key -> outstanding (not yet Committed / Aborted)
1258-
/// prepares.
1259-
std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
1260-
outstandingPrepares;
1261-
};
1262-
12631216
for (const auto vbid : shardVbIds[shardId]) {
1264-
const auto start = std::chrono::steady_clock::now();
12651217
auto itr = warmedUpVbuckets.find(vbid.get());
12661218
if (itr == warmedUpVbuckets.end()) {
12671219
continue;
12681220
}
1269-
auto& epVb = dynamic_cast<EPVBucket&>(*(itr->second));
1270-
1271-
auto storageCB = std::make_shared<LoadSyncWrites>(epVb);
1272-
1273-
// Don't expect to find anything already in the HashTable, so use
1274-
// NoLookupCallback.
1275-
auto cacheCB = std::make_shared<NoLookupCallback>();
1276-
1277-
// @todo-durability: We can optimise this by starting the scan at the
1278-
// high_committed_seqno - all earlier prepares would have been committed
1279-
// (or were aborted).
1280-
uint64_t startSeqno = 0;
1281-
1282-
auto* kvStore = store.getROUnderlyingByShard(shardId);
1283-
// Use ALL_ITEMS filter for the scan. NO_DELETES is insufficient
1284-
// because (committed) SyncDeletes manifest as a prepared_sync_write
1285-
// (doc on disk not deleted) followed by a commit_sync_write (which
1286-
// *is* marked as deleted as that's the resulting state).
1287-
// We need to see that Commit, hence ALL_ITEMS.
1288-
const auto docFilter = DocumentFilter::ALL_ITEMS;
1289-
const auto valFilter = getValueFilterForCompressionMode(
1290-
store.getEPEngine().getCompressionMode());
1291-
auto* scanCtx = kvStore->initScanContext(
1292-
storageCB, cacheCB, vbid, startSeqno, docFilter, valFilter);
1293-
1294-
// storage problems can lead to a null context, kvstore logs details
1295-
if (!scanCtx) {
1296-
EP_LOG_CRITICAL(
1297-
"Warmup::loadPreparedSyncWrites: scanCtx is null for {}", vbid);
1298-
continue;
1299-
}
1300-
1301-
auto scanResult = kvStore->scan(scanCtx);
1302-
Expects(scanResult == scan_success);
1303-
1304-
kvStore->destroyScanContext(scanCtx);
1305-
1306-
EP_LOG_DEBUG(
1307-
"Warmup::loadPreparedSyncWrites: Identified {} outstanding "
1308-
"prepared SyncWrites for {} in {}",
1309-
storageCB->outstandingPrepares.size(),
1310-
vbid,
1311-
cb::time2text(std::chrono::steady_clock::now() - start));
1312-
1313-
// Insert all outstanding Prepares into the VBucket (HashTable &
1314-
// DurabilityMonitor).
1315-
std::vector<queued_item> prepares;
1316-
for (auto& prepare : storageCB->outstandingPrepares) {
1317-
prepares.emplace_back(std::move(prepare.second));
1318-
}
1319-
// Sequence must be sorted by seqno (ascending) for DurabilityMonitor.
1320-
std::sort(prepares.begin(),
1321-
prepares.end(),
1322-
[](const auto& a, const auto& b) {
1323-
return a->getBySeqno() < b->getBySeqno();
1324-
});
1325-
1326-
// Need the HPS/HCS so the DurabilityMonitor can be fully resumed
1327-
auto vbState = shardVbStates[shardId].find(vbid);
1328-
if (vbState == shardVbStates[shardId].end()) {
1329-
throw std::logic_error(
1330-
"Warmup::loadPreparedSyncWrites: processing " +
1331-
vbid.to_string() + ", but found no vbucket_state");
1332-
}
1333-
const vbucket_state& vbs = vbState->second;
13341221

1335-
epVb.restoreOutstandingPreparesFromWarmup(vbs, std::move(prepares));
1222+
// Our EPBucket function will do the load for us as we re-use the code
1223+
// for rollback.
1224+
auto& vb = *(itr->second);
1225+
folly::SharedMutex::WriteHolder vbStateLh(vb.getStateLock());
1226+
store.loadPreparedSyncWrites(vbStateLh, vb);
13361227
}
13371228

13381229
if (++threadtask_count == store.vbMap.getNumShards()) {
@@ -1572,16 +1463,6 @@ void Warmup::scheduleLoadingKVPairs()
15721463

15731464
}
15741465

1575-
ValueFilter getValueFilterForCompressionMode(
1576-
const BucketCompressionMode& compressionMode) {
1577-
1578-
if (compressionMode != BucketCompressionMode::Off) {
1579-
return ValueFilter::VALUES_COMPRESSED;
1580-
}
1581-
1582-
return ValueFilter::VALUES_DECOMPRESSED;
1583-
}
1584-
15851466
void Warmup::loadKVPairsforShard(uint16_t shardId)
15861467
{
15871468
bool maybe_enable_traffic = false;
@@ -1597,8 +1478,7 @@ void Warmup::loadKVPairsforShard(uint16_t shardId)
15971478
auto cl =
15981479
std::make_shared<LoadValueCallback>(store.vbMap, state.getState());
15991480

1600-
ValueFilter valFilter = getValueFilterForCompressionMode(
1601-
store.getEPEngine().getCompressionMode());
1481+
ValueFilter valFilter = store.getValueFilterForCompressionMode();
16021482

16031483
for (const auto vbid : shardVbIds[shardId]) {
16041484
ScanContext* ctx = kvstore->initScanContext(cb, cl, vbid, 0,
@@ -1640,8 +1520,7 @@ void Warmup::loadDataforShard(uint16_t shardId)
16401520
auto cl =
16411521
std::make_shared<LoadValueCallback>(store.vbMap, state.getState());
16421522

1643-
ValueFilter valFilter = getValueFilterForCompressionMode(
1644-
store.getEPEngine().getCompressionMode());
1523+
ValueFilter valFilter = store.getValueFilterForCompressionMode();
16451524

16461525
for (const auto vbid : shardVbIds[shardId]) {
16471526
ScanContext* ctx = kvstore->initScanContext(cb, cl, vbid, 0,

0 commit comments

Comments
 (0)