Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

callback_awaitable 自适应executor模式 #27

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions include/ucoro/awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace std
#include <functional>
#include <memory>
#include <type_traits>
#include <thread>

#if defined(DEBUG) || defined(_DEBUG)
#if defined(ENABLE_DEBUG_CORO_LEAK)
Expand Down Expand Up @@ -525,28 +526,76 @@ namespace ucoro
template<typename T, typename CallbackFunction>
struct CallbackAwaiter : public CallbackAwaiterBase<T>
{
CallbackAwaiter(const CallbackAwaiter&) = delete;
CallbackAwaiter& operator = (const CallbackAwaiter&) = delete;
public:
explicit CallbackAwaiter(CallbackFunction&& callback_function)
: callback_function_(std::move(callback_function))
: callback_function_(std::forward<CallbackFunction>(callback_function))
{
}

CallbackAwaiter(CallbackAwaiter&&) = default;

constexpr bool await_ready() noexcept
{
return false;
}

auto await_suspend(std::coroutine_handle<> handle)
// 用户调用 handle( ret_value ) 就是在这里执行的.
void resume_coro(std::coroutine_handle<> handle, std::shared_ptr<std::atomic_flag> executor_detect_flag)
{
if (executor_detect_flag->test_and_set())
{
// 如果执行到这里,说明 executor_detect_flag 运行在 callback_function_ 返回之后,所以也就
// 是说运行在 executor 中。
handle.resume();
}
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> handle)
{
auto executor_detect_flag = std::make_shared<std::atomic_flag>();

if constexpr (std::is_void_v<T>)
{
callback_function_([]() {});
callback_function_([this, handle, executor_detect_flag]() mutable
{
return resume_coro(handle, executor_detect_flag);
});
}
else
{
callback_function_([this, executor_detect_flag, handle](T t) mutable
{
this->result_ = std::move(t);
return resume_coro(handle, executor_detect_flag);
});
}

if (executor_detect_flag->test_and_set())
{
// 如果执行到这里,说明 executor_detect_flag 已经被执行,这里分 2 种情况:
//
// 第一种情况就是
// 在 executor 线程中执行了 executor_detect_flag executor 线程快于当前线程。
//
// executor 线程快于当前线程的情况下,executor_detect_flag 什么都不会做,仅仅只设置 flag。
// 如果 executor 线程慢于当前线程,则上面的 flag.test_and_set() 会返回 false 并
// 设置 flag,然后执行 return std::noop_coroutine(); 在此后的 executor_detect_flag 中
// 因为 flag.test_and_set() 为 true 将会 resume 协程。
//
// 第二种情况就是 executor_detect_flag 直接被 callback_function_ executor_detect_flag
// 也仅仅只设置 flag。
//
// 无论哪一种情况,我们都可以在这里直接返回 handle 让协程框架维护协程 resume。
return handle;
}
else
{
callback_function_([this](T t) mutable { this->result_ = std::move(t); });
// 如果执行到这里,说明 executor_detect_flag 肯定没被执行,说明是由 executor 驱动.
// executor 驱动即返回 noop_coroutine 即可.
return std::noop_coroutine();
}
return handle;
}

private:
Expand Down
2 changes: 1 addition & 1 deletion tests/test5/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ boost::asio::io_context main_ioc;

ucoro::awaitable<int> coro_compute_int(int value)
{
auto ret = co_await executor_awaitable<int>([value](auto handle) {
auto ret = co_await callback_awaitable<int>([value](auto handle) {
main_ioc.post([value, handle = std::move(handle)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(0));
std::cout << value << " value\n";
Expand Down
2 changes: 1 addition & 1 deletion tests/test_asio/test_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ boost::asio::awaitable<int> asio_coro_test()

ucoro::awaitable<int> coro_compute_int(int value)
{
auto ret = co_await executor_awaitable<int>([value](auto handle) {
auto ret = co_await callback_awaitable<int>([value](auto handle) {
main_ioc.post([value, handle = std::move(handle)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(0));
std::cout << value << " value\n";
Expand Down
2 changes: 1 addition & 1 deletion tests/test_executor/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ ucoro::awaitable<int> coro_compute_int(int value)
{
executor_service* executor = co_await ucoro::local_storage_t<executor_service*>();

auto ret = co_await executor_awaitable<int>([executor, value](auto handle)
auto ret = co_await callback_awaitable<int>([executor, value](auto handle)
{
executor->enqueue([value, handle = std::move(handle)]() mutable
{
Expand Down
8 changes: 3 additions & 5 deletions tests/testlibuv/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

ucoro::awaitable<void> async_sleep_with_uv_timer(int ms)
{
co_await executor_awaitable<void>([ms](auto continuation)
co_await callback_awaitable<void>([ms](auto continuation)
{
struct uv_timer_with_data : uv_timer_s
{
Expand All @@ -15,16 +15,14 @@ ucoro::awaitable<void> async_sleep_with_uv_timer(int ms)
: continuation_(c){}
};

uv_timer_with_data* timer_handle = new uv_timer_with_data { std::move(continuation) };
uv_timer_with_data* timer_handle = new uv_timer_with_data { std::forward<decltype(continuation)>(continuation) };

uv_timer_init(uv_default_loop(), timer_handle);
uv_timer_start(timer_handle, [](uv_timer_t* handle)
{
uv_timer_stop(handle);
decltype(continuation) continuation_ = std::move(reinterpret_cast<uv_timer_with_data*>(handle)->continuation_);
reinterpret_cast<uv_timer_with_data*>(handle)->continuation_();
delete handle;

continuation_();
}, ms, false);

});
Expand Down
2 changes: 1 addition & 1 deletion tests/testqt/testqt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

ucoro::awaitable<int> coro_compute_int(int value)
{
auto ret = co_await executor_awaitable<int>([value](auto handle) {
auto ret = co_await callback_awaitable<int>([value](auto handle) {
QTimer::singleShot(0, [value, handle = std::move(handle)]() mutable {
std::cout << value << " value\n";
handle(value * 100);
Expand Down