Skip to content

Commit 2dbf225

Browse files
authored
[coro_rpc][rdma]coro_rpc_rdma (#955)
1 parent 9bd0ae6 commit 2dbf225

29 files changed

+4222
-360
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ if(CMAKE_PROJECT_NAME STREQUAL "yaLanTingLibs") # if ylt is top-level project
2222
include_directories(src/include)
2323

2424
include(cmake/find_openssl.cmake)
25+
include(cmake/find_ibverbs.cmake)
2526
include(cmake/utils.cmake)
2627
include(cmake/build.cmake)
2728
include(cmake/develop.cmake)

cmake/config.cmake

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@ if (YLT_ENABLE_SSL)
1111
target_link_libraries(yalantinglibs INTERFACE OpenSSL::SSL OpenSSL::Crypto)
1212
endif ()
1313
endif ()
14+
option(YLT_ENABLE_IBV "Enable ibverbs support" OFF)
15+
if (YLT_ENABLE_IBV)
16+
message(STATUS "Enable ibverbs support")
17+
if(CMAKE_PROJECT_NAME STREQUAL "yaLanTingLibs")
18+
add_compile_definitions("YLT_ENABLE_IBV")
19+
link_libraries(-libverbs)
20+
else ()
21+
target_compile_definitions(yalantinglibs INTERFACE "YLT_ENABLE_IBV")
22+
target_link_libraries(yalantinglibs -libverbs)
23+
endif ()
24+
endif ()
1425

1526
option(YLT_ENABLE_PMR "Enable pmr support" OFF)
1627
message(STATUS "YLT_ENABLE_PMR: ${YLT_ENABLE_PMR}")

cmake/find_ibverbs.cmake

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
check_library_exists(ibverbs ibv_create_qp "" HAVE_IBVERBS_IBV)
2+
if(HAVE_IBVERBS_IBV)
3+
set(YLT_ENABLE_IBV ON)
4+
set(IBVERBS_LIBRARY ibverbs)
5+
message(STATUS "have libibverbs")
6+
endif()

include/ylt/coro_io/coro_io.hpp

Lines changed: 137 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,18 @@
2323

2424
#include <atomic>
2525
#include <cstddef>
26+
#include <cstdint>
2627
#include <memory>
28+
#include <ostream>
2729
#include <system_error>
2830
#include <thread>
2931
#include <type_traits>
3032
#include <utility>
3133

3234
#include "asio/dispatch.hpp"
3335
#include "asio/io_context.hpp"
36+
#include "asio/ip/address.hpp"
3437
#include "async_simple/Signal.h"
35-
#include "async_simple/coro/FutureAwaiter.h"
36-
#include "async_simple/coro/SpinLock.h"
3738
#include "ylt/easylog.hpp"
3839
#include "ylt/util/type_traits.h"
3940

@@ -49,8 +50,6 @@
4950
#include <asio/read_until.hpp>
5051
#include <asio/write.hpp>
5152
#include <asio/write_at.hpp>
52-
#include <chrono>
53-
#include <deque>
5453

5554
#include "io_context_pool.hpp"
5655
#if __has_include("ylt/util/type_traits.h")
@@ -61,7 +60,6 @@
6160
#ifdef __linux__
6261
#include <sys/sendfile.h>
6362
#endif
64-
6563
namespace coro_io {
6664
template <typename T>
6765
constexpr inline bool is_lazy_v =
@@ -136,6 +134,8 @@ class callback_awaitor_base {
136134
}
137135
void resume() const { obj->coro_.resume(); }
138136

137+
auto handler() const { return (std::size_t)obj; }
138+
139139
private:
140140
Derived *obj;
141141
};
@@ -185,6 +185,30 @@ struct post_helper {
185185
Func func;
186186
};
187187

188+
template <typename R, typename Func, typename Executor>
189+
struct dispatch_helper {
190+
void operator()(auto handler) {
191+
asio::dispatch(e, [this, handler]() {
192+
try {
193+
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
194+
func();
195+
handler.resume();
196+
}
197+
else {
198+
auto r = func();
199+
handler.set_value_then_resume(std::move(r));
200+
}
201+
} catch (const std::exception &e) {
202+
R er;
203+
er.setException(std::current_exception());
204+
handler.set_value_then_resume(std::move(er));
205+
}
206+
});
207+
}
208+
Executor e;
209+
Func func;
210+
};
211+
188212
template <typename Func, typename Executor>
189213
inline async_simple::coro::Lazy<
190214
async_simple::Try<typename util::function_traits<Func>::return_type>>
@@ -205,6 +229,17 @@ post(Func func,
205229
return post(std::move(func), e->get_asio_executor());
206230
}
207231

232+
template <typename Func, typename Executor>
233+
inline async_simple::coro::Lazy<
234+
async_simple::Try<typename util::function_traits<Func>::return_type>>
235+
dispatch(Func func, Executor executor) {
236+
using R =
237+
async_simple::Try<typename util::function_traits<Func>::return_type>;
238+
callback_awaitor<R> awaitor;
239+
dispatch_helper<R, Func, Executor> helper{executor, std::move(func)};
240+
co_return co_await awaitor.await_resume(helper);
241+
}
242+
208243
namespace detail {
209244

210245
template <typename T>
@@ -261,7 +296,7 @@ inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
261296
}
262297
}
263298
});
264-
if (hasCanceled) {
299+
if (hasCanceled) [[unlikely]] {
265300
asio::dispatch(executor, [handler]() {
266301
handler.set_value(
267302
std::make_error_code(std::errc::operation_canceled));
@@ -324,7 +359,8 @@ inline async_simple::coro::Lazy<std::error_code> async_accept(
324359
asio::ip::tcp::acceptor &acceptor, asio::ip::tcp::socket &socket) noexcept {
325360
return async_io<std::error_code>(
326361
[&](auto &&cb) {
327-
acceptor.async_accept(socket, std::move(cb));
362+
ELOG_INFO << "call asio acceptor.async_accept";
363+
acceptor.async_accept(socket, cb);
328364
},
329365
acceptor);
330366
}
@@ -415,37 +451,56 @@ template <typename executor_t>
415451
inline async_simple::coro::Lazy<std::error_code> async_connect(
416452
executor_t *executor, asio::ip::tcp::socket &socket,
417453
const std::string &host, const std::string &port) noexcept {
418-
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
419-
auto result = co_await async_io<
420-
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
421-
[&](auto &&cb) {
422-
resolver.async_resolve(host, port, std::move(cb));
423-
},
424-
resolver);
425-
426-
if (result.first) {
427-
co_return result.first;
454+
std::error_code ec;
455+
auto address = asio::ip::make_address(host, ec);
456+
std::pair<std::error_code, asio::ip::tcp::resolver::iterator> result;
457+
if (ec) {
458+
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
459+
result = co_await async_io<
460+
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
461+
[&](auto &&cb) {
462+
ELOG_INFO << "call asio resolver.async_resolve";
463+
resolver.async_resolve(host, port, std::move(cb));
464+
ELOG_INFO << "call asio resolver.async_resolve over, waiting cb";
465+
},
466+
resolver);
467+
ELOG_INFO << "call asio resolver.async_resolve cbover";
468+
if (result.first) {
469+
co_return result.first;
470+
}
471+
co_return co_await async_io<std::error_code>(
472+
[&](auto &&cb) {
473+
ELOG_INFO << "call asio socket.async_connect";
474+
asio::async_connect(socket, result.second, std::move(cb));
475+
},
476+
socket);
428477
}
429-
result = co_await async_io<
430-
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
478+
else {
479+
ELOG_INFO << "direct call without resolve";
480+
uint16_t port_v;
481+
auto result =
482+
std::from_chars(port.data(), port.data() + port.size(), port_v);
483+
if (result.ec != std::errc{}) {
484+
co_return std::make_error_code(result.ec);
485+
}
486+
asio::ip::tcp::endpoint ep{address, port_v};
487+
co_return co_await async_io<std::error_code>(
488+
[&](auto &&cb) {
489+
ELOG_INFO << "call asio socket.async_connect";
490+
asio::async_connect(socket, std::span{&ep, 1}, std::move(cb));
491+
},
492+
socket);
493+
}
494+
}
495+
496+
template <typename EndPointSeq>
497+
inline async_simple::coro::Lazy<std::error_code> async_connect(
498+
asio::ip::tcp::socket &socket, const EndPointSeq &endpoint) noexcept {
499+
auto result = co_await async_io<std::error_code>(
431500
[&](auto &&cb) {
432-
asio::async_connect(socket, result.second, std::move(cb));
501+
asio::async_connect(socket, endpoint, std::move(cb));
433502
},
434503
socket);
435-
co_return result.first;
436-
}
437-
438-
template <typename executor_t, typename EndPointSeq>
439-
inline async_simple::coro::Lazy<
440-
std::pair<std::error_code, asio::ip::tcp::endpoint>>
441-
async_connect(executor_t *executor, asio::ip::tcp::socket &socket,
442-
const EndPointSeq &endpoint) noexcept {
443-
auto result =
444-
co_await async_io<std::pair<std::error_code, asio::ip::tcp::endpoint>>(
445-
[&](auto &&cb) {
446-
asio::async_connect(socket, endpoint, std::move(cb));
447-
},
448-
socket);
449504
co_return result;
450505
}
451506

@@ -463,6 +518,20 @@ async_resolve(executor_t *executor, asio::ip::tcp::socket &socket,
463518
resolver);
464519
}
465520

521+
template <typename executor_t>
522+
inline async_simple::coro::Lazy<
523+
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>
524+
async_resolve(executor_t *executor, const std::string &host,
525+
const std::string &port) noexcept {
526+
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
527+
co_return co_await async_io<
528+
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
529+
[&](auto &&cb) {
530+
resolver.async_resolve(host, port, std::move(cb));
531+
},
532+
resolver);
533+
}
534+
466535
template <typename Socket>
467536
inline async_simple::coro::Lazy<void> async_close(Socket &socket) noexcept {
468537
callback_awaitor<void> awaitor;
@@ -479,13 +548,37 @@ inline async_simple::coro::Lazy<void> async_close(Socket &socket) noexcept {
479548

480549
#if defined(YLT_ENABLE_SSL) || defined(CINATRA_ENABLE_SSL)
481550
inline async_simple::coro::Lazy<std::error_code> async_handshake(
482-
auto &ssl_stream, asio::ssl::stream_base::handshake_type type) noexcept {
551+
auto &&ssl_stream, asio::ssl::stream_base::handshake_type type) noexcept {
483552
return async_io<std::error_code>(
484553
[&, type](auto &&cb) {
485554
ssl_stream->async_handshake(type, std::move(cb));
486555
},
487556
*ssl_stream);
488557
}
558+
template <typename executor_t>
559+
inline async_simple::coro::Lazy<std::error_code> async_connect(
560+
executor_t *executor, asio::ssl::stream<asio::ip::tcp::socket &> &socket,
561+
const std::string &host, const std::string &port) noexcept {
562+
auto ec = co_await async_connect(executor, socket, host, port);
563+
if (ec) [[unlikely]] {
564+
co_return ec;
565+
}
566+
ec = co_await coro_io::async_handshake(&socket,
567+
asio::ssl::stream_base::client);
568+
co_return ec;
569+
}
570+
template <typename EndPointSeq>
571+
inline async_simple::coro::Lazy<std::error_code> async_connect(
572+
asio::ssl::stream<asio::ip::tcp::socket &> &socket,
573+
const EndPointSeq &endpoint) noexcept {
574+
auto ec = co_await async_connect(socket.next_layer(), endpoint);
575+
if (ec) [[unlikely]] {
576+
co_return ec;
577+
}
578+
ec = co_await coro_io::async_handshake(&socket,
579+
asio::ssl::stream_base::client);
580+
co_return ec;
581+
}
489582
#endif
490583
class period_timer : public asio::steady_timer {
491584
public:
@@ -689,34 +782,15 @@ async_sendfile(asio::ip::tcp::socket &socket, int fd, off_t offset,
689782
}
690783
#endif
691784

692-
struct socket_wrapper_t {
693-
socket_wrapper_t(asio::ip::tcp::socket &&soc,
694-
coro_io::ExecutorWrapper<> *executor)
695-
: socket_(std::make_unique<asio::ip::tcp::socket>(std::move(soc))),
696-
executor_(executor) {}
697-
698-
private:
699-
std::unique_ptr<asio::ip::tcp::socket> socket_;
700-
coro_io::ExecutorWrapper<> *executor_;
701-
#ifdef YLT_ENABLE_SSL
702-
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_;
703-
#endif
704-
public:
705-
bool use_ssl() const noexcept {
706-
#ifdef YLT_ENABLE_SSL
707-
return ssl_stream_ != nullptr;
708-
#else
709-
return false;
710-
#endif
711-
}
712-
auto get_executor() const noexcept { return executor_; }
713-
std::unique_ptr<asio::ip::tcp::socket> &socket() noexcept { return socket_; }
714-
#ifdef YLT_ENABLE_SSL
715-
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>>
716-
&ssl_stream() noexcept {
717-
return ssl_stream_;
718-
}
719-
#endif
785+
enum protocal { tcp, tcp_with_ssl, rdma };
786+
struct endpoint {
787+
asio::ip::address address;
788+
asio::ip::port_type port;
789+
protocal proto;
720790
};
721791

792+
inline std::ostream &operator<<(std::ostream &stream, const endpoint &ep) {
793+
return stream << ep.address.to_string() << ":" << ep.port;
794+
}
795+
722796
} // namespace coro_io

0 commit comments

Comments
 (0)