Skip to content

Commit 29eb2a1

Browse files
authored
fix rdma buffer pool (#958)
* fix buffer pool * fix max send sge
1 parent e3a5f98 commit 29eb2a1

File tree

5 files changed

+34
-15
lines changed

5 files changed

+34
-15
lines changed

include/ylt/coro_io/ibverbs/ib_buffer.hpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,15 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
145145
void collect_free_inner(ib_buffer_t buffer) {
146146
if (buffer) {
147147
ELOG_TRACE << "collecting:" << (void*)buffer->addr;
148-
if (free_buffers_.size() < pool_config_.max_buffer_count) {
148+
if (free_buffers_.size() * pool_config_.buffer_size <
149+
pool_config_.max_memory_usage) {
149150
ELOG_TRACE << "collect free buffer{data:" << buffer->addr << ",len"
150151
<< buffer->length << "} enqueue";
151152
enqueue(std::move(buffer));
152153
}
153154
else {
154155
ELOG_TRACE << "out of max connection limit <<"
155-
<< pool_config_.max_buffer_count
156+
<< pool_config_.max_memory_usage
156157
<< "buffer{data:" << buffer->addr << ",len" << buffer->length
157158
<< "} wont be collect";
158159
}
@@ -164,6 +165,9 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
164165
};
165166

166167
public:
168+
std::size_t max_buffer_size() const noexcept {
169+
return this->pool_config_.buffer_size;
170+
}
167171
void collect_free(ib_buffer_t buffer) {
168172
if (buffer) {
169173
buffer.change_owner(weak_from_this());
@@ -183,7 +187,7 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
183187
if (pool_config_.max_memory_usage <
184188
pool_config_.buffer_size +
185189
total_memory_.load(std::memory_order_acquire)) [[unlikely]] {
186-
ELOG_TRACE << "Memory out of pool limit";
190+
ELOG_WARN << "Memory out of pool limit";
187191
// return std::move(buffer);
188192
}
189193
auto ptr = malloc(pool_config_.buffer_size);
@@ -212,9 +216,8 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
212216
return buffer;
213217
}
214218
struct config_t {
215-
size_t buffer_size = 8 * 1024 * 1024; // 16MB
216-
size_t max_buffer_count = 64;
217-
size_t max_memory_usage = 512 * 1024 * 1024; // 512MB
219+
size_t buffer_size = 4 * 1024 * 1024; // 4MB
220+
uint64_t max_memory_usage = 4ull * 1024 * 1024 * 1024; // 4GB
218221
size_t idle_queue_per_max_clear_count = 1000;
219222
std::chrono::milliseconds idle_timeout = std::chrono::milliseconds{5000};
220223
};

include/ylt/coro_io/ibverbs/ib_io.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ async_simple::coro::
143143
std::vector<ib_buffer_t> tmp_buffers;
144144
ibv_sge socket_buffer;
145145
socket_buffer = ib_socket.get_send_buffer();
146+
if (socket_buffer.length == 0) [[unlikely]] {
147+
co_return std::pair{std::make_error_code(std::errc::no_buffer_space),
148+
std::size_t{0}};
149+
}
146150

147151
auto len = copy(sge_list, socket_buffer);
148152
assert(len == io_size);

include/ylt/coro_io/ibverbs/ib_socket.hpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ struct buffer_queue {
5353
std::error_code post_recv_real(ibv_sge buffer, shared_state_t* state);
5454
std::error_code push(ib_buffer_t buf, shared_state_t* state) {
5555
ELOG_TRACE << "push. now buffer size:" << size();
56+
if (buf->length == 0) {
57+
ELOG_INFO << "Out of ib_buffer_pool limit." << size();
58+
return std::make_error_code(std::errc::no_buffer_space);
59+
}
5660
auto ec = post_recv_real(buf.subview(), state);
5761
if (!ec) {
5862
front = (front + 1) % queue.size();
@@ -241,14 +245,12 @@ inline std::error_code buffer_queue::post_recv_real(ibv_sge buffer,
241245
#ifdef YLT_ENABLE_IBV
242246
struct ibverbs_config {
243247
uint32_t cq_size = 1024;
244-
uint32_t request_buffer_size = 8 * 1024 * 1024;
248+
uint32_t request_buffer_size = 4 * 1024 * 1024;
245249
uint32_t recv_buffer_cnt = 2;
246-
std::chrono::milliseconds tcp_handshake_timeout =
247-
std::chrono::milliseconds{1000};
248250
ibv_qp_type qp_type = IBV_QPT_RC;
249-
ibv_qp_cap cap = {.max_send_wr = 8,
250-
.max_recv_wr = 8,
251-
.max_send_sge = 6,
251+
ibv_qp_cap cap = {.max_send_wr = 32,
252+
.max_recv_wr = 32,
253+
.max_send_sge = 1,
252254
.max_recv_sge = 1,
253255
.max_inline_data = 0};
254256
std::shared_ptr<coro_io::ib_device_t> device;
@@ -388,6 +390,11 @@ class ib_socket_t {
388390
buffer_size_ = std::min<uint32_t>(peer_info.buffer_size,
389391
get_config().request_buffer_size);
390392
ELOGV(INFO, "Final buffer size = %d", buffer_size_);
393+
if (buffer_size_ > buffer_pool()->max_buffer_size()) {
394+
ELOGV(ERROR, "Buffer size larger than buffer_pool limit: %d",
395+
buffer_size_);
396+
co_return std::make_error_code(std::errc::no_buffer_space);
397+
}
391398
try {
392399
init_qp();
393400
modify_qp_to_init();
@@ -492,6 +499,11 @@ class ib_socket_t {
492499
buffer_size_ = std::min<uint32_t>(peer_info.buffer_size,
493500
get_config().request_buffer_size);
494501
ELOGV(INFO, "Final buffer size = %d", buffer_size_);
502+
if (buffer_size_ > buffer_pool()->max_buffer_size()) {
503+
ELOGV(ERROR, "Buffer size larger than buffer_pool limit: %d",
504+
buffer_size_);
505+
co_return std::make_error_code(std::errc::no_buffer_space);
506+
}
495507
modify_qp_to_rtr(peer_info.qp_num, peer_info.lid,
496508
(uint8_t*)peer_info.gid);
497509

src/coro_io/tests/ibverbs/test_io.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
#include "ylt/struct_json/json_reader.h"
3030
#include "ylt/struct_json/json_writer.h"
3131
struct config_t {
32-
std::size_t buffer_size = 8 * 1024 * 1024;
33-
std::size_t request_size = 8 * 1024 * 1024;
32+
std::size_t buffer_size = 4 * 1024 * 1024;
33+
std::size_t request_size = 4 * 1024 * 1024;
3434
int concurrency = 2;
3535
int test_type = 0;
3636
int enable_log = 0;

src/coro_io/tests/ibverbs/test_io2.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include "ylt/easylog/record.hpp"
2626
#include "ylt/struct_pack/util.h"
2727

28-
std::size_t buffer_size = 8 * 1024 * 1024;
28+
std::size_t buffer_size = 4 * 1024 * 1024;
2929
int concurrency = 10;
3030
std::atomic<int> port;
3131

0 commit comments

Comments
 (0)