Skip to content

[coro_rpc][rdma]coro_rpc_rdma #955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 75 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
714c03f
rdma devices
qicosmos Apr 9, 2025
d40ce0d
devices tests
qicosmos Apr 9, 2025
91bce25
test device
qicosmos Apr 9, 2025
d64c470
rename
qicosmos Apr 9, 2025
c4ba03a
simplify
qicosmos Apr 9, 2025
da447d3
add more code
poor-circle Apr 9, 2025
0b1a678
add ib_socket
poor-circle Apr 21, 2025
9191f4c
fix
poor-circle Apr 21, 2025
6bd3349
fix rdma
poor-circle Apr 22, 2025
f999fba
add ownership for ib_device_t
poor-circle Apr 22, 2025
b09d51a
format
qicosmos Apr 22, 2025
0541579
ib error
qicosmos Apr 22, 2025
9e1cb36
add ib err msg
qicosmos Apr 22, 2025
e8fbcf4
fix wr_id
poor-circle Apr 22, 2025
8acf5af
support cancel
poor-circle Apr 22, 2025
b6a63f6
add err
poor-circle Apr 23, 2025
c4d7201
choose best gid index
qicosmos Apr 25, 2025
af0a1dd
refact ib_io
poor-circle Apr 25, 2025
2c9c97e
qp reset
qicosmos Apr 27, 2025
b164976
log level
qicosmos Apr 27, 2025
27df175
fix resume; move qp to state; increase rnr timer and retry count
qicosmos Apr 27, 2025
40a2cba
add test for fix size read/write
poor-circle Apr 27, 2025
32504b9
fix poll_completion and free ibv resources
qicosmos Apr 27, 2025
8910e3d
mtu
qicosmos Apr 27, 2025
65b4fb9
fix read bigger than 8m
poor-circle Apr 27, 2025
4f6c707
fix read some
poor-circle Apr 28, 2025
8628330
fix to support max_sge
poor-circle Apr 28, 2025
a42707a
refact code
poor-circle Apr 29, 2025
218baa9
fix
poor-circle Apr 29, 2025
073d3c2
add more test
poor-circle Apr 29, 2025
b3310d9
add more test
poor-circle Apr 29, 2025
7879de6
fix
poor-circle Apr 29, 2025
33adaa7
fix
poor-circle Apr 30, 2025
43b7737
fix
poor-circle Apr 30, 2025
082b39f
format
qicosmos May 6, 2025
ff669e4
simplify some
qicosmos May 6, 2025
9b43e45
Reduce cyclomatic complexity
qicosmos May 6, 2025
610e994
rename
qicosmos May 7, 2025
f8643ed
adjust structure
qicosmos May 7, 2025
a770797
rename variable
qicosmos May 7, 2025
d00dacd
improve some
qicosmos May 7, 2025
1934431
rename
qicosmos May 7, 2025
8e2d5d1
merge some code
qicosmos May 7, 2025
b1d7533
fix
poor-circle May 8, 2025
453fe9a
remove zero copy
power-more May 8, 2025
fb29135
fix data race; fix cb
qicosmos May 12, 2025
6386286
fix compile
qicosmos May 12, 2025
26ac99e
safe_resume
qicosmos May 13, 2025
ed6b8d9
add support for rdma
poor-circle May 14, 2025
facdea0
fix compile
qicosmos May 15, 2025
cc24c8c
add buffer
poor-circle May 15, 2025
a1bcd07
fix
poor-circle May 15, 2025
ad6f4e1
rmda bench
qicosmos May 16, 2025
bc1e2c8
fix recv len
qicosmos May 16, 2025
2c45f76
fix compile in gcc
poor-circle May 16, 2025
6904960
format code
poor-circle May 16, 2025
7d632a6
fix log and case
qicosmos May 19, 2025
d74b5f6
fix config
qicosmos May 19, 2025
8fa8a8e
fix warning
qicosmos May 19, 2025
007f764
simplify some
qicosmos May 20, 2025
faa1eda
for gcc compile
qicosmos May 20, 2025
628f727
conf with port
qicosmos May 20, 2025
ca0d534
fix post send
qicosmos May 20, 2025
67a33ab
fix log; set log level
qicosmos May 20, 2025
4a8223b
improve ib conf
qicosmos May 20, 2025
4165e7e
cmake auto find ibverbs
qicosmos May 20, 2025
6f95e1b
can switch tcp and ibverbs
qicosmos May 20, 2025
235ad45
sync code
qicosmos May 20, 2025
2e5bb62
compile
qicosmos May 20, 2025
ee5b89f
compile
qicosmos May 20, 2025
997d5ed
compile
qicosmos May 20, 2025
650601f
remove log
qicosmos May 20, 2025
54be5d8
remove log
qicosmos May 20, 2025
ce1e8d4
revert
qicosmos May 21, 2025
a63d4e9
fix reset
qicosmos May 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ if(CMAKE_PROJECT_NAME STREQUAL "yaLanTingLibs") # if ylt is top-level project
include_directories(src/include)

include(cmake/find_openssl.cmake)
include(cmake/find_ibverbs.cmake)
include(cmake/utils.cmake)
include(cmake/build.cmake)
include(cmake/develop.cmake)
Expand Down
11 changes: 11 additions & 0 deletions cmake/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ if (YLT_ENABLE_SSL)
target_link_libraries(yalantinglibs INTERFACE OpenSSL::SSL OpenSSL::Crypto)
endif ()
endif ()
option(YLT_ENABLE_IBV "Enable ibverbs support" OFF)
if (YLT_ENABLE_IBV)
message(STATUS "Enable ibverbs support")
if(CMAKE_PROJECT_NAME STREQUAL "yaLanTingLibs")
add_compile_definitions("YLT_ENABLE_IBV")
link_libraries(-libverbs)
else ()
target_compile_definitions(yalantinglibs INTERFACE "YLT_ENABLE_IBV")
target_link_libraries(yalantinglibs -libverbs)
endif ()
endif ()

option(YLT_ENABLE_PMR "Enable pmr support" OFF)
message(STATUS "YLT_ENABLE_PMR: ${YLT_ENABLE_PMR}")
Expand Down
6 changes: 6 additions & 0 deletions cmake/find_ibverbs.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
check_library_exists(ibverbs ibv_create_qp "" HAVE_IBVERBS_IBV)
if(HAVE_IBVERBS_IBV)
set(YLT_ENABLE_IBV ON)
set(IBVERBS_LIBRARY ibverbs)
message(STATUS "have libibverbs")
endif()
200 changes: 137 additions & 63 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>
#include <system_error>
#include <thread>
#include <type_traits>
#include <utility>

#include "asio/dispatch.hpp"
#include "asio/io_context.hpp"
#include "asio/ip/address.hpp"
#include "async_simple/Signal.h"
#include "async_simple/coro/FutureAwaiter.h"
#include "async_simple/coro/SpinLock.h"
#include "ylt/easylog.hpp"
#include "ylt/util/type_traits.h"

Expand All @@ -49,8 +50,6 @@
#include <asio/read_until.hpp>
#include <asio/write.hpp>
#include <asio/write_at.hpp>
#include <chrono>
#include <deque>

#include "io_context_pool.hpp"
#if __has_include("ylt/util/type_traits.h")
Expand All @@ -61,7 +60,6 @@
#ifdef __linux__
#include <sys/sendfile.h>
#endif

namespace coro_io {
template <typename T>
constexpr inline bool is_lazy_v =
Expand Down Expand Up @@ -136,6 +134,8 @@ class callback_awaitor_base {
}
void resume() const { obj->coro_.resume(); }

auto handler() const { return (std::size_t)obj; }

private:
Derived *obj;
};
Expand Down Expand Up @@ -185,6 +185,30 @@ struct post_helper {
Func func;
};

template <typename R, typename Func, typename Executor>
struct dispatch_helper {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
handler.resume();
}
else {
auto r = func();
handler.set_value_then_resume(std::move(r));
}
} catch (const std::exception &e) {
R er;
er.setException(std::current_exception());
handler.set_value_then_resume(std::move(er));
}
});
}
Executor e;
Func func;
};

template <typename Func, typename Executor>
inline async_simple::coro::Lazy<
async_simple::Try<typename util::function_traits<Func>::return_type>>
Expand All @@ -205,6 +229,17 @@ post(Func func,
return post(std::move(func), e->get_asio_executor());
}

template <typename Func, typename Executor>
inline async_simple::coro::Lazy<
async_simple::Try<typename util::function_traits<Func>::return_type>>
dispatch(Func func, Executor executor) {
using R =
async_simple::Try<typename util::function_traits<Func>::return_type>;
callback_awaitor<R> awaitor;
dispatch_helper<R, Func, Executor> helper{executor, std::move(func)};
co_return co_await awaitor.await_resume(helper);
}

namespace detail {

template <typename T>
Expand Down Expand Up @@ -261,7 +296,7 @@ inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
}
}
});
if (hasCanceled) {
if (hasCanceled) [[unlikely]] {
asio::dispatch(executor, [handler]() {
handler.set_value(
std::make_error_code(std::errc::operation_canceled));
Expand Down Expand Up @@ -324,7 +359,8 @@ inline async_simple::coro::Lazy<std::error_code> async_accept(
asio::ip::tcp::acceptor &acceptor, asio::ip::tcp::socket &socket) noexcept {
return async_io<std::error_code>(
[&](auto &&cb) {
acceptor.async_accept(socket, std::move(cb));
ELOG_INFO << "call asio acceptor.async_accept";
acceptor.async_accept(socket, cb);
},
acceptor);
}
Expand Down Expand Up @@ -415,37 +451,56 @@ template <typename executor_t>
inline async_simple::coro::Lazy<std::error_code> async_connect(
executor_t *executor, asio::ip::tcp::socket &socket,
const std::string &host, const std::string &port) noexcept {
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
auto result = co_await async_io<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
[&](auto &&cb) {
resolver.async_resolve(host, port, std::move(cb));
},
resolver);

if (result.first) {
co_return result.first;
std::error_code ec;
auto address = asio::ip::make_address(host, ec);
std::pair<std::error_code, asio::ip::tcp::resolver::iterator> result;
if (ec) {
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
result = co_await async_io<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
[&](auto &&cb) {
ELOG_INFO << "call asio resolver.async_resolve";
resolver.async_resolve(host, port, std::move(cb));
ELOG_INFO << "call asio resolver.async_resolve over, waiting cb";
},
resolver);
ELOG_INFO << "call asio resolver.async_resolve cbover";
if (result.first) {
co_return result.first;
}
co_return co_await async_io<std::error_code>(
[&](auto &&cb) {
ELOG_INFO << "call asio socket.async_connect";
asio::async_connect(socket, result.second, std::move(cb));
},
socket);
}
result = co_await async_io<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
else {
ELOG_INFO << "direct call without resolve";
uint16_t port_v;
auto result =
std::from_chars(port.data(), port.data() + port.size(), port_v);
if (result.ec != std::errc{}) {
co_return std::make_error_code(result.ec);
}
asio::ip::tcp::endpoint ep{address, port_v};
co_return co_await async_io<std::error_code>(
[&](auto &&cb) {
ELOG_INFO << "call asio socket.async_connect";
asio::async_connect(socket, std::span{&ep, 1}, std::move(cb));
},
socket);
}
}

template <typename EndPointSeq>
inline async_simple::coro::Lazy<std::error_code> async_connect(
asio::ip::tcp::socket &socket, const EndPointSeq &endpoint) noexcept {
auto result = co_await async_io<std::error_code>(
[&](auto &&cb) {
asio::async_connect(socket, result.second, std::move(cb));
asio::async_connect(socket, endpoint, std::move(cb));
},
socket);
co_return result.first;
}

template <typename executor_t, typename EndPointSeq>
inline async_simple::coro::Lazy<
std::pair<std::error_code, asio::ip::tcp::endpoint>>
async_connect(executor_t *executor, asio::ip::tcp::socket &socket,
const EndPointSeq &endpoint) noexcept {
auto result =
co_await async_io<std::pair<std::error_code, asio::ip::tcp::endpoint>>(
[&](auto &&cb) {
asio::async_connect(socket, endpoint, std::move(cb));
},
socket);
co_return result;
}

Expand All @@ -463,6 +518,20 @@ async_resolve(executor_t *executor, asio::ip::tcp::socket &socket,
resolver);
}

template <typename executor_t>
inline async_simple::coro::Lazy<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>
async_resolve(executor_t *executor, const std::string &host,
const std::string &port) noexcept {
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
co_return co_await async_io<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
[&](auto &&cb) {
resolver.async_resolve(host, port, std::move(cb));
},
resolver);
}

template <typename Socket>
inline async_simple::coro::Lazy<void> async_close(Socket &socket) noexcept {
callback_awaitor<void> awaitor;
Expand All @@ -479,13 +548,37 @@ inline async_simple::coro::Lazy<void> async_close(Socket &socket) noexcept {

#if defined(YLT_ENABLE_SSL) || defined(CINATRA_ENABLE_SSL)
inline async_simple::coro::Lazy<std::error_code> async_handshake(
auto &ssl_stream, asio::ssl::stream_base::handshake_type type) noexcept {
auto &&ssl_stream, asio::ssl::stream_base::handshake_type type) noexcept {
return async_io<std::error_code>(
[&, type](auto &&cb) {
ssl_stream->async_handshake(type, std::move(cb));
},
*ssl_stream);
}
template <typename executor_t>
inline async_simple::coro::Lazy<std::error_code> async_connect(
executor_t *executor, asio::ssl::stream<asio::ip::tcp::socket &> &socket,
const std::string &host, const std::string &port) noexcept {
auto ec = co_await async_connect(executor, socket, host, port);
if (ec) [[unlikely]] {
co_return ec;
}
ec = co_await coro_io::async_handshake(&socket,
asio::ssl::stream_base::client);
co_return ec;
}
template <typename EndPointSeq>
inline async_simple::coro::Lazy<std::error_code> async_connect(
asio::ssl::stream<asio::ip::tcp::socket &> &socket,
const EndPointSeq &endpoint) noexcept {
auto ec = co_await async_connect(socket.next_layer(), endpoint);
if (ec) [[unlikely]] {
co_return ec;
}
ec = co_await coro_io::async_handshake(&socket,
asio::ssl::stream_base::client);
co_return ec;
}
#endif
class period_timer : public asio::steady_timer {
public:
Expand Down Expand Up @@ -689,34 +782,15 @@ async_sendfile(asio::ip::tcp::socket &socket, int fd, off_t offset,
}
#endif

struct socket_wrapper_t {
socket_wrapper_t(asio::ip::tcp::socket &&soc,
coro_io::ExecutorWrapper<> *executor)
: socket_(std::make_unique<asio::ip::tcp::socket>(std::move(soc))),
executor_(executor) {}

private:
std::unique_ptr<asio::ip::tcp::socket> socket_;
coro_io::ExecutorWrapper<> *executor_;
#ifdef YLT_ENABLE_SSL
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_;
#endif
public:
bool use_ssl() const noexcept {
#ifdef YLT_ENABLE_SSL
return ssl_stream_ != nullptr;
#else
return false;
#endif
}
auto get_executor() const noexcept { return executor_; }
std::unique_ptr<asio::ip::tcp::socket> &socket() noexcept { return socket_; }
#ifdef YLT_ENABLE_SSL
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>>
&ssl_stream() noexcept {
return ssl_stream_;
}
#endif
enum protocal { tcp, tcp_with_ssl, rdma };
struct endpoint {
asio::ip::address address;
asio::ip::port_type port;
protocal proto;
};

inline std::ostream &operator<<(std::ostream &stream, const endpoint &ep) {
return stream << ep.address.to_string() << ":" << ep.port;
}

} // namespace coro_io
Loading
Loading