Commit c370cd5

MB-42029: FollyExecPool: Wait for tasks cancelled in unregisterTaskable
+Issue+

When enabling FollyExecutorPool by default, TSan reports the following
race when running
./ep-engine_ep_unit_tests "--gtest_filter=DurabilityRespondAmbiguousTest.*":

AuxIO thread:

  Previous atomic write of size 8 at 0x7b74000020a0 by thread T8:
    #0 __tsan_atomic64_fetch_sub <null> (libtsan.so.0+0x000000060890)
    ...
    #4 ~HashTable kv_engine/engines/ep/src/hash_table.cc:161 (ep-engine_ep_unit_tests+0x00000122cae1)
    #5 ~VBucket kv_engine/engines/ep/src/vbucket.cc:286 (ep-engine_ep_unit_tests+0x0000012b3af4)
    #6 ~EPVBucket kv_engine/engines/ep/src/ep_vb.cc:101 (ep-engine_ep_unit_tests+0x0000011af5e1)
    ...
    #10 ~VBucketMemoryDeletionTask kv_engine/engines/ep/src/vbucketdeletiontask.cc:45 (ep-engine_ep_unit_tests+0x0000012e4530)
    ...
    #17 std::__shared_ptr<GlobalTask>::reset() /usr/local/include/c++/7.3.0/bits/shared_ptr_base.h:1235 (ep-engine_ep_unit_tests+0x000001221e75)
    #18 FollyExecutorPool::TaskProxy::~TaskProxy()::{lambda()#1}::operator()() kv_engine/engines/ep/src/folly_executorpool.cc:80 (ep-engine_ep_unit_tests+0x000001221e75)

Main thread:

  Write of size 8 at 0x7b74000020a0 by main thread:
    #0 free <null> (libtsan.so.0+0x000000027806)
    ...
    #6 CoreStore<...>::~CoreStore() platform/include/platform/corestore.h:50 (ep-engine_ep_unit_tests+0x0000012988b1)
    #7 ~EPStats kv_engine/engines/ep/src/stats.cc:132 (ep-engine_ep_unit_tests+0x0000012988b1)
    #8 ~EventuallyPersistentEngine kv_engine/engines/ep/src/ep_engine.cc:6593 (ep-engine_ep_unit_tests+0x0000011e3bb5)
    ...
    #12 DurabilityRespondAmbiguousTest_RespondAmbiguousNotificationDeadLock_Test::TestBody() kv_engine/engines/ep/tests/module_tests/evp_store_durability_test.cc:2350 (ep-engine_ep_unit_tests+0x000000bd3642)

The crux of this issue is that a background AuxIO task run via the
FollyExecutorPool is deleting a VBucket object concurrently with the
main thread deleting an EPStats object.

+Background+

Details of how CB3ExecutorPool and FollyExecutorPool implement
{{unregisterTaskable()}}, which I believe is what leads to this problem.

CB3ExecutorPool - during CB3ExecutorPool::unregisterTaskable():

1. CB3ExecutorPool::_stopTaskGroup() is called and waits for
   VBucketMemoryDeletionTask to run.
2. When VBucketMemoryDeletionTask::run() is called it returns false.
3. CB3ExecutorThread then synchronously calls
   CB3ExecutorThread::cancelCurrentTask() -> CB3ExecutorPool::cancel().
   That removes all CB3ExecutorPool-owned references to the task, and
   hence runs the VBucketMemoryDeletionTask dtor.
4. The VBucketMemoryDeletionTask dtor frees the VBucket object.

As such, by the time CB3ExecutorPool::unregisterTaskable() returns, the
VBucket is *guaranteed* to have been freed.

FollyExecutorPool - during FollyExecutorPool::unregisterTaskable():

1. All tasks scheduled to run in the future (owned by the IO thread
   EventBase) are either cancelled (if allowed), or woken to run asap
   on the CPU pool.
2. All tasks waiting to run, or currently running, on the CPU pool are
   waited for by polling taskOwners until it no longer contains any
   tasks for the given taskable.
3. (On the CPU threads) Each queued task is run; on completion
   rescheduleTaskAfterRun is called to add work to the IO thread
   EventBase to decide when to reschedule, or (in this case) to
   actually cancel the task.
4. (On the IO thread) FollyExecutorPool::rescheduleTaskAfterRun is
   called; for cancelled tasks this calls State::cancelTask(), which
   removes the task from taskOwners - at which point the TaskProxy dtor
   runs, which schedules _another_ function on the CPU pool to actually
   delete the GlobalTask (a minimal standalone sketch of this hand-off
   follows).
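For illustration, a minimal, self-contained sketch (not kv_engine code)
of the ordering problem described in steps 3-4 above. FakeTask, the bare
std::map and the explicit mutex are hypothetical stand-ins for
GlobalTask/taskOwners and the IO-thread serialisation; only folly's
CPUThreadPoolExecutor API is assumed. Because the map entry is erased
immediately while the shared_ptr release (and hence destruction) is
deferred to the CPU pool, a thread polling for "map empty" can return
before the task's destructor has run:

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <thread>

struct FakeTask {
    ~FakeTask() {
        // Stands in for ~VBucketMemoryDeletionTask -> ~VBucket work.
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        std::cout << "FakeTask destroyed on a CPU pool thread\n";
    }
};

int main() {
    folly::CPUThreadPoolExecutor cpuPool{1};
    std::mutex mutex;
    std::map<size_t, std::shared_ptr<FakeTask>> taskOwners;
    {
        std::lock_guard<std::mutex> guard(mutex);
        taskOwners.emplace(1, std::make_shared<FakeTask>());
    }

    // "cancelTask" before the fix: erase the owner entry now, defer the
    // refcount drop (and hence destruction) to the CPU pool.
    {
        std::lock_guard<std::mutex> guard(mutex);
        auto node = taskOwners.extract(1);
        cpuPool.add([ptr = std::move(node.mapped())]() mutable { ptr.reset(); });
    }

    // "unregisterTaskable": poll for the map to become empty. This can
    // (and here almost certainly will) succeed before ~FakeTask has run.
    for (;;) {
        std::lock_guard<std::mutex> guard(mutex);
        if (taskOwners.empty()) {
            break;
        }
    }
    std::cout << "unregisterTaskable-style wait returned\n";

    cpuPool.join();
    return 0;
}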
The problem here is that the TaskProxy is removed from taskOwners and
deleted at (4) on the IO thread; however the GlobalTask destruction is
deferred to later execution on a CPU thread. As such,
FollyExecutorPool::unregisterTaskable() can see taskOwners having no
tasks left for the taskable (and hence return) _before_ the VBucket
object is deleted. Note the deferred deletion scheduled at (4) was added
to avoid potentially large amounts of work being done on the IO thread -
we aim to minimise work done there as it can impact the scheduling of
other tasks.

+Solution+

If the removal of the TaskProxy from taskOwners is deferred until
_after_ the GlobalTask shared ownership is released, then
unregisterTaskable() will no longer return until all GlobalTask
references inside FollyExecutorPool have been released. To achieve this,
changes to the ownership model in FollyExecutorPool are needed:

1. Don't immediately remove the TaskProxy from taskOwners in
   cancelTask(). Instead:
   a) Mark it as cancelled, by setting the GlobalTask ptr to null,
   b) Schedule an asynchronous function to release its GlobalTask
      shared_ptr.
2. This new async function (set up in resetTaskPtrViaCpuPool) releases
   the TaskProxy's shared ownership of the GlobalTask, then schedules an
   (IO thread) completion function to finally remove the TaskProxy from
   taskOwners.

unregisterTaskable() is unchanged - it still waits for the taskOwners
map for the given Taskable to become empty; however, given the above
changes, that only happens once the GlobalTask reference has been
released (a sketch of this fixed ordering follows the commit trailers).

Change-Id: Iecbff9f3b45fc9e3d385c67f6a6dd32242dc76fe
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/138373
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
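For comparison, a minimal sketch (again not kv_engine code; FakeTask and
the bare std::map are hypothetical stand-ins, and only folly's
CPUThreadPoolExecutor and EventBase APIs are assumed) of the fixed
hand-off described under +Solution+: the map entry is erased - back on
the "IO" EventBase thread - only after the shared_ptr has been released
on a CPU pool thread, so a waiter polling for the map to become empty
cannot return before the task has been destroyed:

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/io/async/EventBase.h>
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <thread>

struct FakeTask {
    ~FakeTask() {
        // Arbitrary amount of work, e.g. freeing a VBucket.
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
};

int main() {
    folly::EventBase ioEventBase; // stands in for the futurePool IO thread
    std::thread ioThread{[&ioEventBase] { ioEventBase.loopForever(); }};
    folly::CPUThreadPoolExecutor cpuPool{1};

    std::mutex mutex;
    std::map<size_t, std::shared_ptr<FakeTask>> taskOwners;
    {
        std::lock_guard<std::mutex> guard(mutex);
        taskOwners.emplace(1, std::make_shared<FakeTask>());
    }

    // "cancelTask" after the fix: null out the entry's shared_ptr (the
    // entry itself stays in the map), release the shared_ptr on the CPU
    // pool, then erase the entry back on the IO EventBase thread.
    ioEventBase.runInEventBaseThread(
            [&ioEventBase, &cpuPool, &mutex, &taskOwners] {
                std::lock_guard<std::mutex> guard(mutex);
                auto ptrToReset = std::move(taskOwners.at(1));
                cpuPool.add([&ioEventBase, &mutex, &taskOwners,
                             ptr = std::move(ptrToReset)]() mutable {
                    ptr.reset(); // ~FakeTask runs here, on a CPU thread
                    ioEventBase.runInEventBaseThread([&mutex, &taskOwners] {
                        std::lock_guard<std::mutex> guard(mutex);
                        taskOwners.erase(1); // only now remove the entry
                    });
                });
            });

    // "unregisterTaskable": wait for the map to become empty; with the
    // ordering above this only happens after ~FakeTask has completed.
    for (;;) {
        {
            std::lock_guard<std::mutex> guard(mutex);
            if (taskOwners.empty()) {
                break;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    ioEventBase.terminateLoopSoon();
    ioThread.join();
    cpuPool.join();
    return 0;
}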
1 parent d541aaa commit c370cd5

4 files changed (+250, -46 lines)
engines/ep/src/folly_executorpool.cc
Lines changed: 200 additions & 45 deletions

@@ -29,6 +29,8 @@
 #include <platform/string_hex.h>
 #include <statistics/collector.h>
 
+using namespace std::string_literals;
+
 /**
  * Thread factory for CPU pool threads.
  * - Gives each thread name based on the given prefix.
@@ -62,23 +64,21 @@ struct FollyExecutorPool::TaskProxy
       public std::enable_shared_from_this<TaskProxy> {
     TaskProxy(FollyExecutorPool& executor,
               folly::CPUThreadPoolExecutor& pool,
-              ExTask task)
-        : task(std::move(task)), executor(executor), cpuPool(pool) {
+              ExTask task_)
+        : task(std::move(task_)),
+          taskId(task->getId()),
+          executor(executor),
+          cpuPool(pool) {
     }
 
     ~TaskProxy() override {
-        // We are potentially the last (shared) owner of the GlobalTask,
-        // whose destruction may perform an arbitrary amount of work which we
-        // don't want to run on a non-CPU thread. As such, perform the actual
-        // destructor on the appropriate CPU pool.
-        ExTask taskToDelete;
-        task.swap(taskToDelete);
-        cpuPool.add([taskToDelete = std::move(taskToDelete)]() mutable {
-            // We must account the destruction of the GlobalTask to the bucket
-            // which owns it.
-            BucketAllocationGuard guard(taskToDelete->getEngine());
-            taskToDelete.reset();
-        });
+        // To ensure that we do not destruct GlobalTask objects on
+        // arbitrary threads (if the TaskProxy is the last owner), we
+        // should have already called task.reset() before destruction
+        // of the TaskProxy.
+        Expects(!task &&
+                "task shared_ptr should already be empty before destructing "
+                "TaskProxy");
     }
 
     void timeoutExpired() noexcept override {
@@ -136,11 +136,7 @@ struct FollyExecutorPool::TaskProxy
         // Perform work on the appropriate CPU pool.
         // Note this retains a reference to itself (TaskProxy).
         cpuPool.add([proxy = shared_from_this()] {
-            if (!proxy->task) {
-                // ExTask has been set to null - Taskable likely unregistered
-                // - nothing to do.
-                return;
-            }
+            Expects(proxy->task.get());
 
             bool runAgain = false;
             // Check if Task is still alive. If not don't run.
@@ -199,6 +195,41 @@ struct FollyExecutorPool::TaskProxy
         });
     }
 
+    void resetTaskPtrViaCpuPool() {
+        using namespace std::chrono;
+
+        EP_LOG_TRACE(
+                "TaskProxy::resetTaskPtrViaCpuPool() id:{} name:{} descr:'{}' "
+                "enqueuing func to reset 'task' shared_ptr",
+                task->getId(),
+                GlobalTask::getTaskName(task->getTaskId()),
+                task->getDescription());
+
+        // Move `task` from this object (leaving it as null)
+        cpuPool.add([ptrToReset = std::move(task), proxy = this]() mutable {
+            EP_LOG_TRACE(
+                    "FollyExecutorPool::resetTaskPtrViaCpuPool lambda() id:{} "
+                    "name:{}",
+                    ptrToReset->getId(),
+                    GlobalTask::getTaskName(ptrToReset->getTaskId()));
+
+            // Reset the shared_ptr, decrementing its refcount and potentially
+            // deleting the owned object if no other objects (Engine etc) have
+            // retained a refcount.
+            // Must account this to the relevant bucket.
+            {
+                BucketAllocationGuard guard(ptrToReset->getEngine());
+                ptrToReset.reset();
+            }
+            // Finally, remove the taskProxy from taskOwners.
+            proxy->executor.futurePool->getEventBase()->runInEventBaseThread(
+                    [proxy] {
+                        auto& executor = proxy->executor;
+                        executor.removeTaskAfterRun(*proxy);
+                    });
+        });
+    }
+
     /**
      * Updates the timeout to the value of the GlobalTasks' wakeTime
     */
@@ -234,6 +265,15 @@ struct FollyExecutorPool::TaskProxy
         Expects(executor.futurePool->getEventBase()
                         ->inRunningEventBaseThread());
 
+        if (!task) {
+            // Task has been cancelled ('task' shared ptr reset to
+            // null via resetTaskPtrViaCpuPool), but the TaskProxy has
+            // not yet been cleaned up - i.e. a wake and cancel have
+            // raced.
+            // Cannot wake (until GlobalTask is re-scheduled) -
+            // return.
+            return;
+        }
+
         // Cancel any previously set future execution of the
         // task.
         cancelTimeout();
@@ -254,10 +294,20 @@ struct FollyExecutorPool::TaskProxy
     /// shared_ptr to the GlobalTask.
     ExTask task;
 
+    // Identifier of the task. This is a copy of data within
+    // `task`, because we reset `task` before removing the TaskProxy
+    // entry from taskOwners, and need the taskId for that.
+    const size_t taskId;
+
     // Flag used to block re-scheduling of a task while it's in the process
     // of running on CPU pool. See comments in timeoutExpired().
     bool scheduledOnCpuPool{false};
 
+    // Did we re-use the same TaskProxy after a re-schedule? Used for
+    // sanity checking if removeTaskAfterRun() finds a non-null 'task'
+    // when executed.
+    bool proxyReused{false};
+
 private:
     // Associated FollyExecutorPool (needed for re-scheduling / cancelling
     // dead tasks).
@@ -297,7 +347,7 @@ using TaskOwnerMap = std::unordered_map<const Taskable*, TaskLocator>;
  */
 struct FollyExecutorPool::State {
     void addTaskable(Taskable& taskable) {
-        taskOwners.insert({&taskable, {}});
+        taskOwners.try_emplace(&taskable);
     }
 
     void removeTaskable(Taskable& taskable) {
@@ -310,20 +360,35 @@ struct FollyExecutorPool::State {
      * @param executor FollyExecutorPool owning the task.
      * @param task The Task to schedule.
      */
-    void scheduleTask(FollyExecutorPool& executor,
+    bool scheduleTask(FollyExecutorPool& executor,
                       folly::CPUThreadPoolExecutor& pool,
                       ExTask task) {
         auto& tasksForOwner = taskOwners.at(&task->getTaskable());
-        auto result = tasksForOwner.try_emplace(
-                task->getId(),
-                std::make_shared<TaskProxy>(executor, pool, task));
-        auto& it = result.first;
+        auto [it, inserted] = tasksForOwner.try_emplace(
+                task->getId(), std::shared_ptr<TaskProxy>{});
+        if (!inserted) {
+            // taskId already present - i.e. this taskId has already been
+            // scheduled.
+            // It is only valid to re-schedule a task if it was previously
+            // cancelled, but we hadn't cleaned up the cancellation - the
+            // 'task' shared_ptr is null.
+            if (it->second->task) {
+                return false;
+            }
+            // Re-assign task to the one passed in.
+            it->second->task = task;
+            it->second->proxyReused = true;
+        } else {
+            // Inserted a new entry into map - create a TaskProxy object for it.
+            it->second = std::make_shared<TaskProxy>(executor, pool, task);
+        }
 
         // If we are rescheduling a previously cancelled task, we should
         // reset the task state to the initial value of running.
-        task->setState(TASK_RUNNING, TASK_DEAD);
+        it->second->task->setState(TASK_RUNNING, TASK_DEAD);
 
         it->second->updateTimeoutFromWakeTime();
+        return true;
     }
 
     /**
@@ -389,8 +454,13 @@ struct FollyExecutorPool::State {
     std::vector<ExTask> cancelTasksOwnedBy(const Taskable& taskable,
                                            bool force) {
         std::vector<ExTask> removedTasks;
-        for (auto it : taskOwners.at(&taskable)) {
+        for (auto& it : taskOwners.at(&taskable)) {
             auto& tProxy = it.second;
+            if (!tProxy->task) {
+                // Task already cancelled (shared pointer reset to null) by
+                // cancelTask() - skip.
+                continue;
+            }
             EP_LOG_DEBUG(
                     "FollyExecutorPool::unregisterTaskable(): Stopping "
                     "Task id:{} taskable:{} description:'{}'",
@@ -415,13 +485,12 @@ struct FollyExecutorPool::State {
     }
 
     /**
-     * Cancel the specified task, optionally removing it from taskOwners.
+     * Cancel the specified task.
      *
      * @param taskId Task to cancel
-     * @param eraseTask If true then erase the task from taskOwners.
     * @return True if task found, else false.
     */
-    bool cancelTask(size_t taskId, bool eraseTask) {
+    bool cancelTask(size_t taskId) {
         // Search for the given taskId across all buckets.
         // PERF: CB3ExecutorPool uses a secondary map (taskLocator)
         // to allow O(1) lookup by taskId, however cancel() isn't a
@@ -438,14 +507,70 @@ struct FollyExecutorPool::State {
                         taskId,
                         owner->getName());
 
+                if (!it->second->task) {
+                    // Task already cancelled (shared pointer reset to null)
+                    // by some previous call to cancelTask().
+                    return false;
+                }
+
                 it->second->task->cancel();
-                if (eraseTask) {
-                    tasks.erase(it);
+
+                // Now `task` has been cancelled, we need to remove our
+                // reference (shared ownership) to the owned GlobalTask and
+                // from taskOwners. Decrementing our refcount could delete
+                // the GlobalTask (if we are the last owner). This must occur
+                // on a CPU thread given GlobalTask destruction can be an
+                // arbitrary amount of work.
+                if (it->second->scheduledOnCpuPool) {
+                    // Currently scheduled on CPU pool - TaskProxy is "owned"
+                    // by the CPU thread. Given we just called cancel() on the
+                    // GlobalTask, we can rely on rescheduleTaskAfterRun to
+                    // reset the TaskProxy (when it calls cancelTask()).
+                    return true;
                 }
-                // Note: We could potentially always erase here, given
-                // that this is running in the eventBase and hence
-                // there's no issue of racing with ourselves like
-                // there is for CB3ExecutorPool::_cancel.
+
+                // Not currently scheduled on CPU pool - this thread (IO pool)
+                // owns it.
+                // First cancel any pending timeout - shouldn't run again.
+                it->second->cancelTimeout();
+
+                // Next, to perform the refcount drop on a CPU thread, we move
+                // the shared_ptr out of taskOwners (the taskOwners entry
+                // remains but is null), then pass the moved shared_ptr to the
+                // CPU pool to perform the refcount decrement (and potential
+                // GlobalTask deletion).
+                // Finally the CPU pool will schedule a final IO thread
+                // function to actually erase the element from taskOwners.
+                it->second->resetTaskPtrViaCpuPool();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Remove the cancelled task from taskOwners.
+     *
+     * @param taskId Task to remove
+     * @return True if task found, else false.
+     */
+    bool removeTask(size_t taskId) {
+        for (auto& [owner, tasks] : taskOwners) {
+            auto it = tasks.find(taskId);
+            if (it != tasks.end()) {
+                EP_LOG_TRACE(
+                        "FollyExecutorPool::State::removeTask() erasing task "
+                        "id:{} for "
+                        "owner:'{}'",
+                        taskId,
+                        owner->getName());
+                Expects(!it->second->task &&
+                        "removeTask: 'proxy->task' should be null before "
+                        "removing element from taskOwners");
+                tasks.erase(it);
                 return true;
             }
         }
@@ -731,8 +856,8 @@ bool FollyExecutorPool::cancel(size_t taskId, bool eraseTask) {
     auto* eventBase = futurePool->getEventBase();
     bool found = false;
     eventBase->runInEventBaseThreadAndWait(
-            [&found, state = state.get(), taskId, eraseTask] {
-                state->cancelTask(taskId, eraseTask);
+            [&found, state = state.get(), taskId] {
+                state->cancelTask(taskId);
             });
     return found;
 }
@@ -941,14 +1066,11 @@ void FollyExecutorPool::rescheduleTaskAfterRun(
         // Deschedule the task, in case it was already scheduled
         proxy->cancelTimeout();
 
-        // Decrement the ref-count on the task, and remove from taskLocator
-        // (same pattern as CB3ExecutorPool -
-        // ExecutorThread::cancelCurrentTask())
-        state->cancelTask(proxy->task->getId(), true);
+        // Begin the process of cancelling the task - mark it as cancelled in
+        // taskOwners and schedule another CPU thread pool function to
+        // decrement the refcount and potentially delete the GlobalTask.
+        state->cancelTask(proxy->task->getId());
 
-        // At this point the ref-count on the TaskProxy is still at least one,
-        // via the `proxy` object. On return that will go out of scope so
-        // object may be deleted (if no other object has a ref-count).
         return;
     }
 
@@ -963,3 +1085,36 @@ void FollyExecutorPool::rescheduleTaskAfterRun(
         proxy->scheduleViaCPUPool();
     }
 }
+
+void FollyExecutorPool::removeTaskAfterRun(TaskProxy& proxy) {
+    EP_LOG_TRACE("TaskProxy::removeTaskAfterRun() id:{} name:{}",
+                 proxy.taskId,
+                 proxy.task ? ("RESURRECTED:"s +
+                               GlobalTask::getTaskName(proxy.task->getTaskId()))
+                            : "<null>"s);
+
+    if (proxy.task) {
+        Expects(proxy.proxyReused);
+        return;
+    }
+
+    // Deschedule the task, in case it was already scheduled
+    proxy.cancelTimeout();
+
+    // Erase the task from taskOwners. If the TaskProxy is the last
+    // shared owner of the GlobalTask, that will be deleted here.
+
+    // PERF: CB3ExecutorPool uses a secondary map (taskLocator) to
+    // allow O(1) lookup of owner by taskId, however cancelling
+    // isn't a particularly hot function so I'm not sure if the
+    // extra complexity is warranted. If this shows up as hot then
+    // consider adding a similar structure to FollyExecutorPool.
+    bool taskFound = state->removeTask(proxy.taskId);
+    if (!taskFound) {
+        auto msg = fmt::format(
+                "FollyExecutorPool::rescheduleTaskAfterRun(): Failed to locate "
+                "an owner for task id:{}",
+                proxy.taskId);
+        throw std::logic_error(msg);
+    }
+}

engines/ep/src/folly_executorpool.h
Lines changed: 4 additions & 0 deletions

@@ -186,6 +186,10 @@ class FollyExecutorPool : public ExecutorPool {
     /// the task is dead (or should run again).
     void rescheduleTaskAfterRun(std::shared_ptr<TaskProxy> proxy);
 
+    /// Remove the given taskProxy from the tracked tasks.
+    /// Should only be called at the end of scheduleViaCPUPool.
+    void removeTaskAfterRun(TaskProxy& proxy);
+
     struct State;
     /**
      * FollyExecutorPool internal state. unique_ptr for pimpl.

engines/ep/src/globaltask.h
Lines changed: 1 addition & 1 deletion

@@ -143,7 +143,7 @@ class GlobalTask {
      *
      * @return The id of this task.
      */
-    TaskId getTaskId() {
+    TaskId getTaskId() const {
         return taskId;
     }
 