Skip to content

Commit 935c408

Browse files
authored
[coro_io][ibverbs] fix close failed to notify peer (#980)
1 parent 3b950e3 commit 935c408

File tree

14 files changed

+176
-117
lines changed

14 files changed

+176
-117
lines changed

include/ylt/coro_io/ibverbs/ib_buffer.hpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <atomic>
77
#include <cerrno>
8+
#include <cmath>
89
#include <cstddef>
910
#include <cstdint>
1011
#include <memory>
@@ -114,9 +115,12 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
114115
<< "}, now ib_buffer count: " << self->free_buffers_.size();
115116
std::size_t clear_cnt = self->free_buffers_.clear_old(1000);
116117
self->total_memory_ -= clear_cnt * self->buffer_size();
117-
ELOG_WARN << "finish ib_buffer timeout free of pool{" << self.get()
118+
ELOG_INFO << "finish ib_buffer timeout free of pool{" << self.get()
118119
<< "}, now ib_buffer cnt: " << self->free_buffers_.size()
119-
<< " mem usage:" << self->total_memory_;
120+
<< " mem usage:"
121+
<< (int64_t)(std::round(self->total_memory_ /
122+
(1.0 * 1024 * 1024)))
123+
<< " MB";
120124
if (clear_cnt != 0) {
121125
try {
122126
co_await async_simple::coro::Yield{};

include/ylt/coro_io/ibverbs/ib_socket.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ struct ib_socket_shared_state_t
206206
sr.opcode = IBV_WR_SEND;
207207
sr.send_flags |= IBV_SEND_SIGNALED;
208208
std::error_code err;
209-
if (self->has_close_) {
209+
if (self->fd_ == nullptr) {
210210
err = std::make_error_code(std::errc::operation_canceled);
211211
}
212212
// post the receive request to the RQ
@@ -240,7 +240,7 @@ struct ib_socket_shared_state_t
240240
ELOG_ERROR << std::make_error_code(std::errc{r}).message();
241241
return std::make_error_code(std::errc{r});
242242
}
243-
struct ibv_wc wc {};
243+
struct ibv_wc wc{};
244244
int ne = 0;
245245
std::vector<resume_struct> vec;
246246
callback_t tmp_callback;

include/ylt/coro_rpc/impl/coro_rpc_server.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class coro_rpc_server_base {
172172
}
173173
errc_ = listen();
174174
if (!errc_) {
175-
if constexpr (requires(typename server_config::executor_pool_t & pool) {
175+
if constexpr (requires(typename server_config::executor_pool_t &pool) {
176176
pool.run();
177177
}) {
178178
thd_ = std::thread([this] {
@@ -420,7 +420,12 @@ class coro_rpc_server_base {
420420
}
421421
#endif
422422
if (error) {
423-
ELOG_ERROR << "accept failed, error: " << error.message();
423+
if (error == asio::error::operation_aborted) {
424+
ELOG_INFO << "server was canceled:" << error.message();
425+
}
426+
else {
427+
ELOG_ERROR << "server accept failed:" << error.message();
428+
}
424429
if (error == asio::error::operation_aborted ||
425430
error == asio::error::bad_descriptor) {
426431
acceptor_close_waiter_.set_value();

include/ylt/thirdparty/async_simple/CommonMacros.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,26 @@
4343
#endif // __SANITIZE_ADDRESS__
4444
#endif // __GNUC__
4545

46+
#if __has_cpp_attribute(clang::coro_only_destroy_when_complete)
47+
#define CORO_ONLY_DESTROY_WHEN_DONE [[clang::coro_only_destroy_when_complete]]
48+
#else
4649
#if defined(__alibaba_clang__) && \
4750
__has_cpp_attribute(ACC::coro_only_destroy_when_complete)
4851
#define CORO_ONLY_DESTROY_WHEN_DONE [[ACC::coro_only_destroy_when_complete]]
4952
#else
5053
#define CORO_ONLY_DESTROY_WHEN_DONE
5154
#endif
55+
#endif
5256

57+
#if __has_cpp_attribute(clang::coro_await_elidable)
58+
#define ELIDEABLE_AFTER_AWAIT [[clang::coro_await_elidable]]
59+
#else
5360
#if defined(__alibaba_clang__) && \
5461
__has_cpp_attribute(ACC::elideable_after_await)
5562
#define ELIDEABLE_AFTER_AWAIT [[ACC::elideable_after_await]]
5663
#else
5764
#define ELIDEABLE_AFTER_AWAIT
5865
#endif
66+
#endif
5967

6068
#endif

include/ylt/thirdparty/async_simple/Signal.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#define ASYNC_SIMPLE_SIGNAL_H
1818

1919
#ifndef ASYNC_SIMPLE_USE_MODULES
20+
2021
#include <assert.h>
2122
#include <any>
2223
#include <atomic>
@@ -94,7 +95,7 @@ class Signal : public std::enable_shared_from_this<Signal> {
9495
// binding slots, then execute the slot callback functions. It will return
9596
// the signal which success triggered. If no signal success triggger, return
9697
// SignalType::none.
97-
SignalType emit(SignalType state) noexcept;
98+
SignalType emits(SignalType state) noexcept;
9899

99100
// Return now signal type.
100101
SignalType state() const noexcept {
@@ -213,7 +214,8 @@ class Slot {
213214
(signal()->state() & type)) {
214215
return false;
215216
}
216-
// if signal triggered later, we will found it by cas failed.
217+
// if signal triggered later, we will found it by atomic handler CAS
218+
// failed.
217219
auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire);
218220
if (oldHandler ==
219221
&detail::SignalSlotSharedState::HandlerManager::emittedTag) {
@@ -318,7 +320,7 @@ class Slot {
318320
[chainedSignal =
319321
chainedSignal->weak_from_this()](SignalType type) {
320322
if (auto signal = chainedSignal.lock(); signal != nullptr) {
321-
signal->emit(type);
323+
signal->emits(type);
322324
}
323325
}),
324326
std::memory_order_release);
@@ -449,7 +451,7 @@ inline detail::SignalSlotSharedState::~SignalSlotSharedState() {
449451
}
450452
}
451453

452-
inline SignalType Signal::emit(SignalType state) noexcept {
454+
inline SignalType Signal::emits(SignalType state) noexcept {
453455
if (state != SignalType::None) {
454456
SignalType vaildSignal = UpdateState(_state, state);
455457
if (vaildSignal) {

include/ylt/thirdparty/async_simple/async_simple.cppm

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,30 @@ module;
1515
#include <stdio.h>
1616
#include <cassert>
1717
#include <climits>
18+
#include <cstdint>
1819
#include <version>
1920

2021
#ifdef __linux__
2122
#include <sched.h>
2223
#endif
2324

25+
#if __has_include(<libaio.h>) && !defined(ASYNC_SIMPLE_HAS_NOT_AIO)
26+
#include <libaio.h>
27+
#endif
28+
2429
export module async_simple;
2530
import std;
2631
#define ASYNC_SIMPLE_USE_MODULES
2732
export extern "C++" {
2833
#include "util/move_only_function.h"
2934
#include "coro/Traits.h"
35+
#include "CommonMacros.h"
36+
#include "Common.h"
3037
#include "MoveWrapper.h"
3138
#include "experimental/coroutine.h"
39+
#include "async_simple/Signal.h"
3240
#include "Executor.h"
33-
#include "CommonMacros.h"
34-
#include "Common.h"
41+
#include "async_simple/coro/LazyLocalBase.h"
3542
#include "Unit.h"
3643
#include "Try.h"
3744
#include "FutureState.h"
@@ -41,6 +48,9 @@ export extern "C++" {
4148
#include "Promise.h"
4249
#include "coro/DetachedCoroutine.h"
4350
#include "coro/ViaCoroutine.h"
51+
#if defined(__clang_major__) && __clang_major__ >= 17
52+
#include "coro/PromiseAllocator.h"
53+
#endif
4454
#include "coro/Lazy.h"
4555
#include "uthread/internal/thread_impl.h"
4656
#include "uthread/Await.h"
@@ -71,7 +81,6 @@ export extern "C++" {
7181
#include "coro/Semaphore.h"
7282
// There are some bugs in clang lower versions.
7383
#if defined(__clang_major__) && __clang_major__ >= 17
74-
#include "coro/PromiseAllocator.h"
7584
#include "coro/Generator.h"
7685
#endif
7786
}

include/ylt/thirdparty/async_simple/coro/Collect.h

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ struct CollectAnyAwaiter {
167167
SignalType type, Signal*) mutable {
168168
auto count = e->downCount();
169169
if (count == size + 1) {
170-
c.resume();
170+
c.resume();
171171
}
172172
})) { // has canceled
173173
return false;
@@ -187,13 +187,13 @@ struct CollectAnyAwaiter {
187187
auto count = e->downCount();
188188
// n+1: n coro + 1 cancel handler
189189
if (count == size + 1) {
190-
_result = std::make_unique<ResultType>();
191-
_result->_idx = i;
192-
_result->_value = std::move(result);
193-
if (auto ptr = local->getSlot(); ptr) {
194-
ptr->signal()->emit(_SignalType);
195-
}
196-
c.resume();
190+
_result = std::make_unique<ResultType>();
191+
_result->_idx = i;
192+
_result->_value = std::move(result);
193+
if (auto ptr = local->getSlot(); ptr) {
194+
ptr->signal()->emits(_SignalType);
195+
}
196+
c.resume();
197197
}
198198
});
199199
} // end for
@@ -269,7 +269,7 @@ struct CollectAnyVariadicAwaiter {
269269
Signal*) mutable {
270270
auto count = e->downCount();
271271
if (count == std::tuple_size<InputType>() + 1) {
272-
c.resume();
272+
c.resume();
273273
}
274274
})) { // has canceled
275275
return false;
@@ -291,12 +291,12 @@ struct CollectAnyVariadicAwaiter {
291291
auto count = e->downCount();
292292
// n+1: n coro + 1 cancel handler
293293
if (count == std::tuple_size<InputType>() + 1) {
294-
_result = std::make_unique<ResultType>(
295-
std::in_place_index_t<index>(), std::move(res));
296-
if (auto ptr = local->getSlot(); ptr) {
297-
ptr->signal()->emit(_SignalType);
298-
}
299-
c.resume();
294+
_result = std::make_unique<ResultType>(
295+
std::in_place_index_t<index>(), std::move(res));
296+
if (auto ptr = local->getSlot(); ptr) {
297+
ptr->signal()->emits(_SignalType);
298+
}
299+
c.resume();
300300
}
301301
});
302302
}(),
@@ -392,15 +392,14 @@ struct CollectAllAwaiter {
392392
_event.setAwaitingCoro(continuation);
393393
auto size = _input.size();
394394
for (size_t i = 0; i < size; ++i) {
395-
auto& exec = _input[i]._coro.promise()._executor;
396-
if (exec == nullptr) {
397-
exec = executor;
398-
}
399-
std::unique_ptr<LazyLocalBase> local;
400-
local = std::make_unique<LazyLocalBase>(_signal.get());
401-
_input[i]._coro.promise()._lazy_local = local.get();
402-
auto&& func =
403-
[this, i, local = std::move(local)]() mutable {
395+
auto& exec = _input[i]._coro.promise()._executor;
396+
if (exec == nullptr) {
397+
exec = executor;
398+
}
399+
std::unique_ptr<LazyLocalBase> local;
400+
local = std::make_unique<LazyLocalBase>(_signal.get());
401+
_input[i]._coro.promise()._lazy_local = local.get();
402+
auto&& func = [this, i, local = std::move(local)]() mutable {
404403
_input[i].start([this, i, local = std::move(local)](
405404
Try<ValueType>&& result) {
406405
_output[i] = std::move(result);
@@ -410,21 +409,21 @@ struct CollectAllAwaiter {
410409
auto signalType = _SignalType;
411410
auto awaitingCoro = _event.down(oldCount, 1);
412411
if (oldCount == size) {
413-
signal->emit(signalType);
412+
signal->emits(signalType);
414413
}
415414
if (awaitingCoro) {
416415
awaitingCoro.resume();
417416
}
418417
});
419-
};
420-
if (Para == true && _input.size() > 1) {
421-
if (exec != nullptr)
422-
AS_LIKELY {
423-
exec->schedule_move_only(std::move(func));
424-
continue;
425-
}
426-
}
427-
func();
418+
};
419+
if (Para == true && _input.size() > 1) {
420+
if (exec != nullptr)
421+
AS_LIKELY {
422+
exec->schedule_move_only(std::move(func));
423+
continue;
424+
}
425+
}
426+
func();
428427
}
429428
}
430429
inline auto await_resume() { return std::move(_output); }
@@ -583,7 +582,7 @@ struct CollectAllVariadicAwaiter {
583582
auto signalType = _SignalType;
584583
auto awaitingCoro = _event.down(oldCount, 1);
585584
if (oldCount == sizeof...(Ts)) {
586-
signal->emit(signalType);
585+
signal->emits(signalType);
587586
}
588587
if (awaitingCoro) {
589588
awaitingCoro.resume();

include/ylt/thirdparty/async_simple/coro/CountEvent.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ namespace detail {
3434
// The last 'down' will resume the awaiting coroutine on this event.
3535
class CountEvent {
3636
public:
37-
CountEvent(size_t count) : _count(count) {}
38-
CountEvent(const CountEvent&) = delete;
39-
CountEvent(CountEvent&& other)
40-
: _count(other._count.exchange(0, std::memory_order_relaxed)),
41-
_awaitingCoro(std::exchange(other._awaitingCoro, nullptr)) {}
37+
CountEvent(size_t count) : _count(count) {}
38+
CountEvent(const CountEvent&) = delete;
39+
CountEvent(CountEvent&& other)
40+
: _count(other._count.exchange(0, std::memory_order_relaxed)),
41+
_awaitingCoro(std::exchange(other._awaitingCoro, nullptr)) {}
4242

43-
[[nodiscard]] CoroHandle<> down(size_t n = 1) {
44-
std::size_t oldCount;
45-
return down(oldCount, n);
43+
[[nodiscard]] CoroHandle<> down(size_t n = 1) {
44+
std::size_t oldCount;
45+
return down(oldCount, n);
4646
}
4747
[[nodiscard]] CoroHandle<> down(size_t& oldCount, std::size_t n) {
4848
// read acquire and write release, _awaitingCoro store can not be

0 commit comments

Comments
 (0)