Skip to content

Commit fad99b5

Browse files
committed
fix
1 parent db5d03f commit fad99b5

File tree

4 files changed

+33
-17
lines changed

4 files changed

+33
-17
lines changed

include/ylt/coro_io/coro_io.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
276276
auto weak_lock = std::weak_ptr{lock};
277277
lock = nullptr;
278278
// wait cancel finish to make sure io object's life-time
279-
for (; weak_lock.lock();) {
279+
for (; !weak_lock.expired();) {
280280
co_await coro_io::post(
281281
[]() {
282282
},

include/ylt/coro_io/io_context_pool.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class ExecutorWrapper : public async_simple::Executor {
119119
slot, [timer](auto signalType, auto *signal) mutable {
120120
if (bool expected = false;
121121
!timer->second.compare_exchange_strong(
122-
expected, true, std::memory_order_release)) {
122+
expected, true, std::memory_order_acq_rel)) {
123123
timer->first.cancel();
124124
}
125125
})) {
@@ -130,7 +130,7 @@ class ExecutorWrapper : public async_simple::Executor {
130130
fn();
131131
});
132132
if (bool expected = false; !timer->second.compare_exchange_strong(
133-
expected, true, std::memory_order_release)) {
133+
expected, true, std::memory_order_acq_rel)) {
134134
timer->first.cancel();
135135
}
136136
}

include/ylt/metric/summary_impl.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ class summary_impl {
131131
if (piece) {
132132
if constexpr (inc_order) {
133133
for (int j = 0; j < piece->size(); ++j) {
134+
// tsan check data race here is expected. stat dont need to be very
135+
// strict. we allow old value.
134136
auto value = (*piece)[j].load(std::memory_order_relaxed);
135137
if (value) {
136138
result.emplace_back(get_ordered_index(i * piece_size + j), value);

src/coro_io/tests/test_coro_channel.cpp

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
#include <array>
44
#include <chrono>
55
#include <iostream>
6+
#include <memory>
7+
#include <system_error>
68
#include <ylt/coro_io/coro_io.hpp>
9+
10+
#include "async_simple/coro/Lazy.h"
711
using namespace std::chrono_literals;
812

913
#ifndef __clang__
@@ -34,18 +38,28 @@ async_simple::coro::Lazy<void> test_channel() {
3438
CHECK(val == 42);
3539
}
3640

41+
async_simple::coro::Lazy<std::pair<std::error_code, int>> async_receive_wrapper(
42+
std::shared_ptr<coro_io::channel<int>> ch) {
43+
co_return co_await async_receive(*ch);
44+
}
45+
46+
async_simple::coro::Lazy<void> wait_wrapper(
47+
std::shared_ptr<coro_io::period_timer> t) {
48+
co_await t->async_await();
49+
}
3750
async_simple::coro::Lazy<void> test_select_channel() {
3851
using namespace coro_io;
3952
using namespace async_simple::coro;
40-
auto ch1 = coro_io::create_channel<int>(1000);
41-
auto ch2 = coro_io::create_channel<int>(1000);
53+
auto ch1 = coro_io::create_shared_channel<int>(1000);
54+
auto ch2 = coro_io::create_shared_channel<int>(1000);
4255

43-
co_await async_send(ch1, 41);
44-
co_await async_send(ch2, 42);
56+
co_await async_send(*ch1, 41);
57+
co_await async_send(*ch2, 42);
4558

4659
std::array<int, 2> arr{41, 42};
4760

48-
auto result = co_await collectAny(async_receive(ch1), async_receive(ch2));
61+
auto result = co_await collectAny(async_receive_wrapper(ch1),
62+
async_receive_wrapper(ch2));
4963
int val = std::visit(
5064
[&val](auto& v) {
5165
return static_cast<int>(v.value().second);
@@ -54,22 +68,22 @@ async_simple::coro::Lazy<void> test_select_channel() {
5468

5569
CHECK(val == arr[result.index()]);
5670

57-
co_await async_send(ch1, 41);
58-
co_await async_send(ch2, 42);
71+
co_await async_send(*ch1, 41);
72+
co_await async_send(*ch2, 42);
5973

6074
std::vector<Lazy<std::pair<std::error_code, int>>> vec;
61-
vec.push_back(async_receive(ch1));
62-
vec.push_back(async_receive(ch2));
75+
vec.push_back(async_receive_wrapper(ch1));
76+
vec.push_back(async_receive_wrapper(ch2));
6377

6478
auto result2 = co_await collectAny(std::move(vec));
6579
val = result2.value().second;
6680
CHECK(val == arr[result2.index()]);
6781

68-
period_timer timer1(coro_io::get_global_executor());
69-
timer1.expires_after(100ms);
70-
period_timer timer2(coro_io::get_global_executor());
71-
timer2.expires_after(200ms);
72-
auto val1 = co_await collectAny(timer1.async_await(), timer2.async_await());
82+
auto timer1 = std::make_shared<period_timer>(coro_io::get_global_executor());
83+
timer1->expires_after(100ms);
84+
auto timer2 = std::make_shared<period_timer>(coro_io::get_global_executor());
85+
timer2->expires_after(200ms);
86+
auto val1 = co_await collectAny(wait_wrapper(timer1), wait_wrapper(timer2));
7387

7488
CHECK(val1.index() == 0);
7589
}

0 commit comments

Comments
 (0)