Skip to content

Commit 45d5061

Browse files
authored
update async-simple (#882)
* update async-simple
1 parent 17a2139 commit 45d5061

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+3658
-764
lines changed

include/ylt/coro_io/client_pool.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,8 @@ class client_pool : public std::enable_shared_from_this<
257257
this->weak_from_this(), clients,
258258
(std::max)(collect_time, std::chrono::milliseconds{50}),
259259
pool_config_.idle_queue_per_max_clear_count)
260-
.via(coro_io::get_global_executor())
261-
.start([](auto&&) {
262-
});
260+
.directlyStart([](auto&&) {
261+
},coro_io::get_global_executor());
263262
}
264263
}
265264
}

include/ylt/coro_io/coro_io.hpp

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -429,66 +429,6 @@ async_simple::coro::Lazy<std::pair<
429429
});
430430
}
431431

432-
template <typename T>
433-
inline decltype(auto) select_impl(T &pair) {
434-
using Func = std::tuple_element_t<1, std::remove_cvref_t<T>>;
435-
using ValueType =
436-
typename std::tuple_element_t<0, std::remove_cvref_t<T>>::ValueType;
437-
using return_type = std::invoke_result_t<Func, async_simple::Try<ValueType>>;
438-
439-
auto &callback = std::get<1>(pair);
440-
if constexpr (coro_io::is_lazy_v<return_type>) {
441-
auto executor = std::get<0>(pair).getExecutor();
442-
return std::make_pair(
443-
std::move(std::get<0>(pair)),
444-
[executor, callback = std::move(callback)](auto &&val) {
445-
if (executor) {
446-
callback(std::move(val)).via(executor).start([](auto &&) {
447-
});
448-
}
449-
else {
450-
callback(std::move(val)).start([](auto &&) {
451-
});
452-
}
453-
});
454-
}
455-
else {
456-
return pair;
457-
}
458-
}
459-
460-
template <typename... T>
461-
inline auto select(T &&...args) {
462-
return async_simple::coro::collectAny(select_impl(args)...);
463-
}
464-
465-
template <typename T, typename Callback>
466-
inline auto select(std::vector<T> vec, Callback callback) {
467-
if constexpr (coro_io::is_lazy_v<Callback>) {
468-
std::vector<async_simple::Executor *> executors;
469-
for (auto &lazy : vec) {
470-
executors.push_back(lazy.getExecutor());
471-
}
472-
473-
return async_simple::coro::collectAny(
474-
std::move(vec),
475-
[executors, callback = std::move(callback)](size_t index, auto &&val) {
476-
auto executor = executors[index];
477-
if (executor) {
478-
callback(index, std::move(val)).via(executor).start([](auto &&) {
479-
});
480-
}
481-
else {
482-
callback(index, std::move(val)).start([](auto &&) {
483-
});
484-
}
485-
});
486-
}
487-
else {
488-
return async_simple::coro::collectAny(std::move(vec), std::move(callback));
489-
}
490-
}
491-
492432
template <typename Socket, typename AsioBuffer>
493433
std::pair<asio::error_code, size_t> read_some(Socket &sock,
494434
AsioBuffer &&buffer) {

include/ylt/coro_io/detail/client_queue.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class client_queue {
8080
if (size_[index]) {
8181
std::size_t result =
8282
queue_[index].try_dequeue_bulk(fake_iter{}, max_clear_cnt);
83+
8384
size_[index] -= result;
8485
return result;
8586
}

include/ylt/coro_io/io_context_pool.hpp

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,20 @@
1717
#include <async_simple/Executor.h>
1818
#include <async_simple/coro/Lazy.h>
1919

20-
#include <asio/dispatch.hpp>
2120
#include <asio/io_context.hpp>
21+
#include <asio/post.hpp>
2222
#include <asio/steady_timer.hpp>
2323
#include <atomic>
24+
#include <cstdint>
2425
#include <future>
2526
#include <iostream>
2627
#include <memory>
2728
#include <mutex>
2829
#include <thread>
2930
#include <type_traits>
3031
#include <vector>
32+
33+
#include "asio/dispatch.hpp"
3134
#ifdef __linux__
3235
#include <pthread.h>
3336
#include <sched.h>
@@ -51,25 +54,25 @@ class ExecutorWrapper : public async_simple::Executor {
5154
using context_t = std::remove_cvref_t<decltype(executor_.context())>;
5255

5356
virtual bool schedule(Func func) override {
54-
if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) {
55-
executor_.dispatch(std::move(func));
57+
asio::post(executor_, std::move(func));
58+
return true;
59+
}
60+
61+
virtual bool schedule(Func func, uint64_t hint) override {
62+
if (hint >=
63+
static_cast<uint64_t>(async_simple::Executor::Priority::YIELD)) {
64+
asio::post(executor_, std::move(func));
5665
}
5766
else {
5867
asio::dispatch(executor_, std::move(func));
5968
}
60-
6169
return true;
6270
}
6371

6472
virtual bool checkin(Func func, void *ctx) override {
6573
using context_t = std::remove_cvref_t<decltype(executor_.context())>;
6674
auto &executor = *(context_t *)ctx;
67-
if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) {
68-
executor.post(std::move(func));
69-
}
70-
else {
71-
asio::dispatch(executor, std::move(func));
72-
}
75+
asio::post(executor, std::move(func));
7376
return true;
7477
}
7578
virtual void *checkout() override { return &executor_.context(); }
@@ -99,6 +102,14 @@ class ExecutorWrapper : public async_simple::Executor {
99102
fn();
100103
});
101104
}
105+
void schedule(Func func, Duration dur, uint64_t hint,
106+
async_simple::Slot *slot = nullptr) override {
107+
auto timer = std::make_unique<asio::steady_timer>(executor_, dur);
108+
auto tm = timer.get();
109+
tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) {
110+
fn();
111+
});
112+
}
102113
};
103114

104115
template <typename ExecutorImpl = asio::io_context>

include/ylt/coro_rpc/impl/coro_connection.hpp

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,17 @@ context_info_t<rpc_protocol> *&set_context();
122122

123123
class coro_connection : public std::enable_shared_from_this<coro_connection> {
124124
public:
125+
template <typename rpc_protocol_t>
126+
struct connection_lazy_ctx : public async_simple::coro::LazyLocalBase {
127+
inline static char tag;
128+
// init LazyLocalBase by unique address
129+
connection_lazy_ctx(std::shared_ptr<context_info_t<rpc_protocol_t>> info)
130+
: LazyLocalBase(&tag), info_(std::move(info)) {}
131+
static bool classof(const LazyLocalBase *base) {
132+
return base->getTypeTag() == &tag;
133+
}
134+
std::shared_ptr<context_info_t<rpc_protocol_t>> info_;
135+
};
125136
/*!
126137
*
127138
* @param io_context
@@ -276,26 +287,30 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
276287
auto coro_handler = router.get_coro_handler(key);
277288
set_rpc_return_by_callback();
278289
router.route_coro(coro_handler, payload, serialize_proto.value(), key)
279-
.via(executor_)
280-
.setLazyLocal((void *)context_info.get())
281-
.start([context_info](auto &&result) mutable {
282-
std::pair<coro_rpc::err_code, std::string> &ret = result.value();
283-
if (ret.first)
284-
AS_UNLIKELY {
285-
ELOGI << "rpc error in function:"
286-
<< context_info->get_rpc_function_name()
287-
<< ". error code:" << ret.first.ec
288-
<< ". message : " << ret.second;
289-
}
290-
auto executor = context_info->conn_->get_executor();
291-
executor->schedule([context_info = std::move(context_info),
292-
ret = std::move(ret)]() mutable {
293-
context_info->conn_->template direct_response_msg<rpc_protocol>(
294-
ret.first, ret.second, context_info->req_head_,
295-
std::move(context_info->resp_attachment_),
296-
std::move(context_info->complete_handler_));
297-
});
298-
});
290+
.template setLazyLocal<connection_lazy_ctx<rpc_protocol>>(
291+
context_info)
292+
.directlyStart(
293+
[context_info](auto &&result) mutable {
294+
std::pair<coro_rpc::err_code, std::string> &ret =
295+
result.value();
296+
if (ret.first)
297+
AS_UNLIKELY {
298+
ELOGI << "rpc error in function:"
299+
<< context_info->get_rpc_function_name()
300+
<< ". error code:" << ret.first.ec
301+
<< ". message : " << ret.second;
302+
}
303+
auto executor = context_info->conn_->get_executor();
304+
executor->schedule([context_info = std::move(context_info),
305+
ret = std::move(ret)]() mutable {
306+
context_info->conn_
307+
->template direct_response_msg<rpc_protocol>(
308+
ret.first, ret.second, context_info->req_head_,
309+
std::move(context_info->resp_attachment_),
310+
std::move(context_info->complete_handler_));
311+
});
312+
},
313+
executor_);
299314
}
300315
else {
301316
coro_rpc::detail::set_context<rpc_protocol>() = context_info.get();

include/ylt/coro_rpc/impl/coro_rpc_client.hpp

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ class coro_rpc_client {
345345

346346
void close() {
347347
// ELOG_INFO << "client_id " << config_.client_id << " close";
348-
close_socket(control_);
348+
close_socket_async(control_);
349349
}
350350

351351
bool set_req_attachment(std::string_view attachment) {
@@ -375,18 +375,19 @@ class coro_rpc_client {
375375
bool value = false;
376376
};
377377

378-
void reset() {
379-
close_socket(control_);
378+
async_simple::coro::Lazy<void> reset() {
379+
co_await close_socket(control_);
380380
control_->socket_ =
381381
asio::ip::tcp::socket(control_->executor_.get_asio_executor());
382382
control_->is_timeout_ = false;
383383
control_->has_closed_ = false;
384+
co_return;
384385
}
385386
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }
386387

387388
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl() {
388389
if (should_reset_) {
389-
reset();
390+
co_await reset();
390391
}
391392
else {
392393
should_reset_ = true;
@@ -413,17 +414,15 @@ class coro_rpc_client {
413414
std::error_code err_code;
414415
timer_->cancel(err_code);
415416

416-
if (ec) {
417-
if (control_->is_timeout_) {
418-
co_return errc::timed_out;
419-
}
420-
co_return errc::not_connected;
421-
}
422-
423417
if (control_->is_timeout_) {
424418
ELOG_WARN << "client_id " << config_.client_id << " connect timeout";
425419
co_return errc::timed_out;
426420
}
421+
else if (ec) {
422+
ELOG_WARN << "client_id " << config_.client_id
423+
<< " failed:" << ec.message();
424+
co_return errc::not_connected;
425+
}
427426
if (config_.enable_tcp_no_delay == true) {
428427
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
429428
}
@@ -482,7 +481,7 @@ class coro_rpc_client {
482481
}
483482
if (auto self = socket_watcher.lock()) {
484483
self->is_timeout_ = is_timeout;
485-
close_socket(self);
484+
close_socket_async(self);
486485
co_return true;
487486
}
488487
co_return false;
@@ -710,18 +709,40 @@ class coro_rpc_client {
710709
executor_(executor) {}
711710
};
712711

713-
static void close_socket(
712+
static void close_socket_async(
714713
std::shared_ptr<coro_rpc_client::control_t> control) {
715714
bool expected = false;
716715
if (!control->has_closed_.compare_exchange_strong(expected, true)) {
717716
return;
718717
}
719-
control->executor_.schedule([control]() {
720-
asio::error_code ignored_ec;
721-
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
722-
ignored_ec);
723-
control->socket_.close(ignored_ec);
718+
asio::dispatch(control->executor_.get_asio_executor(), [control]() {
719+
assert(&control->executor_.get_asio_executor().context() ==
720+
&control->socket_.get_executor().context());
721+
control->has_closed_ = true;
722+
asio::error_code ec;
723+
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
724+
control->socket_.close(ec);
724725
});
726+
return;
727+
}
728+
729+
static async_simple::coro::Lazy<void> close_socket(
730+
std::shared_ptr<coro_rpc_client::control_t> control) {
731+
bool expected = false;
732+
if (!control->has_closed_.compare_exchange_strong(expected, true)) {
733+
co_return;
734+
}
735+
co_await coro_io::post(
736+
[control = control.get()]() {
737+
assert(&control->executor_.get_asio_executor().context() ==
738+
&control->socket_.get_executor().context());
739+
control->has_closed_ = true;
740+
asio::error_code ec;
741+
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
742+
control->socket_.close(ec);
743+
},
744+
&control->executor_);
745+
co_return;
725746
}
726747

727748
#ifdef UNIT_TEST_INJECT
@@ -861,7 +882,7 @@ class coro_rpc_client {
861882
break;
862883
}
863884
} while (true);
864-
close_socket(controller);
885+
close_socket_async(controller);
865886
send_err_response(controller.get(), ret.first);
866887
co_return;
867888
}
@@ -891,7 +912,7 @@ class coro_rpc_client {
891912
handle_response_buffer<T>(ret.buffer_.read_buf_, ret.errc_, has_error);
892913
if (has_error) {
893914
if (auto w = watcher.lock(); w) {
894-
close_socket(std::move(w));
915+
close_socket_async(std::move(w));
895916
}
896917
}
897918
if (result) {

include/ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,10 @@ using context = coro_rpc::context_base<return_msg_type,
213213

214214
template <typename rpc_protocol = coro_rpc::protocol::coro_rpc_protocol>
215215
async_simple::coro::Lazy<context_info_t<rpc_protocol>*> get_context_in_coro() {
216-
auto* ctx = co_await async_simple::coro::LazyLocals{};
216+
auto* ctx = co_await async_simple::coro::CurrentLazyLocals<
217+
coro_connection::connection_lazy_ctx<rpc_protocol>>{};
217218
assert(ctx != nullptr);
218-
co_return (context_info_t<rpc_protocol>*) ctx;
219+
co_return ctx->info_.get();
219220
}
220221

221222
namespace detail {

include/ylt/thirdparty/async_simple/Collect.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
#ifndef ASYNC_SIMPLE_COLLECT_H
1717
#define ASYNC_SIMPLE_COLLECT_H
1818

19-
#include <exception>
19+
#ifndef ASYNC_SIMPLE_USE_MODULES
2020
#include <iterator>
2121
#include <vector>
22-
#include "async_simple/Common.h"
2322
#include "async_simple/Future.h"
2423
#include "async_simple/Try.h"
2524

26-
#include <iostream>
25+
#endif // ASYNC_SIMPLE_USE_MODULES
2726

2827
namespace async_simple {
2928

0 commit comments

Comments
 (0)