Skip to content

Commit 967758d

Browse files
authored
[coro_rpc][feat and fix]Main support local ip (#965)
1 parent c9aa3a0 commit 967758d

File tree

8 files changed

+142
-25
lines changed

8 files changed

+142
-25
lines changed

include/ylt/coro_io/socket_wrapper.hpp

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,22 @@
2222
namespace coro_io {
2323
struct socket_wrapper_t {
2424
// construct by listen tcp
25-
socket_wrapper_t() {};
26-
socket_wrapper_t(coro_io::ExecutorWrapper<> *executor)
27-
: socket_(std::make_unique<asio::ip::tcp::socket>(
28-
executor->get_asio_executor())),
29-
executor_(executor) {
25+
socket_wrapper_t(){};
26+
socket_wrapper_t(coro_io::ExecutorWrapper<> *executor,
27+
const std::string &local_ip = "")
28+
: executor_(executor) {
29+
if (local_ip.empty()) {
30+
socket_ = std::make_unique<asio::ip::tcp::socket>(
31+
executor->get_asio_executor());
32+
}
33+
else {
34+
asio::error_code ec;
35+
socket_ = std::make_unique<asio::ip::tcp::socket>(
36+
executor->get_asio_executor(),
37+
asio::ip::tcp::endpoint(asio::ip::address::from_string(local_ip, ec),
38+
0));
39+
}
40+
3041
init_client(true);
3142
};
3243
socket_wrapper_t(asio::ip::tcp::socket &&soc,
@@ -167,8 +178,8 @@ struct socket_wrapper_t {
167178
ssl_stream_ = std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>(
168179
*socket_, ssl_ctx);
169180
}
170-
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> &
171-
ssl_stream() noexcept {
181+
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>>
182+
&ssl_stream() noexcept {
172183
return ssl_stream_;
173184
}
174185
using tcp_socket_with_ssl_t = asio::ssl::stream<asio::ip::tcp::socket &>;

include/ylt/coro_rpc/impl/coro_rpc_client.hpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ class coro_rpc_client {
196196
std::chrono::milliseconds request_timeout_duration;
197197
std::string host;
198198
std::string port;
199+
std::string local_ip;
199200
std::variant<tcp_config
200201
#ifdef YLT_ENABLE_SSL
201202
,
@@ -214,6 +215,7 @@ class coro_rpc_client {
214215
host(),
215216
port(),
216217
socket_config(tcp_config{}) {}
218+
config(const std::string &loc_ip) : config() { local_ip = loc_ip; }
217219
config(config &&) = default;
218220
config(const config &) = default;
219221
config &operator=(const config &) = default;
@@ -240,16 +242,19 @@ class coro_rpc_client {
240242
*/
241243
coro_rpc_client(
242244
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor(),
243-
config conf = config{})
244-
: control_(
245-
std::make_shared<control_t>(executor->get_asio_executor(), false)),
245+
config conf = {})
246+
: control_(std::make_shared<control_t>(executor->get_asio_executor(),
247+
false, conf.local_ip)),
246248
timer_(std::make_unique<coro_io::period_timer>(
247249
executor->get_asio_executor())) {
248250
if (!init_config(config{})) [[unlikely]] {
249251
close();
250252
}
251253
}
252254

255+
coro_rpc_client(const std::string &local_ip)
256+
: coro_rpc_client(coro_io::get_global_executor(), config(local_ip)) {}
257+
253258
std::string_view get_host() const { return config_.host; }
254259

255260
std::string_view get_port() const { return config_.port; }
@@ -855,11 +860,12 @@ class coro_rpc_client {
855860
std::unordered_map<uint32_t, handler_t> response_handler_table_;
856861
resp_body resp_buffer_;
857862
std::atomic<uint32_t> recving_cnt_ = 0;
858-
control_t(asio::io_context::executor_type executor, bool is_timeout)
863+
control_t(asio::io_context::executor_type executor, bool is_timeout,
864+
const std::string &local_ip = "")
859865
: is_timeout_(is_timeout),
860866
has_closed_(false),
861867
executor_(executor),
862-
socket_wrapper_(&executor_) {}
868+
socket_wrapper_(&executor_, local_ip) {}
863869
};
864870

865871
static void close_socket_async(
@@ -958,8 +964,11 @@ class coro_rpc_client {
958964
header, std::string_view{buffer, buffer + sizeof(buffer)});
959965
assert(!ec);
960966
if (ret.first) {
961-
ELOG_ERROR << "read rpc head failed, error msg:" << ret.first.message()
962-
<< ". close the socket.value=" << ret.first.value();
967+
if (ret.first != asio::error::eof) {
968+
ELOG_ERROR << "read rpc head failed, error msg:"
969+
<< ret.first.message()
970+
<< ". close the socket.value=" << ret.first.value();
971+
}
963972
break;
964973
}
965974
auto iter = controller->response_handler_table_.find(header.seq_num);

include/ylt/standalone/cinatra/coro_http_server.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ class coro_http_server {
670670
std::string_view head_msg) {
671671
auto conn = accept_impl(std::move(soc), true);
672672
conn->add_head(head_msg);
673-
start_one(conn).via(conn->get_executor()).detach();
673+
start_one(conn, true).via(conn->get_executor()).detach();
674674
}
675675

676676
private:
@@ -759,6 +759,11 @@ class coro_http_server {
759759
co_await conn->start();
760760
}
761761

762+
async_simple::coro::Lazy<void> start_one(
763+
std::shared_ptr<coro_http_connection> conn, bool has_shake) noexcept {
764+
co_await conn->start(has_shake);
765+
}
766+
762767
void close_acceptor() {
763768
asio::dispatch(acceptor_.get_executor(), [this]() {
764769
asio::error_code ec;

src/coro_rpc/examples/base_examples/coro_rpc_lib/coro_rpc.cpp

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "coro_rpc.h"
22

3+
#include <asio/ip/host_name.hpp>
34
#include <ylt/coro_io/load_balancer.hpp>
45
#include <ylt/coro_rpc/coro_rpc_client.hpp>
56
#include <ylt/coro_rpc/coro_rpc_server.hpp>
@@ -83,17 +84,37 @@ void stop_rpc_server(void *server) {
8384
}
8485

8586
// rpc client
86-
void *create_client_pool(char *addr, int req_timeout_sec, bool enable_ib) {
87+
void *create_client_pool(char *addr, client_config conf) {
8788
std::vector<std::string_view> hosts{std::string_view(addr)};
8889
coro_io::client_pool<coro_rpc::coro_rpc_client>::pool_config pool_conf{};
8990
#ifdef YLT_ENABLE_IBV
90-
if (enable_ib) {
91+
if (conf.enable_ib) {
9192
coro_io::ibverbs_config ib_conf{};
9293
pool_conf.client_config.socket_config = ib_conf;
9394
}
9495
#endif
95-
pool_conf.client_config.request_timeout_duration =
96-
std::chrono::seconds{req_timeout_sec};
96+
if (conf.connect_timeout_sec != 0) {
97+
pool_conf.client_config.connect_timeout_duration =
98+
std::chrono::seconds{conf.connect_timeout_sec};
99+
}
100+
101+
if (conf.req_timeout_sec != 0) {
102+
pool_conf.client_config.request_timeout_duration =
103+
std::chrono::seconds{conf.req_timeout_sec};
104+
}
105+
106+
if (conf.local_ip == nullptr) {
107+
pool_conf.client_config.local_ip = "localhost";
108+
}
109+
else {
110+
pool_conf.client_config.local_ip = conf.local_ip;
111+
}
112+
113+
ELOG_INFO << "client config connect timeout seconds: "
114+
<< conf.connect_timeout_sec
115+
<< ", request timeout seconds: " << conf.req_timeout_sec
116+
<< ", local_ip: " << pool_conf.client_config.local_ip
117+
<< ", enable ibverbs: " << conf.enable_ib;
97118

98119
auto ld = coro_io::load_balancer<coro_rpc::coro_rpc_client>::create(
99120
hosts, {pool_conf});
@@ -158,3 +179,22 @@ void init_rpc_log(char *log_filename, int log_level, uint64_t max_file_size,
158179
}
159180

160181
void flush_rpc_log() { easylog::flush(); }
182+
183+
char *get_first_local_ip() {
184+
std::string local_ip = "localhost";
185+
using asio::ip::tcp;
186+
tcp::resolver resolver(coro_io::get_global_executor()->get_asio_executor());
187+
tcp::resolver::query query(asio::ip::host_name(), "");
188+
tcp::resolver::iterator iter = resolver.resolve(query);
189+
tcp::resolver::iterator end; // End marker.
190+
while (iter != end) {
191+
tcp::endpoint ep = *iter++;
192+
auto addr = ep.address();
193+
if (addr.is_v4()) {
194+
local_ip = addr.to_string();
195+
break;
196+
}
197+
}
198+
199+
return create_copy_cstr(local_ip);
200+
}

src/coro_rpc/examples/base_examples/coro_rpc_lib/coro_rpc.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ typedef struct {
1212
uint64_t len;
1313
} rpc_result;
1414

15+
typedef struct {
16+
int connect_timeout_sec;
17+
int req_timeout_sec;
18+
char *local_ip;
19+
bool enable_ib;
20+
} client_config;
21+
1522
// ip:port
1623
extern void load_service(void *ctx, uint64_t req_id);
1724
extern void *response_msg(void *ctx, char *msg, uint64_t size);
@@ -20,12 +27,10 @@ extern rpc_result wait_response_finish(void *p);
2027
extern void *start_rpc_server(char *addr, int parallel, bool enable_ib);
2128
extern void stop_rpc_server(void *server);
2229

23-
extern void *create_client_pool(char *addr, int req_timeout_sec,
24-
bool enable_ib);
30+
extern void *create_client_pool(char *addr, client_config conf);
2531
extern void free_client_pool(void *pool);
2632
extern rpc_result load(void *pool, uint64_t req_id, char *dest,
2733
uint64_t dest_len);
28-
2934
/*
3035
enum log_level {
3136
NONE = 0,
@@ -44,6 +49,8 @@ extern void init_rpc_log(char *log_filename, int log_level,
4449
bool async);
4550
extern void flush_rpc_log();
4651

52+
extern char *get_first_local_ip();
53+
4754
#ifdef __cplusplus
4855
}
4956
#endif

src/coro_rpc/examples/base_examples/go_example/test_rpc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import(
99
/*
1010
#cgo CXXFLAGS: -std=c++20
1111
#cgo CFLAGS: -I../coro_rpc_lib
12-
#cgo LDFLAGS: -L./ -lcoro_rpc -lm -lstdc++ -libverbs
12+
#cgo LDFLAGS: -L./ -lcoro_rpc -lm -lstdc++
1313
#include "coro_rpc.h"
1414
*/
1515
import "C"
@@ -54,7 +54,8 @@ func load_service(ctx unsafe.Pointer, req_id C.uint64_t) {
5454

5555
func test_client(host string, len int) {
5656
peer := C.CString(host)
57-
pool := C.create_client_pool(peer, C.int(30), C.bool(true))
57+
conf := C.client_config{connect_timeout_sec:15, req_timeout_sec:30, local_ip:C.get_first_local_ip(), enable_ib:C.bool(false)}
58+
pool := C.create_client_pool(peer, conf)
5859

5960
outbuf := make([]byte, len)
6061
for i := 0; i < 3; i++ {

src/coro_rpc/tests/test_coro_rpc_client.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <async_simple/coro/SyncAwait.h>
1818

1919
#include <asio/io_context.hpp>
20+
#include <asio/ip/host_name.hpp>
2021
#include <chrono>
2122
#include <cstddef>
2223
#include <memory>
@@ -170,6 +171,49 @@ TEST_CASE("testing client") {
170171
thd.join();
171172
}
172173

174+
std::string get_first_local_ip() {
175+
using asio::ip::tcp;
176+
tcp::resolver resolver(coro_io::get_global_executor()->get_asio_executor());
177+
tcp::resolver::query query(asio::ip::host_name(), "");
178+
tcp::resolver::iterator iter = resolver.resolve(query);
179+
tcp::resolver::iterator end; // End marker.
180+
while (iter != end) {
181+
tcp::endpoint ep = *iter++;
182+
auto addr = ep.address();
183+
if (addr.is_v4()) {
184+
return addr.to_string();
185+
}
186+
}
187+
188+
return "localhost";
189+
}
190+
191+
TEST_CASE("testing client with local ip") {
192+
coro_rpc_server server(1, 8901);
193+
server.register_handler<hello>();
194+
auto res = server.async_start();
195+
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
196+
197+
std::string local_ip = get_first_local_ip();
198+
ELOG_INFO << "local ip: " << local_ip;
199+
coro_rpc_client client(local_ip);
200+
auto ec = client.sync_connect("127.0.0.1", "8901");
201+
REQUIRE_MESSAGE(!ec, ec.message());
202+
203+
auto ret = client.sync_call<hello>();
204+
CHECK(ret.value() == "hello"s);
205+
206+
coro_rpc_server server1(1, 8902, local_ip);
207+
server1.register_handler<hello>();
208+
res = server1.async_start();
209+
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
210+
ec = client.sync_connect(local_ip, "8902");
211+
REQUIRE_MESSAGE(!ec, ec.message());
212+
213+
ret = client.sync_call<hello>();
214+
CHECK(ret.value() == "hello"s);
215+
}
216+
173217
TEST_CASE("testing client with inject server") {
174218
g_action = {};
175219
std::string port = std::to_string(coro_rpc_server_port);

src/coro_rpc/tests/test_coro_rpc_server.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ TEST_CASE("testing coro rpc ssl subserver") {
557557
CHECK_MESSAGE(!result.net_err, result.net_err.message());
558558
result = syncAwait(cli.async_get("/index.html"));
559559
CHECK_MESSAGE(result.status == (int)coro_http::status_type::ok,
560-
result.status);
560+
result.net_err.message());
561561
CHECK_MESSAGE(result.resp_body == http_body, result.resp_body);
562562
}
563563
#endif

0 commit comments

Comments
 (0)