Skip to content

Commit 10623d7

Browse files
committed
Be explicit about RPA CoTask lifecycle; don't use thread_local
Use of thread_local CoTask to parallelize RPA indexing was added by a recent commit: e4b01d7 The situation is fragile and there is potential for UB here due to static initialization hell. Instead, we explicitly create the CoTask in the DownloadBlocksTask thread and it lives as a DownloadBlocksTask member variable, which is more correct and less bugprone.
1 parent 6bfae61 commit 10623d7

File tree

3 files changed

+23
-20
lines changed

3 files changed

+23
-20
lines changed

src/BlockProc.cpp

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
/* static */ const TxHash PreProcessedBlock::nullhash;
3535

3636
/// fill this struct's data with all the txdata, etc from a bitcoin CBlock. Alternative to using the second c'tor.
37-
void PreProcessedBlock::fill(BlockHeight blockHeight, size_t blockSize, const bitcoin::CBlock &b, const bool enableRpa) {
37+
void PreProcessedBlock::fill(BlockHeight blockHeight, size_t blockSize, const bitcoin::CBlock &b, CoTask *rpaTask) {
3838
if (!header.IsNull() || !txInfos.empty())
3939
clear();
4040
height = blockHeight;
@@ -45,17 +45,8 @@ void PreProcessedBlock::fill(BlockHeight blockHeight, size_t blockSize, const bi
4545
std::unordered_map<TxHash, unsigned, HashHasher> txHashToIndex; // since we know the size ahead of time here, we can set max_load_factor to 1.0 and avoid over-allocating the hash table
4646
txHashToIndex.max_load_factor(1.0);
4747
txHashToIndex.reserve(b.vtx.size());
48-
static thread_local std::optional<CoTask> rpaTask;
4948
std::optional<CoTask::Future> rpaFut; // NB: rpaFut will auto-wait for work (if any) to complete as part of its d'tor
50-
if (enableRpa) {
51-
if (!rpaTask) {
52-
QString threadName = Util::ThreadName::Get();
53-
if (threadName.isEmpty()) threadName = "???"; // this should ideally not happen, but is here for defensive programming
54-
// Create a new CoTask into TLS. It will be destructed when the current thread exits.
55-
// The assumption here is that the DownloadBlocksTask is calling us and its threads stick around for a while
56-
// as blocks are downloaded.
57-
rpaTask.emplace(QString("RPA CoTask[%1]").arg(threadName));
58-
}
49+
if (rpaTask) {
5950
// Do RPA-related hashing and processing in the rpaTask's thread in parallel (really pays off for 1-off blocks)
6051
rpaFut = rpaTask->submitWork([&b, this]{
6152
// Process the first 30 inputs for each non-coinbase block txn
@@ -278,9 +269,9 @@ QString PreProcessedBlock::toDebugString() const
278269

279270
/// convenience factory static method: given a block, return a shard_ptr instance of this struct
280271
/*static*/
281-
PreProcessedBlockPtr PreProcessedBlock::makeShared(unsigned height_, size_t size, const bitcoin::CBlock &block, bool enableRpa)
272+
PreProcessedBlockPtr PreProcessedBlock::makeShared(unsigned height_, size_t size, const bitcoin::CBlock &block, CoTask *rpaTask)
282273
{
283-
return std::make_shared<PreProcessedBlock>(height_, size, block, enableRpa);
274+
return std::make_shared<PreProcessedBlock>(height_, size, block, rpaTask);
284275
}
285276

286277

src/BlockProc.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <unordered_set>
3434
#include <vector>
3535

36+
class CoTask;
3637

3738
struct PreProcessedBlock;
3839
using PreProcessedBlockPtr = std::shared_ptr<PreProcessedBlock>; ///< For clarity/convenience
@@ -163,17 +164,17 @@ struct PreProcessedBlock
163164

164165
// c'tors, etc... note this class is fully copyable and moveable
165166
PreProcessedBlock() = default;
166-
PreProcessedBlock(BlockHeight bheight, size_t rawBlockSizeBytes, const bitcoin::CBlock &b, bool enableRpaIndexing) {
167-
fill(bheight, rawBlockSizeBytes, b, enableRpaIndexing);
167+
PreProcessedBlock(BlockHeight bheight, size_t rawBlockSizeBytes, const bitcoin::CBlock &b, CoTask *rpaTask /* nullable */) {
168+
fill(bheight, rawBlockSizeBytes, b, rpaTask);
168169
}
169170
/// reset this to empty
170171
inline void clear() { *this = PreProcessedBlock(); }
171172
/// fill this block with data from bitcoin's CBlock
172-
void fill(BlockHeight blockHeight, size_t rawSizeBytes, const bitcoin::CBlock &b, bool enableRpaIndexing);
173+
void fill(BlockHeight blockHeight, size_t rawSizeBytes, const bitcoin::CBlock &b, CoTask *rpaTask /* nullable */);
173174

174175
/// convenience factory static method: given a block, return a shard_ptr instance of this struct
175176
static PreProcessedBlockPtr makeShared(unsigned height, size_t sizeBytes, const bitcoin::CBlock &block,
176-
bool enableRpaIndexing);
177+
CoTask *rpaTask /* nullable */);
177178

178179
/// debug string
179180
QString toDebugString() const;

src/Controller.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "Controller.h"
2323
#include "Controller_SynchDSPsTask.h"
2424
#include "Controller_SynchMempoolTask.h"
25+
#include "CoTask.h"
2526
#include "Mempool.h"
2627
#include "SubsMgr.h"
2728
#include "ThreadPool.h"
@@ -495,6 +496,7 @@ struct DownloadBlocksTask : CtlTask
495496
const bool allowMimble; ///< like above, but if true we allow mimblewimble (litecoin)
496497
const bool allowCashTokens; ///< allow special cashtoken deserialization rules (BCH only)
497498
const int rpaStartHeight; ///< if >= 0, rpa data will be indexed in PreProcessedBlock, starting at this height.
499+
std::optional<CoTask> rpaTask; ///< this gets created only at the point where current block height >= rpaStartHeight && rpaStartHeight > -1
498500

499501
void do_get(unsigned height);
500502

@@ -705,9 +707,18 @@ void DownloadBlocksTask::do_get(unsigned int bnum)
705707
// be out-of-synch due to configuration change, etc).
706708
VarDLTaskResult DownloadBlocksTask::process_block_guts(unsigned bnum, const QByteArray &rawblock, const bitcoin::CBlock &cblock)
707709
{
708-
const bool indexRpaForThisBlock = rpaStartHeight >= 0 && bnum >= unsigned(rpaStartHeight);
709-
auto ppb = PreProcessedBlock::makeShared(bnum, size_t(rawblock.size()), cblock, indexRpaForThisBlock);
710-
if (UNLIKELY(rpaStartHeight >= 0 && bnum == unsigned(rpaStartHeight))) {
710+
CoTask * rpaTaskIfEnabledForThisBlock = nullptr;
711+
// Determine if RPA indexing is enabled for this block, and if so, ensure this->rpaTask is created and pass down a
712+
// pointer to it. The non-null ptr then tells PreProcessedBlock to index RPA data for this block.
713+
const bool rpaIsEnabledForThisBlock = rpaStartHeight >= 0 && bnum >= unsigned(rpaStartHeight);
714+
if (rpaIsEnabledForThisBlock) {
715+
if (!rpaTask) rpaTask.emplace(QString("RPA CoTask[%1]").arg(objectName()));
716+
rpaTaskIfEnabledForThisBlock = &*rpaTask;
717+
}
718+
719+
auto ppb = PreProcessedBlock::makeShared(bnum, size_t(rawblock.size()), cblock, rpaTaskIfEnabledForThisBlock);
720+
721+
if (UNLIKELY(rpaIsEnabledForThisBlock && bnum == unsigned(rpaStartHeight))) {
711722
Util::AsyncOnObject(ctl, [height = rpaStartHeight]{
712723
// We do this in the Controller thread to make the log look pretty, since all other logging
713724
// user sees at this point is from the Controller thread anyway ...

0 commit comments

Comments
 (0)