@@ -96,12 +96,16 @@ graph TD
96
96
#include < boost/asio.hpp>
97
97
#include < iostream>
98
98
99
+ std::mutex m;
100
+
99
101
void print_task (int n) {
100
- std::cout << "Task " << n << " is running." << std::endl;
102
+ std::lock_guard< std::mutex > lc{ m };
103
+ std::cout << "Task " << n << " is running on thr: " <<
104
+ std::this_thread::get_id() << '\n';
101
105
}
102
106
103
107
int main() {
104
- boost::asio::thread_pool pool{4 }; // 创建一个包含 4 个线程的线程池
108
+ boost::asio::thread_pool pool{ 4 }; // 创建一个包含 4 个线程的线程池
105
109
106
110
for (int i = 0; i < 10; ++i) {
107
111
boost::asio::post(pool, [i] { print_task(i); });
@@ -111,7 +115,7 @@ int main() {
111
115
}
112
116
```
113
117
114
- > [运行](https://godbolt.org/z/Pa3z1oYej )测试。
118
+ > [运行](https://godbolt.org/z/41445Kab5 )测试。
115
119
116
120
- 创建线程池时,指定线程数量,线程池会创建对应数量的线程。
117
121
@@ -163,11 +167,11 @@ thread_pool::~thread_pool()
163
167
boost::asio::thread_pool pool{ 4 };
164
168
165
169
for (int i = 0; i < 10; ++i) {
166
- boost::asio::post(pool, [ i] ( ) { print_task(i); });
170
+ boost::asio::post(pool, [ i] { print_task(i); });
167
171
}
168
172
```
169
173
170
- > [ 运行] ( https://godbolt.org/z/haPqKb1h7 ) 测试。
174
+ > [ 运行] ( https://godbolt.org/z/MPoxrY9Yo ) 测试。
171
175
172
176
因为析构函数并不是阻塞直到执行完所有任务,而是先** 停止** ,再 ` join() ` 以及 ` shutdown() ` 。
173
177
@@ -345,17 +349,13 @@ public:
345
349
start();
346
350
}
347
351
348
- ~ThreadPool (){
352
+ ~ThreadPool () {
349
353
stop ();
350
- join();
351
354
}
352
355
353
356
void stop() {
354
357
stop_.store(true);
355
358
cv_.notify_all();
356
- }
357
-
358
- void join(){
359
359
for (auto& thread : pool_) {
360
360
if (thread.joinable()) {
361
361
thread.join();
@@ -383,16 +383,16 @@ public:
383
383
return ret;
384
384
}
385
385
386
- void start(){
387
- for (std::size_t i = 0; i < num_threads_; ++i){
386
+ void start() {
387
+ for (std::size_t i = 0; i < num_threads_; ++i) {
388
388
pool_.emplace_back([this] {
389
389
while (!stop_) {
390
390
Task task;
391
391
{
392
392
std::unique_lock<std::mutex> lc{ mutex_ };
393
+ cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
393
394
if (tasks_.empty())
394
395
return;
395
- cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
396
396
task = std::move(tasks_.front());
397
397
tasks_.pop();
398
398
}
@@ -415,89 +415,77 @@ private:
415
415
** 测试 demo** :
416
416
417
417
``` cpp
418
- int print_task (int n) {
419
- std::osyncstream{ std::cout } << "Task " << n << " is running." << std::endl;
420
- return n;
421
- }
422
- int print_task2(int n) {
423
- std::osyncstream{ std::cout } << "🐢🐢🐢 " << n << " 🐉🐉🐉" << std::endl;
424
- return n;
425
- }
426
-
427
418
int main () {
428
- ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池 构造函数自动启动线程池
419
+ ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池
429
420
std::vector<std::future<int>> futures; // future 集合,获取返回值
430
421
431
422
for (int i = 0; i < 10; ++i) {
432
423
futures.emplace_back(pool.submit(print_task, i));
433
424
}
434
- pool.join(); // 阻塞,让任务全部执行完毕
435
-
436
- std::puts("---------------------");
437
-
438
- pool.start(); // 重新启动线程池
439
425
440
426
for (int i = 0; i < 10; ++i) {
441
427
futures.emplace_back(pool.submit(print_task2, i));
442
428
}
443
- pool.join(); // 阻塞,让任务全部执行完毕
444
429
445
430
int sum = 0;
446
- for(auto& future : futures){
447
- sum += future.get();
431
+ for (auto& future : futures) {
432
+ sum += future.get(); // get() 成员函数 阻塞到任务执行完毕,获取返回值
448
433
}
449
434
std::cout << "sum: " << sum << '\n';
450
- } // 析构自动 stop() join()
435
+ } // 析构自动 stop()
451
436
```
452
437
453
- **可能的[运行结果](https://godbolt.org/z/3rbExqbb7 )**:
438
+ ** 可能的[ 运行结果] ( https://godbolt.org/z/n7Tana59x ) ** :
454
439
455
440
``` shell
456
- Task 0 is running.
457
- Task 4 is running.
458
- Task 5 is running.
459
- Task 6 is running.
460
- Task 7 is running.
461
- Task 8 is running.
462
- Task 9 is running.
463
- Task 2 is running.
464
- Task 3 is running.
465
- Task 1 is running.
466
- ---------------------
467
- 🐢🐢🐢 0 🐉🐉🐉
441
+ Task 0 is running on thr: 6900
442
+ Task 1 is running on thr: 36304
443
+ Task 5 is running on thr: 36304
444
+ Task 3 is running on thr: 6900
445
+ Task 7 is running on thr: 6900
446
+ Task 2 is running on thr: 29376
447
+ Task 6 is running on thr: 36304
448
+ Task 4 is running on thr: 31416
468
449
🐢🐢🐢 1 🐉🐉🐉
450
+ Task 9 is running on thr: 29376
451
+ 🐢🐢🐢 0 🐉🐉🐉
452
+ Task 8 is running on thr: 6900
469
453
🐢🐢🐢 2 🐉🐉🐉
470
- 🐢🐢🐢 5 🐉🐉🐉
454
+ 🐢🐢🐢 6 🐉🐉🐉
471
455
🐢🐢🐢 4 🐉🐉🐉
456
+ 🐢🐢🐢 5 🐉🐉🐉
472
457
🐢🐢🐢 3 🐉🐉🐉
473
- 🐢🐢🐢 6 🐉🐉🐉
474
458
🐢🐢🐢 7 🐉🐉🐉
475
459
🐢🐢🐢 8 🐉🐉🐉
476
460
🐢🐢🐢 9 🐉🐉🐉
477
461
sum: 90
478
462
```
479
463
480
- > 如果不自己显式调用 ` join() ` ,而是等待线程池对象调用析构函数, 那么效果如同 ` asio::thread_pool ` ,会先进行 ` stop ` ,导致一些任务无法执行 。
464
+ > 如果等待线程池对象调用析构函数, 那么效果如同 ` asio::thread_pool ` ,会先进行 ` stop ` ,这可能导致一些任务无法执行。不过我们在最后 ** 循环遍历了 ` futures ` ** ,调用 ` get() ` 成员函数,不存在这个问题 。
481
465
482
466
它支持** 任意可调用类型** ,当然也包括非静态成员函数。我们使用了 [ ` std::decay_t ` ] ( https://zh.cppreference.com/w/cpp/types/decay ) ,所以参数的传递其实是** 按值复制** ,而不是引用传递,这一点和大部分库的设计一致。示例如下:
483
467
484
468
``` cpp
485
- struct X {
486
- void f(const int& n)const{
487
- std::cout << &n << '\n';
469
+ struct X {
470
+ void f(const int& n) const {
471
+ std::osyncstream{ std:: cout } << &n << '\n';
488
472
}
489
473
};
490
474
491
- X x;
492
- int n = 6 ;
493
- std::cout << &n << ' \n ' ;
494
- pool.start();
495
- pool.submit(&X::f, &x, n); // 默认复制,地址不同
496
- pool.submit(&X::f, &x, std::ref(n));
497
- pool.join();
475
+ int main() {
476
+ ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池
477
+
478
+ X x;
479
+ int n = 6;
480
+ std::cout << &n << '\n';
481
+ auto t = pool.submit(&X::f, &x, n); // 默认复制,地址不同
482
+ auto t2 = pool.submit(&X::f, &x, std::ref(n));
483
+ t.wait();
484
+ t2.wait();
485
+ } // 析构自动 stop()
498
486
```
499
487
500
- > [ 运行] ( https://godbolt.org/z/vTc7M8Kov ) 测试。
488
+ > [运行](https://godbolt.org/z/vY458T44e )测试。
501
489
502
490
我们的线程池的 `submit` 成员函数在传递参数的行为上,与先前介绍的 `std::thread` 和 `std::async` 等设施基本一致。
503
491
@@ -512,11 +500,7 @@ pool.join();
512
500
**外部接口:**
513
501
514
502
- **`stop()`**:停止线程池,通知所有线程退出(不会等待所有任务执行完毕)。
515
-
516
- - ** ` join() ` ** :等待所有线程完成任务。
517
-
518
- - ** ` submit() ` ** :将任务提交到任务队列,并返回一个` std::future ` 对象用于获取任务结果。
519
-
503
+ - **`submit()`**:将任务提交到任务队列,并返回一个`std::future`对象用于获取任务结果以及确保任务执行完毕。
520
504
- **`start()`**:启动线程池,创建并启动指定数量的线程。
521
505
522
506
我们并没有提供一个功能强大的所谓的“***调度器***”,我们只是利用条件变量和互斥量,让操作系统自行调度而已,它并不具备设置任务优先级之类的调度功能。
0 commit comments