Skip to content

Commit 1bd20f7

Browse files
[coro_io][rdma] enhance rdma implement (#968)
* speed up write * remove tmp * add max_buffer * fix * speed up add buffer * change buffer to 2m * fix crash * fix * fix * test inline data * support inline send * fix * fix * rmda support notify peer when close * fix soc * fix crash * fix test * fix format * fix code * add doc * fix rdma * fix * fix for lifetime --------- Co-authored-by: qicosmos <qicosmos@linux.alibaba.com>
1 parent 3a2bd9b commit 1bd20f7

File tree

23 files changed

+763
-318
lines changed

23 files changed

+763
-318
lines changed

cmake/find_ibverbs.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
check_library_exists(ibverbs ibv_create_qp "" HAVE_IBVERBS_IBV)
2-
if(HAVE_IBVERBS_IBV)
1+
check_library_exists(ibverbs ibv_create_qp "" YLT_HAVE_IBVERBS)
2+
if(YLT_HAVE_IBVERBS)
33
set(YLT_ENABLE_IBV ON)
44
set(IBVERBS_LIBRARY ibverbs)
55
message(STATUS "have libibverbs")

include/ylt/coro_io/ibverbs/ib_buffer.hpp

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ struct ib_buffer_t {
8080
~ib_buffer_t();
8181
};
8282

83+
std::shared_ptr<ib_device_t> g_ib_device(ib_config_t conf = {});
8384
class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
8485
private:
8586
friend struct ib_buffer_t;
8687
struct private_construct_token {};
8788
static async_simple::coro::Lazy<void> collect_idle_timeout_client(
8889
std::weak_ptr<ib_buffer_pool_t> self_weak,
89-
std::chrono::milliseconds sleep_time, std::size_t clear_cnt) {
90+
std::chrono::milliseconds sleep_time) {
9091
std::shared_ptr<ib_buffer_pool_t> self = self_weak.lock();
9192
if (self == nullptr) {
9293
co_return;
@@ -101,7 +102,7 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
101102
while (true) {
102103
ELOG_TRACE << "start collect timeout buffer of pool{" << self.get()
103104
<< "}, now client count: " << self->free_buffers_.size();
104-
std::size_t is_all_cleared = self->free_buffers_.clear_old(clear_cnt);
105+
std::size_t is_all_cleared = self->free_buffers_.clear_old(1000);
105106
ELOG_TRACE << "finish collect timeout buffer of pool{" << self.get()
106107
<< "}, now client cnt: " << self->free_buffers_.size();
107108
if (is_all_cleared != 0) [[unlikely]] {
@@ -134,8 +135,7 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
134135
<< "}";
135136
collect_idle_timeout_client(this->weak_from_this(),
136137
(std::max)(pool_config_.idle_timeout,
137-
std::chrono::milliseconds{50}),
138-
pool_config_.idle_queue_per_max_clear_count)
138+
std::chrono::milliseconds{50}))
139139
.directlyStart(
140140
[](auto&&) {
141141
},
@@ -210,15 +210,15 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
210210
return buffer;
211211
}
212212
struct config_t {
213-
size_t buffer_size = 4 * 1024 * 1024; // 4MB
213+
size_t buffer_size = 2 * 1024 * 1024; // 2MB
214214
uint64_t max_memory_usage = 4ull * 1024 * 1024 * 1024; // 4GB
215-
size_t idle_queue_per_max_clear_count = 1000;
216215
std::chrono::milliseconds idle_timeout = std::chrono::milliseconds{5000};
217216
};
218217
ib_buffer_pool_t(private_construct_token t,
219218
std::shared_ptr<ib_device_t> device,
220219
const config_t& pool_config)
221-
: device_(std::move(device)), pool_config_(std::move(pool_config)) {}
220+
: device_(device ? std::move(device) : coro_io::g_ib_device()),
221+
pool_config_(std::move(pool_config)) {}
222222
static std::shared_ptr<ib_buffer_pool_t> create(
223223
std::shared_ptr<ib_device_t> device, const config_t& pool_config) {
224224
return std::make_shared<ib_buffer_pool_t>(private_construct_token{},
@@ -245,17 +245,29 @@ inline ib_buffer_t ib_buffer_t::regist(ib_buffer_pool_t& pool,
245245
int ib_flags) {
246246
auto mr = ibv_reg_mr(dev->pd(), ptr, size, ib_flags);
247247
if (mr != nullptr) [[unlikely]] {
248-
ELOG_INFO << "ibv_reg_mr regist: " << mr << " with pd:" << dev->pd();
248+
ELOG_DEBUG << "ibv_reg_mr regist: " << mr << " with pd:" << dev->pd();
249249
return ib_buffer_t{mr, std::move(dev), pool.weak_from_this()};
250250
}
251251
else {
252252
throw std::make_error_code(std::errc{errno});
253253
}
254254
};
255+
256+
inline std::shared_ptr<ib_device_t> g_ib_device(ib_config_t conf) {
257+
static auto dev = std::make_shared<ib_device_t>(conf);
258+
return dev;
259+
}
260+
261+
inline std::shared_ptr<ib_buffer_pool_t> g_ib_buffer_pool(
262+
const ib_buffer_pool_t::config_t& pool_config = {}) {
263+
static auto pool = ib_buffer_pool_t::create(g_ib_device(), pool_config);
264+
return pool;
265+
}
266+
255267
inline ib_buffer_t::ib_buffer_t(ibv_mr* mr, std::shared_ptr<ib_device_t> dev,
256268
std::weak_ptr<ib_buffer_pool_t> owner_pool)
257269
: mr_(mr),
258-
dev_(std::move(dev)),
270+
dev_(dev ? std::move(dev) : coro_io::g_ib_device()),
259271
owner_pool_(std::move(owner_pool)),
260272
has_memory_ownership_(false) {
261273
if (auto ptr = owner_pool_.lock(); ptr) {
@@ -299,14 +311,4 @@ inline void ib_buffer_t::change_owner(
299311
}
300312
}
301313

302-
inline std::shared_ptr<ib_device_t> g_ib_device(ib_config_t conf = {}) {
303-
static auto dev = std::make_shared<ib_device_t>(conf);
304-
return dev;
305-
}
306-
307-
inline std::shared_ptr<ib_buffer_pool_t> g_ib_buffer_pool(
308-
const ib_buffer_pool_t::config_t& pool_config = {}) {
309-
static auto pool = ib_buffer_pool_t::create(g_ib_device(), pool_config);
310-
return pool;
311-
}
312314
} // namespace coro_io

include/ylt/coro_io/ibverbs/ib_device.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,11 @@ class ib_device_t {
211211
asio::ip::address gid_address() const noexcept { return gid_address_; }
212212
const ibv_port_attr& attr() const noexcept { return attr_; }
213213

214+
bool is_support_inline_data() const noexcept { return support_inline_data_; }
215+
void set_support_inline_data(bool flag) noexcept {
216+
support_inline_data_ = flag;
217+
}
218+
214219
private:
215220
int ipv6_addr_v4mapped(const struct in6_addr* a) {
216221
return ((a->s6_addr32[0] | a->s6_addr32[1]) |
@@ -243,6 +248,7 @@ class ib_device_t {
243248
std::string name_;
244249
std::unique_ptr<ibv_pd, ib_deleter> pd_;
245250
std::unique_ptr<ibv_context, ib_deleter> ctx_;
251+
std::atomic<bool> support_inline_data_ = true;
246252
ibv_port_attr attr_;
247253
ibv_gid gid_;
248254
asio::ip::address gid_address_;

include/ylt/coro_io/ibverbs/ib_io.hpp

Lines changed: 93 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <exception>
99
#include <future>
1010
#include <memory>
11+
#include <optional>
1112
#include <span>
1213
#include <string_view>
1314
#include <system_error>
@@ -19,6 +20,7 @@
1920
#include "async_simple/Promise.h"
2021
#include "async_simple/Signal.h"
2122
#include "async_simple/coro/Collect.h"
23+
#include "async_simple/coro/FutureAwaiter.h"
2224
#include "async_simple/coro/Lazy.h"
2325
#include "ib_buffer.hpp"
2426
#include "ib_socket.hpp"
@@ -30,13 +32,12 @@
3032
#include "ylt/struct_pack/reflection.hpp"
3133
namespace coro_io {
3234

33-
// unlike tcp socket, client won't connnected util server first read ib_socket.
3435
inline async_simple::coro::Lazy<std::error_code> async_accept(
3536
asio::ip::tcp::acceptor& acceptor, coro_io::ib_socket_t& ib_socket) {
36-
auto soc = std::make_unique<asio::ip::tcp::socket>(ib_socket.get_executor());
37+
asio::ip::tcp::socket soc(ib_socket.get_executor());
3738
auto ec = co_await async_io<std::error_code>(
3839
[&](auto cb) {
39-
acceptor.async_accept(*soc, cb);
40+
acceptor.async_accept(soc, cb);
4041
},
4142
acceptor);
4243

@@ -51,24 +52,13 @@ inline async_simple::coro::Lazy<std::error_code> async_accept(
5152
inline async_simple::coro::Lazy<std::error_code> async_connect(
5253
coro_io::ib_socket_t& ib_socket, const std::string& host,
5354
const std::string& port) {
54-
asio::ip::tcp::socket soc{ib_socket.get_executor()};
55-
auto ec =
56-
co_await async_connect(ib_socket.get_coro_executor(), soc, host, port);
57-
if (ec) [[unlikely]] {
58-
co_return std::move(ec);
59-
}
60-
co_return co_await ib_socket.connect(soc);
55+
return ib_socket.connect(host, port);
6156
}
6257

6358
template <typename EndPointSeq>
6459
inline async_simple::coro::Lazy<std::error_code> async_connect(
6560
coro_io::ib_socket_t& ib_socket, const EndPointSeq& endpoint) noexcept {
66-
asio::ip::tcp::socket soc{ib_socket.get_executor()};
67-
auto ec = co_await async_connect(soc, endpoint);
68-
if (ec) [[unlikely]] {
69-
co_return std::move(ec);
70-
}
71-
co_return co_await ib_socket.connect(soc);
61+
return ib_socket.connect(endpoint);
7262
}
7363

7464
namespace detail {
@@ -138,32 +128,64 @@ async_simple::coro::
138128
async_simple::coro::
139129
Lazy<std::pair<std::error_code, std::size_t>> inline async_send_impl(
140130
coro_io::ib_socket_t& ib_socket, std::span<ibv_sge> sge_list,
141-
std::size_t io_size) {
142-
std::vector<ib_buffer_t> tmp_buffers;
131+
std::size_t io_size,
132+
std::optional<
133+
async_simple::Future<std::pair<std::error_code, std::size_t>>>&
134+
prev_op) {
135+
if (io_size == 0) [[unlikely]] {
136+
co_return std::pair{std::error_code{}, 0};
137+
}
138+
ib_buffer_t buffer;
143139
ibv_sge socket_buffer;
144-
socket_buffer = ib_socket.get_send_buffer();
145-
if (socket_buffer.length == 0) [[unlikely]] {
146-
co_return std::pair{std::make_error_code(std::errc::no_buffer_space),
147-
std::size_t{0}};
140+
std::span<ibv_sge> list;
141+
if (ib_socket.get_config().cap.max_inline_data >= io_size) {
142+
if (sge_list.size() <= ib_socket.get_config().cap.max_send_sge) {
143+
list = sge_list;
144+
}
145+
}
146+
else {
147+
buffer = ib_socket.buffer_pool()->get_buffer();
148+
if (!buffer || buffer->length < io_size) [[unlikely]] {
149+
co_return std::pair{std::make_error_code(std::errc::no_buffer_space),
150+
std::size_t{0}};
151+
}
152+
socket_buffer = buffer.subview();
153+
auto len = copy(sge_list, socket_buffer);
154+
assert(len == io_size);
155+
socket_buffer.length = io_size;
156+
list = {&socket_buffer, 1};
148157
}
149158

150-
auto len = copy(sge_list, socket_buffer);
151-
assert(len == io_size);
152-
153-
std::span<ibv_sge> io_buffer;
154-
io_buffer = {&socket_buffer, 1};
155-
socket_buffer.length = io_size;
156-
auto result =
157-
co_await coro_io::async_io<std::pair<std::error_code, std::size_t>>(
158-
[&](auto&& cb) {
159-
ib_socket.post_send(io_buffer, std::move(cb));
160-
},
161-
ib_socket);
162-
if (result.first) [[unlikely]] {
163-
co_return std::pair{result.first, result.second};
159+
async_simple::Promise<std::pair<std::error_code, std::size_t>> promise;
160+
std::pair<std::error_code, std::size_t> result{};
161+
if (prev_op) {
162+
result = co_await std::move(*prev_op);
163+
}
164+
prev_op = promise.getFuture();
165+
auto slot = co_await async_simple::coro::CurrentSlot{};
166+
auto work = coro_io::async_io<std::pair<std::error_code, std::size_t>>(
167+
[&ib_socket, list](auto&& cb) mutable {
168+
ib_socket.post_send(list, std::move(cb));
169+
},
170+
ib_socket);
171+
auto cb = [p = std::move(promise), io_size = io_size,
172+
buffer = std::move(buffer)](auto&& result) mutable {
173+
if (buffer) {
174+
std::move(buffer).collect();
175+
}
176+
if (!result.hasError()) {
177+
result.value().second = io_size;
178+
}
179+
p.setValue(std::move(result));
180+
};
181+
if (slot) {
182+
std::move(work).setLazyLocal(slot->signal()).start(std::move(cb));
183+
}
184+
else {
185+
std::move(work).start(std::move(cb));
164186
}
165187

166-
co_return std::pair{result.first, io_size};
188+
co_return std::pair{result.first, result.second};
167189
}
168190

169191
template <typename T>
@@ -258,6 +280,8 @@ async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
258280

259281
std::size_t block_size;
260282
uint32_t now_split_size = 0;
283+
std::optional<async_simple::Future<std::pair<std::error_code, std::size_t>>>
284+
future;
261285
for (auto& sge : sge_span) {
262286
for (std::size_t i = 0; i < sge.length; i += block_size) {
263287
block_size =
@@ -284,47 +308,45 @@ async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
284308
}
285309
else {
286310
std::tie(ec, len) = co_await async_send_impl(
287-
ib_socket, split_sge_block, now_split_size);
311+
ib_socket, split_sge_block, now_split_size, future);
288312
}
289-
290313
io_completed_size += len;
291314
ELOG_TRACE << "has completed size:" << io_completed_size;
292315
if (ec) {
293316
co_return std::pair{ec, io_completed_size};
294317
}
295318

296-
if (len == 0 || len > now_split_size) [[unlikely]] {
297-
ELOG_ERROR << "read size error, it shouldn't be:" << len;
298-
co_return std::pair{std::make_error_code(std::errc::io_error),
299-
io_completed_size};
300-
}
301-
302-
if (read_some) {
303-
co_return std::pair{ec, io_completed_size};
304-
}
305-
306-
if (len < now_split_size) [[unlikely]] {
307-
reset_buffer(split_sge_block, len);
319+
if constexpr (io == ib_socket_t::io_type::recv) {
320+
if (read_some) {
321+
co_return std::pair{ec, io_completed_size};
322+
}
323+
if (len < now_split_size) [[unlikely]] {
324+
reset_buffer(split_sge_block, len);
325+
}
326+
else {
327+
split_sge_block.clear();
328+
}
329+
now_split_size -= len;
308330
}
309331
else {
310332
split_sge_block.clear();
333+
now_split_size = 0;
311334
}
312-
now_split_size -= len;
313335
}
314336
}
315337
}
316338

317339
std::error_code ec;
318340
std::size_t len = 0;
319341
while (now_split_size > 0) {
320-
reset_buffer(split_sge_block, len);
321342
if constexpr (io == ib_socket_t::io_type::recv) {
343+
reset_buffer(split_sge_block, len);
322344
std::tie(ec, len) =
323345
co_await async_recv_impl(ib_socket, split_sge_block, now_split_size);
324346
}
325347
else {
326-
std::tie(ec, len) =
327-
co_await async_send_impl(ib_socket, split_sge_block, now_split_size);
348+
std::tie(ec, len) = co_await async_send_impl(ib_socket, split_sge_block,
349+
now_split_size, future);
328350
}
329351

330352
io_completed_size += len;
@@ -333,17 +355,24 @@ async_io_split(coro_io::ib_socket_t& ib_socket, Buffer&& raw_buffer,
333355
co_return std::pair{ec, io_completed_size};
334356
}
335357

336-
if (len == 0 || len > now_split_size) [[unlikely]] {
337-
ELOG_ERROR << ((io == ib_socket_t::io_type::recv) ? "recv " : "send")
338-
<< "size error, it shouldn't be:" << len;
339-
co_return std::pair{std::make_error_code(std::errc::io_error),
340-
io_completed_size};
358+
if constexpr (io == ib_socket_t::io_type::recv) {
359+
if (read_some) {
360+
co_return std::pair{ec, io_completed_size};
361+
}
362+
now_split_size -= len;
341363
}
342-
343-
if (read_some) {
344-
co_return std::pair{ec, io_completed_size};
364+
else {
365+
now_split_size = 0;
366+
}
367+
}
368+
if constexpr (io == ib_socket_t::io_type::send) {
369+
if (future) {
370+
std::tie(ec, len) = co_await std::move(*future);
371+
io_completed_size += len;
372+
if (ec) [[unlikely]] {
373+
co_return std::pair{ec, io_completed_size};
374+
}
345375
}
346-
now_split_size -= len;
347376
}
348377
co_return std::pair{ec, io_completed_size};
349378
}

0 commit comments

Comments
 (0)