Skip to content

Commit c9aa3a0

Browse files
authored
[coro_io][ibverbs] save memory usage (#964)
save memory usage
1 parent 8ed97a4 commit c9aa3a0

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

include/ylt/coro_io/ibverbs/ib_socket.hpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,8 @@ struct ib_socket_shared_state_t {
180180
if (wc.wr_id == 0) { // recv
181181
if (recv_cb_) {
182182
assert(recv_result.empty());
183-
bool could_insert = (recv_queue_.size() <= recv_buffer_cnt_);
184183
recv_buf_ = recv_queue_.pop();
185-
if (could_insert) {
184+
if (recv_queue_.empty()) {
186185
if (recv_queue_.push(ib_buffer_pool_->get_buffer(), this)) {
187186
std::error_code ec;
188187
close(ec);
@@ -191,13 +190,13 @@ struct ib_socket_shared_state_t {
191190
resume(std::pair{ec, (std::size_t)wc.byte_len}, recv_cb_);
192191
}
193192
else {
194-
if (!recv_queue_.full()) {
193+
recv_result.push(std::pair{ec, (std::size_t)wc.byte_len});
194+
if (!recv_queue_.full() && recv_result.size()==recv_queue_.size()) {
195195
if (recv_queue_.push(ib_buffer_pool_->get_buffer(), this)) {
196196
std::error_code ec;
197197
close(ec);
198198
}
199199
}
200-
recv_result.push(std::pair{ec, (std::size_t)wc.byte_len});
201200
}
202201
}
203202
else {

src/coro_io/tests/ibverbs/test_io.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <atomic>
22
#include <chrono>
33
#include <cmath>
4+
#include <cstdint>
45
#include <ostream>
56
#include <string>
67
#include <string_view>
@@ -46,6 +47,8 @@ config_t config;
4647

4748
std::atomic<uint64_t> cnt[2];
4849

50+
std::atomic<uint64_t> connect_cnt;
51+
4952
using namespace std::chrono_literals;
5053
using namespace std::literals;
5154
async_simple::coro::Lazy<std::error_code> echo_connect(
@@ -185,12 +188,15 @@ async_simple::coro::Lazy<std::error_code> echo_accept() {
185188

186189
ELOG_INFO << "start new connection";
187190
auto executor = soc.get_executor();
191+
++connect_cnt;
188192
if (config.test_type == 0) {
189193
echo_connect(std::move(soc)).start([](auto &&) {
194+
--connect_cnt;
190195
});
191196
}
192197
else {
193198
echo_connect_read_some(std::move(soc)).start([](auto &&) {
199+
--connect_cnt;
194200
});
195201
}
196202
}
@@ -297,12 +303,15 @@ async_simple::coro::Lazy<std::error_code> echo_connect() {
297303
s.resize(config.request_size, 'A');
298304
uint64_t &sz = *(uint64_t *)s.data();
299305
sz = config.request_size - 8;
306+
++connect_cnt;
300307
if (config.test_type == 0) {
301-
co_return co_await echo_client(soc, s);
308+
ec = co_await echo_client(soc, s);
302309
}
303310
else {
304-
co_return co_await echo_client_read_some(soc, s);
311+
ec = co_await echo_client_read_some(soc, s);
305312
}
313+
--connect_cnt;
314+
co_return ec;
306315
}
307316

308317
int main() {
@@ -345,7 +354,7 @@ int main() {
345354
for (int i = 0; i < config.test_time; ++i) {
346355
std::this_thread::sleep_for(std::chrono::seconds{1});
347356
auto c = cnt_p->exchange(0);
348-
std::cout << "Throughput:" << 8.0 * c / 1000'000'000 << " Gb/s"
357+
std::cout << "Throughput:" << 8.0 * c / 1000'000'000 << " Gb/s, alive connection:" << connect_cnt
349358
<< std::endl;
350359
}
351360
return 0;

0 commit comments

Comments
 (0)