
Threading Model

joe maley edited this page Oct 15, 2020 · 15 revisions

Overview

TileDB allocates two thread pools per context: a compute thread pool and an IO thread pool. As the names suggest, the compute thread pool executes compute-bound tasks while the IO thread pool executes IO-bound tasks. By default, each thread pool has a concurrency level equal to the number of hardware threads; for example, on a system with 12 hardware threads, both thread pools have a concurrency level of 12. The motivation for this configuration is to ensure that each hardware thread has one TileDB software thread with CPU-heavy work. IO-bound tasks get their own thread pool so that the programmer does not need to worry about overloading a single shared thread pool, where idle-waiting IO-bound tasks could block pending CPU-bound tasks from performing useful work.

The default concurrency level for each thread pool can be modified with the following configuration parameters:
"sm.compute_concurrency_level"
"sm.io_concurrency_level"

We use the term "concurrency level" so that we have the flexibility to adjust the number of allocated software threads in our thread pool implementation independent of the user configuration. At the time of writing, a concurrency level of N allocates N-1 software threads.
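The two parameters above can be set through the standard TileDB C++ API before creating a context; a minimal configuration sketch (the parameter names come from this page, and the values here are arbitrary examples):

```cpp
#include <tiledb/tiledb>  // TileDB C++ API

int main() {
  // Override the default concurrency levels before creating the context.
  tiledb::Config config;
  config["sm.compute_concurrency_level"] = "8";
  config["sm.io_concurrency_level"] = "8";

  // Both thread pools are allocated when the context is created.
  tiledb::Context ctx(config);
  return 0;
}
```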

Thread Pool Usage

Synchronous Usage

The following example illustrates typical synchronous usage. The code snippet does the following:

  • The calling thread creates a thread pool with a concurrency level of 2.
  • The calling thread executes two tasks on the thread pool.
  • The calling thread synchronously waits for both tasks to complete.
void foo() {
  ThreadPool tp;
  tp.init(2);

  std::vector<ThreadPool::Task> tasks;

  std::atomic<int> my_int(0);

  tasks.emplace_back(tp.execute([&]() {
    my_int += 1;
  }));

  tasks.emplace_back(tp.execute([&]() {
    my_int += 2;
  }));

  tp.wait_all(tasks);

  assert(my_int == 3);

  std::cout << "DONE" << std::endl;
}

The above snippet is valid for any non-zero concurrency level. Initializing the thread pool with a concurrency level of 2 allows up to two tasks to execute concurrently.

Asynchronous Usage

The following example illustrates typical asynchronous usage. The code snippet does the following:

  • The calling thread creates a thread pool with a concurrency level of 2.
  • The calling thread executes two tasks on the thread pool, where each task asynchronously invokes a callback bar.
  • The calling thread does not wait for the tasks to complete. Instead, the completed state is reached when the last callback is executed.
std::atomic<int> g_my_int(0);

void bar(int my_int) {
  int prev = g_my_int.fetch_add(my_int);
  if (prev + my_int == 3)
    std::cout << "DONE" << std::endl;
}

void foo() {
  ThreadPool tp;
  tp.init(2);

  tp.execute([]() {
    bar(1);
  });

  tp.execute([]() {
    bar(2);
  });

  // Assume `tp` remains in-scope until both tasks have completed.
}

Recursive Usage (Single Instance)

The ThreadPool supports arbitrary recursive usage. We use the term "arbitrary" to indicate that the programmer does not need to worry about deadlock if the total number of scheduled tasks exceeds the concurrency level.

The following example illustrates arbitrary recursive usage. The code snippet does the following:

  • The calling thread creates a thread pool with a concurrency level of 2.
  • The calling thread executes Task #1 and waits.
  • Task #1 executes Task #2 and waits.
  • Task #2 executes Task #3 and waits.
  • Task #3 executes.
void foo() {
  ThreadPool tp;
  tp.init(2);

  bool done = false;
  ThreadPool::Task task_1 = tp.execute([&]() {
    ThreadPool::Task task_2 = tp.execute([&]() {
      ThreadPool::Task task_3 = tp.execute([&]() {
        done = true;
      });
      tp.wait_all(task_3);
    });
    tp.wait_all(task_2);
  });
  tp.wait_all(task_1);

  assert(done == true);
}

The above is valid for any non-zero concurrency level. If you're familiar with non-recursive thread pools, you may expect deadlock when executing the third task: with only two threads, tasks #1 and #2 would be idle-waiting, leaving no thread available to execute task #3. In our thread pool implementation, a task executing within tp that waits on another, inner task on tp breaks this potential deadlock by invoking the inner task directly.

Recursive Usage (Multiple Instances)

The ThreadPool supports arbitrary recursive usage among multiple thread pool instances. This is important because the programmer does not need to worry about how the tasks on the compute thread pool and IO thread pool are nested.

The following example illustrates arbitrary recursive usage. The code snippet does the following:

  • The calling thread creates two thread pools with a concurrency level of 2.
  • Tasks #1 through #5 are executed recursively across the two thread pool instances.
void foo() {
  ThreadPool tp_a;
  ThreadPool tp_b;
  tp_a.init(2);
  tp_b.init(2);

  bool done = false;
  ThreadPool::Task task_1 = tp_a.execute([&]() {
    ThreadPool::Task task_2 = tp_b.execute([&]() {
      ThreadPool::Task task_3 = tp_a.execute([&]() {
        ThreadPool::Task task_4 = tp_b.execute([&]() {
          ThreadPool::Task task_5 = tp_a.execute([&]() {
            done = true;
          });
          tp_a.wait_all(task_5);
        });
        tp_b.wait_all(task_4);
      });
      tp_a.wait_all(task_3);
    });
    tp_b.wait_all(task_2);
  });
  tp_a.wait_all(task_1);

  assert(done == true);
}

Thread Pool Architecture

The ThreadPool class was designed with the following goals:

  • Prefer throughput to latency.
  • Optimize for synchronous execute-and-wait usage (see above).
  • Allow for recursive usage of a single ThreadPool instance within its own tasks.
  • Allow for arbitrary, recursive nesting among different ThreadPool instances.

TODO

Parallel Routines

For convenience, TileDB provides three high-level parallel routines: parallel_sort, parallel_for, and parallel_for_2d. Each routine operates on a single ThreadPool instance supplied by the caller.

parallel_sort

/**
 * Sort the given iterator range, possibly in parallel.
 *
 * @tparam IterT Iterator type
 * @tparam CmpT Comparator type
 * @param tp The threadpool to use.
 * @param begin Beginning of range to sort (inclusive).
 * @param end End of range to sort (exclusive).
 * @param cmp Comparator.
 */
template <
    typename IterT,
    typename CmpT = std::less<typename std::iterator_traits<IterT>::value_type>>
void parallel_sort(
    ThreadPool* const tp, IterT begin, IterT end, const CmpT& cmp = CmpT());

TODO

parallel_for

/**
 * Call the given function on each element in the given iterator range.
 *
 * @tparam FuncT Function type (returning Status).
 * @param tp The threadpool to use.
 * @param begin Beginning of range (inclusive).
 * @param end End of range (exclusive).
 * @param F Function to call on each item
 * @return Vector of Status objects, one for each function invocation.
 */
template <typename FuncT>
std::vector<Status> parallel_for(
    ThreadPool* const tp, uint64_t begin, uint64_t end, const FuncT& F);

TODO

parallel_for_2d

/**
 * Call the given function on every pair (i, j) in the given i and j ranges,
 * possibly in parallel.
 *
 * @tparam FuncT Function type (returning Status).
 * @param tp The threadpool to use.
 * @param i0 Inclusive start of outer (rows) range.
 * @param i1 Exclusive end of outer range.
 * @param j0 Inclusive start of inner (cols) range.
 * @param j1 Exclusive end of inner range.
 * @param F Function to call on each (i, j) pair.
 * @return Vector of Status objects, one for each function invocation.
 */
template <typename FuncT>
std::vector<Status> parallel_for_2d(
    ThreadPool* const tp,
    uint64_t i0,
    uint64_t i1,
    uint64_t j0,
    uint64_t j1,
    const FuncT& F);

TODO

