Skip to content

Commit e9fefec

Browse files
authored
[ibverbs] support multiple unfinished send requests in ib_socket (#982)
1 parent 935c408 commit e9fefec

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

include/ylt/coro_io/ibverbs/ib_io.hpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,6 @@ async_simple::coro::
170170
is_canceled = true;
171171
}
172172
if (is_canceled) [[unlikely]] {
173-
co_await coro_io::dispatch(ib_socket.get_executor());
174173
ib_socket.close();
175174
co_return std::pair{std::make_error_code(std::errc::operation_canceled),
176175
std::size_t{0}};
@@ -256,20 +255,16 @@ inline void reset_buffer(std::vector<ibv_sge>& buffer, std::size_t read_size) {
256255

257256
template <ib_socket_t::io_type io, typename Buffer>
258257
async_simple::coro::Lazy<std::pair<std::error_code, std::size_t>>
259-
async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
260-
bool read_some = false) {
258+
async_io_split_impl(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
259+
bool read_some) {
261260
if (!ib_socket.is_open()) {
262-
co_await coro_io::dispatch(
263-
ib_socket.get_coro_executor()->get_asio_executor());
264261
co_return std::pair{std::make_error_code(std::errc::not_connected),
265262
std::size_t{0}};
266263
}
267264
std::vector<ibv_sge> sge_list;
268265
make_sge(sge_list, raw_buffer);
269266
std::span<ibv_sge> sge_span = sge_list;
270267
if (sge_span.size() == 0) [[unlikely]] {
271-
co_await coro_io::dispatch(
272-
ib_socket.get_coro_executor()->get_asio_executor());
273268
co_return std::pair{std::error_code{}, std::size_t{0}};
274269
}
275270

@@ -278,8 +273,6 @@ async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
278273
uint32_t max_size = ib_socket.get_buffer_size();
279274
std::size_t io_completed_size = consume_buffer(ib_socket, sge_span);
280275
if (sge_span.empty()) {
281-
co_await coro_io::dispatch(
282-
ib_socket.get_coro_executor()->get_asio_executor());
283276
co_return std::pair{std::error_code{}, io_completed_size};
284277
}
285278

@@ -379,7 +372,6 @@ async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
379372
is_canceled = true;
380373
}
381374
if (is_canceled) [[unlikely]] {
382-
co_await coro_io::dispatch(ib_socket.get_executor());
383375
ib_socket.close();
384376
co_return std::pair{std::make_error_code(std::errc::operation_canceled),
385377
std::size_t{0}};
@@ -393,6 +385,19 @@ async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
393385
co_return std::pair{ec, io_completed_size};
394386
}
395387

388+
// Entry point for split I/O on an ib_socket.
// Forwards ib_socket, raw_buffer and read_some to async_io_split_impl<io>
// and co_returns the (error_code, size) pair that the impl produced, unchanged.
// read_some defaults to false and is passed through as-is.
template <ib_socket_t::io_type io, typename Buffer>
389+
async_simple::coro::Lazy<std::pair<std::error_code, std::size_t>>
390+
async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
391+
bool read_some = false) {
392+
auto ret = co_await async_io_split_impl<io>(
393+
ib_socket, std::forward<Buffer>(raw_buffer), read_some);
394+
// The impl has several co_return paths; rather than dispatching on each of
// them, the executor check is done once here, after the impl has finished.
if (!ib_socket.get_executor().running_in_this_thread()) [[unlikely]] {
395+
// Not currently on the socket executor's thread: dispatch onto that
// executor (the io_thread) before handing the result back to the caller.
396+
co_await dispatch(ib_socket.get_executor());
397+
}
398+
co_return ret;
399+
}
400+
396401
} // namespace detail
397402

398403
template <typename buffer_t>

include/ylt/coro_io/ibverbs/ib_socket.hpp

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ struct ib_socket_shared_state_t
9191
std::queue<std::pair<std::error_code, std::size_t>>
9292
recv_result; // TODO: optimize with a circular buffer
9393
callback_t recv_cb_;
94-
callback_t send_cb_;
94+
std::deque<callback_t> send_cb_;
9595
ib_buffer_t recv_buf_;
9696
std::shared_ptr<ib_buffer_pool_t> ib_buffer_pool_;
9797
std::unique_ptr<asio::posix::stream_descriptor> fd_;
@@ -182,7 +182,7 @@ struct ib_socket_shared_state_t
182182
struct resume_struct {
183183
std::error_code ec;
184184
std::size_t len;
185-
callback_t* cb;
185+
uint64_t wr_id;
186186
};
187187

188188
void post_send_impl(std::span<ibv_sge> sge, callback_t&& handler) {
@@ -195,12 +195,11 @@ struct ib_socket_shared_state_t
195195
sge = std::move(sge_copy)]() mutable {
196196
ibv_send_wr sr{};
197197
ibv_send_wr* bad_wr = nullptr;
198-
self->send_cb_ = std::move(handler);
199198
if (sge.size() && sge[0].lkey == 0) {
200199
sr.send_flags = IBV_SEND_INLINE;
201200
}
202201
sr.next = NULL;
203-
sr.wr_id = (uintptr_t)&self->send_cb_;
202+
sr.wr_id = 1;
204203
sr.sg_list = sge.data();
205204
sr.num_sge = sge.size();
206205
sr.opcode = IBV_WR_SEND;
@@ -211,13 +210,16 @@ struct ib_socket_shared_state_t
211210
}
212211
// post the send request to the SQ
213212
else if (auto ec = ibv_post_send(self->qp_.get(), &sr, &bad_wr); ec) {
214-
err = std::make_error_code(std::errc{std::abs(ec)});
213+
err = std::make_error_code(std::errc{ec});
215214
ELOG_ERROR << "ibv post send failed: " << err.message();
216215
}
217216
if (err) {
218217
ib_socket_shared_state_t::resume(std::pair{err, std::size_t{0}},
219218
handler);
220219
}
220+
else {
221+
self->send_cb_.push_back(std::move(handler));
222+
}
221223
},
222224
executor_->get_asio_executor())
223225
.start([](auto&& res) {
@@ -243,7 +245,7 @@ struct ib_socket_shared_state_t
243245
struct ibv_wc wc{};
244246
int ne = 0;
245247
std::vector<resume_struct> vec;
246-
callback_t tmp_callback;
248+
callback_t tmp_recv_callback;
247249
while ((ne = ibv_poll_cq(cq_.get(), 1, &wc)) != 0) {
248250
if (ne < 0) {
249251
ELOG_ERROR << "poll CQ failed:" << ne;
@@ -275,8 +277,8 @@ struct ib_socket_shared_state_t
275277
close();
276278
}
277279
}
278-
tmp_callback = std::move(recv_cb_);
279-
vec.push_back({ec, wc.byte_len, &tmp_callback});
280+
tmp_recv_callback = std::move(recv_cb_);
281+
vec.push_back({ec, wc.byte_len, 0});
280282
}
281283
else {
282284
recv_result.push(std::pair{ec, (std::size_t)wc.byte_len});
@@ -289,15 +291,21 @@ struct ib_socket_shared_state_t
289291
}
290292
}
291293
else {
292-
vec.push_back({ec, wc.byte_len, (callback_t*)wc.wr_id});
294+
vec.push_back({ec, wc.byte_len, wc.wr_id});
293295
}
294296
if (cq_ == nullptr) {
295297
break;
296298
}
297299
}
298300
}
299301
for (auto& result : vec) {
300-
resume(std::pair{result.ec, result.len}, *result.cb);
302+
if (result.wr_id == 0) {
303+
resume(std::pair{result.ec, result.len}, tmp_recv_callback);
304+
}
305+
else {
306+
resume(std::pair{result.ec, result.len}, send_cb_.front());
307+
send_cb_.pop_front();
308+
}
301309
}
302310
return ec;
303311
}
@@ -333,7 +341,7 @@ struct ibverbs_config {
333341
uint32_t cq_size = 128;
334342
uint32_t recv_buffer_cnt = 4;
335343
ibv_qp_type qp_type = IBV_QPT_RC;
336-
ibv_qp_cap cap = {.max_send_wr = 2,
344+
ibv_qp_cap cap = {.max_send_wr = 32,
337345
.max_recv_wr = 32,
338346
.max_send_sge = 3,
339347
.max_recv_sge = 1,
@@ -389,7 +397,8 @@ class ib_socket_t {
389397
~ib_socket_t() { close(); }
390398

391399
bool is_open() const noexcept {
392-
return state_->fd_ != nullptr && state_->fd_->is_open();
400+
return state_->fd_ != nullptr && state_->fd_->is_open() &&
401+
!state_->has_close_;
393402
}
394403
std::shared_ptr<ib_buffer_pool_t> buffer_pool() const noexcept {
395404
return state_->ib_buffer_pool_;
@@ -799,7 +808,7 @@ class ib_socket_t {
799808
void init_fd() {
800809
int r = ibv_req_notify_cq(state_->cq_.get(), 0);
801810
if (r) {
802-
auto err_code = std::make_error_code(std::errc{errno});
811+
auto err_code = std::make_error_code(std::errc{r});
803812
ELOG_ERROR << "ibv_req_notify_cq failed: " << err_code.message();
804813
throw std::system_error(err_code);
805814
}
@@ -823,12 +832,15 @@ class ib_socket_t {
823832
}
824833

825834
if (ec) {
826-
auto send_cb = std::move(self->send_cb_);
827835
self->close();
828836
ib_socket_shared_state_t::resume(std::pair{ec, std::size_t{0}},
829837
self->recv_cb_);
830-
ib_socket_shared_state_t::resume(std::pair{ec, std::size_t{0}},
831-
send_cb);
838+
while (!self->send_cb_.empty()) {
839+
ib_socket_shared_state_t::resume(std::pair{ec, std::size_t{0}},
840+
self->send_cb_.front());
841+
self->send_cb_.pop_front();
842+
}
843+
832844
break;
833845
}
834846
}

src/coro_io/tests/ibverbs/CMakeLists.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@ if(YLT_HAVE_IBVERBS)
44

55
add_executable(ibverbs_test
66
test_device.cpp
7-
ib_socket_pressure_test.cpp
87
test_ib_socket.cpp
98
main.cpp
109
)
10+
11+
add_executable(ibverbs_pressure_test
12+
ib_socket_pressure_test.cpp
13+
main.cpp)
1114

1215
target_link_libraries(ibverbs_test -libverbs)
16+
target_link_libraries(ibverbs_pressure_test -libverbs)
1317
add_test(NAME ibverbs_test COMMAND ibverbs_test)
18+
add_test(NAME ibverbs_pressure_test COMMAND ibverbs_pressure_test)
1419
endif()

0 commit comments

Comments
 (0)