Skip to content

Commit 49d67e7

Browse files
authored
[cross language]Add py and go example for http and rpc (#949)
1 parent a9c55e6 commit 49d67e7

File tree

9 files changed

+443
-0
lines changed

9 files changed

+443
-0
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,28 @@ options:
543543
./benchmark_client # [threads = hardware counts] [client_pre_thread = 20] [pipeline_size = 1] [host = 127.0.0.1] [port = 9000] [test_data_path = ./test_data/echo_test] [test_seconds = 30] [warm_up_seconds = 5]
544544
```
545545

546+
## cross language support
547+
### how to use coro_http with python
548+
See `yalantinglibs/src/coro_http/examples/py_example`
549+
550+
#### build and run py_example
551+
- Set option `ENABLE_pybind11`(in `yalantinglibs/src/coro_http/examples/CMakeLists.txt`) `ON` and then build `py_example`.
552+
553+
- go to `yalantinglibs/src/coro_http/examples/py_example`
554+
555+
- python3 test.py
556+
557+
### how to use coro_rpc with golang
558+
See `yalantinglibs/src/coro_rpc/examples/basic_example/go_example`
559+
560+
#### build and run go_example
561+
- Set option `ENABLE_go`(in `yalantinglibs/src/coro_rpc/examples/base_examples/CMakeLists.txt`) `ON` and then build `coro_rpc`.
562+
563+
- go to `yalantinglibs/src/coro_rpc/examples/base_examples/go_example`
564+
565+
- go build test_rpc.go
566+
567+
- ./test_rpc
546568

547569
## How to generate document
548570

src/coro_http/examples/CMakeLists.txt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,29 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows"
3535
target_link_libraries(coro_http_example wsock32 ws2_32)
3636
target_link_libraries(coro_http_load_balancer wsock32 ws2_32)
3737
target_link_libraries(coro_chat_room wsock32 ws2_32)
38+
endif()
39+
40+
option(ENABLE_pybind11 "Enable pybind11 " OFF)
41+
if(ENABLE_pybind11)
42+
cmake_minimum_required(VERSION 3.14)
43+
project(MyPybind11Project)
44+
45+
include(FetchContent)
46+
47+
FetchContent_Declare(
48+
pybind11
49+
GIT_REPOSITORY https://github.com/pybind/pybind11.git
50+
GIT_TAG v2.13.6
51+
)
52+
53+
FetchContent_MakeAvailable(pybind11)
54+
55+
include_directories(${CMAKE_CURRENT_BINARY_DIR}/_deps/pybind11-src/include)
56+
57+
pybind11_add_module(py_example py_example/py_example.cpp)
58+
set_target_properties(py_example PROPERTIES
59+
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/py_example
60+
ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/py_example
61+
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/py_example
62+
)
3863
endif()
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include <pybind11/pybind11.h>
2+
3+
#include <string>
4+
#include <ylt/coro_http/coro_http_client.hpp>
5+
#include <ylt/coro_io/client_pool.hpp>
6+
7+
namespace py = pybind11;
8+
9+
class caller {
10+
public:
11+
caller(py::function callback) : callback_(std::move(callback)) {}
12+
void async_get(std::string url) {
13+
static auto pool =
14+
coro_io::client_pool<coro_http::coro_http_client>::create(url);
15+
pool->send_request([this, url](coro_http::coro_http_client &client)
16+
-> async_simple::coro::Lazy<void> {
17+
auto result = co_await client.async_get(url);
18+
19+
// PyObject *py_str = PyUnicode_FromStringAndSize(
20+
// result.resp_body.data(), result.resp_body.size()); // zero copy
21+
// py::gil_scoped_acquire acquire;
22+
// callback_(py::reinterpret_steal<py::object>(py_str));
23+
24+
py::gil_scoped_acquire acquire;
25+
callback_(result.resp_body);
26+
})
27+
.start([](auto &&) {
28+
});
29+
}
30+
31+
private:
32+
py::function callback_;
33+
};
34+
35+
PYBIND11_MODULE(py_example, m) {
36+
m.def("hello", [] {
37+
return std::string("hello");
38+
});
39+
40+
py::class_<caller>(m, "caller")
41+
.def(py::init<py::function>())
42+
.def("async_get", &caller::async_get,
43+
py::call_guard<py::gil_scoped_release>());
44+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import asyncio
2+
import py_example
3+
4+
async def async_get(url):
5+
loop = asyncio.get_running_loop()
6+
future = loop.create_future()
7+
def cpp_callback(message):
8+
def set_result():
9+
future.set_result(message)
10+
11+
loop.call_soon_threadsafe(set_result)
12+
13+
caller = py_example.caller(cpp_callback)
14+
caller.async_get(url)
15+
result = await future
16+
print(result)
17+
18+
async def main():
19+
await async_get("http://taobao.com")
20+
21+
if __name__ == "__main__":
22+
asyncio.run(main())

src/coro_rpc/examples/base_examples/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ else()
3636
endif()
3737
endif()
3838

39+
option(ENABLE_go "Enable go " OFF)
40+
if(ENABLE_go)
41+
add_library(coro_rpc coro_rpc_lib/coro_rpc.cpp)
42+
set_target_properties(coro_rpc PROPERTIES
43+
ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/go_example"
44+
)
45+
endif()
46+
3947
add_executable(coro_rpc_example_load_balancer load_balancer.cpp)
4048
add_executable(coro_rpc_example_client_pool client_pool.cpp)
4149
add_executable(coro_rpc_example_client_pools client_pools.cpp)
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#include "coro_rpc.h"
2+
3+
#include <ylt/coro_io/load_balancer.hpp>
4+
#include <ylt/coro_rpc/coro_rpc_client.hpp>
5+
#include <ylt/coro_rpc/coro_rpc_server.hpp>
6+
7+
inline char *create_copy_cstr(std::string msg) {
8+
char *buf = (char *)malloc(msg.size() + 1);
9+
memcpy(buf, msg.data(), msg.size());
10+
buf[msg.size()] = '\0';
11+
return buf;
12+
}
13+
14+
inline void set_rpc_result(rpc_result &ret, coro_rpc::rpc_error ec) {
15+
ret.ec = ec.val();
16+
ret.err_msg = create_copy_cstr(ec.msg);
17+
}
18+
19+
// rpc server
20+
inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {
21+
auto ctx_ptr = std::make_unique<coro_rpc::context<void>>(std::move(context));
22+
load_service(ctx_ptr.release(), req_id);
23+
}
24+
25+
void response_error(void *ctx, uint16_t err_code, const char *err_msg) {
26+
std::unique_ptr<coro_rpc::context<void>> context(
27+
(coro_rpc::context<void> *)ctx);
28+
29+
context->response_error(coro_rpc::err_code(err_code), err_msg);
30+
}
31+
32+
void *response_msg(void *ctx, char *msg, uint64_t size) {
33+
std::unique_ptr<coro_rpc::context<void>> context(
34+
(coro_rpc::context<void> *)ctx);
35+
// wait for response finish;
36+
auto promise = std::make_unique<std::promise<std::error_code>>();
37+
context->get_context_info()->set_complete_handler(
38+
[p = promise.get()](const std::error_code &ec, std::size_t) {
39+
p->set_value(ec);
40+
});
41+
42+
context->get_context_info()->set_response_attachment(
43+
std::string_view(msg, size));
44+
context->response_msg();
45+
return promise.release();
46+
}
47+
48+
rpc_result wait_response_finish(void *p) {
49+
auto promis = (std::promise<std::error_code> *)p;
50+
auto ec = promis->get_future().get();
51+
delete promis;
52+
if (!ec) {
53+
return {};
54+
}
55+
56+
char *buf = create_copy_cstr(ec.message());
57+
return rpc_result{0, buf, 0};
58+
}
59+
60+
void *start_rpc_server(char *addr, int parallel) {
61+
auto server = std::make_unique<coro_rpc::coro_rpc_server>(
62+
parallel, addr, std::chrono::seconds(600));
63+
server->register_handler<ylt_load_service>();
64+
auto res = server->async_start();
65+
if (res.hasResult()) {
66+
ELOG_ERROR << "start server failed";
67+
return nullptr;
68+
}
69+
70+
return server.release();
71+
}
72+
73+
void stop_rpc_server(void *server) {
74+
if (server != nullptr) {
75+
auto s = (coro_rpc::coro_rpc_server *)server;
76+
s->stop();
77+
delete s;
78+
}
79+
}
80+
81+
// rpc client
82+
void *create_client_pool(char *addr, int req_timeout_sec) {
83+
std::vector<std::string_view> hosts{std::string_view(addr)};
84+
auto ld = coro_io::load_balancer<coro_rpc::coro_rpc_client>::create(
85+
hosts, {.pool_config{.client_config = {
86+
.request_timeout_duration =
87+
std::chrono::seconds{req_timeout_sec}}}});
88+
auto ld_ptr = std::make_unique<decltype(ld)>(std::move(ld));
89+
return ld_ptr.release();
90+
}
91+
92+
void free_client_pool(void *pool) {
93+
auto ptr = (coro_io::load_balancer<coro_rpc::coro_rpc_client> *)pool;
94+
delete ptr;
95+
}
96+
97+
rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len) {
98+
using namespace async_simple::coro;
99+
rpc_result ret{};
100+
if (dest == nullptr || dest_len == 0) {
101+
set_rpc_result(ret,
102+
coro_rpc::rpc_error(coro_rpc::errc::invalid_rpc_arguments));
103+
return ret;
104+
}
105+
106+
auto ld = (coro_io::load_balancer<coro_rpc::coro_rpc_client> *)pool;
107+
auto lazy = [&]() -> Lazy<void> {
108+
auto result =
109+
co_await ld->send_request([&](coro_rpc::coro_rpc_client &client,
110+
std::string_view hostname) -> Lazy<void> {
111+
client.set_resp_attachment_buf(std::span<char>(dest, dest_len));
112+
auto result = co_await client.call<ylt_load_service>(req_id);
113+
if (!result) {
114+
set_rpc_result(ret, result.error());
115+
co_return;
116+
}
117+
118+
if (!client.is_resp_attachment_in_external_buf()) {
119+
set_rpc_result(
120+
ret, coro_rpc::rpc_error(coro_rpc::errc::message_too_large));
121+
co_return;
122+
}
123+
ret.len = client.get_resp_attachment().size();
124+
});
125+
};
126+
127+
syncAwait(lazy());
128+
129+
return ret;
130+
}
131+
132+
// log
133+
void init_rpc_log(char *log_filename, int log_level, uint64_t max_file_size,
134+
uint64_t max_file_num, bool async) {
135+
if (log_filename == nullptr) {
136+
easylog::set_min_severity((easylog::Severity)log_level);
137+
ELOG_INFO << "init log, log_level: " << log_level;
138+
return;
139+
}
140+
141+
std::string filename(log_filename);
142+
easylog::init_log((easylog::Severity)log_level, filename, async, true,
143+
max_file_size, max_file_num, false);
144+
ELOG_INFO << "init log " << log_level << ", " << filename << ","
145+
<< max_file_size << "," << max_file_num;
146+
}
147+
148+
void flush_rpc_log() { easylog::flush(); }
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#pragma once
2+
#include <stdbool.h>
3+
#include <stdint.h>
4+
#include <stdlib.h>
5+
#ifdef __cplusplus
6+
extern "C" {
7+
#endif
8+
9+
typedef struct {
10+
int ec;
11+
char *err_msg;
12+
uint64_t len;
13+
} rpc_result;
14+
15+
// ip:port
16+
extern void load_service(void *ctx, uint64_t req_id);
17+
extern void *response_msg(void *ctx, char *msg, uint64_t size);
18+
extern void response_error(void *ctx, uint16_t err_code, const char *err_msg);
19+
extern rpc_result wait_response_finish(void *p);
20+
extern void *start_rpc_server(char *addr, int parallel);
21+
extern void stop_rpc_server(void *server);
22+
23+
extern void *create_client_pool(char *addr, int req_timeout_sec);
24+
extern void free_client_pool(void *pool);
25+
extern rpc_result load(void *pool, uint64_t req_id, char *dest,
26+
uint64_t dest_len);
27+
28+
/*
29+
enum log_level {
30+
NONE = 0,
31+
TRACE,
32+
DEBUG,
33+
INFO,
34+
WARN,
35+
WARNING = WARN,
36+
ERROR,
37+
CRITICAL,
38+
FATAL = CRITICAL,
39+
};
40+
*/
41+
extern void init_rpc_log(char *log_filename, int log_level,
42+
uint64_t max_file_size, uint64_t max_file_num,
43+
bool async);
44+
extern void flush_rpc_log();
45+
46+
#ifdef __cplusplus
47+
}
48+
#endif

0 commit comments

Comments
 (0)