-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
callback_awaitable 自适应executor模式 #27
Conversation
如何自动判定是不是在executor中执行的handle?请说明原理,这是最关键的部分,上述并没有提到。 |
681d789
to
7eb519a
Compare
callback_awaitable 自适应executor模式 原先为了避免协程循环导致的爆栈, callback_awaitable 实现成了两种。 非 executor 环境下使用的 callback_awaitable 使用了新的 await_suspend 签名 但是这也导致, callback_awaitable无法在 executor 环境下使用。 现在更新一下 callback_awaitable, 它可以自动判断出来 callback_awaitable 传给你 下面简述原理: callback_awaitable 调用 用户的回调函数的时候,会传入一个 handle 实现原理就是检测 handle 被调用的时候,是否是在 callback_awaitable 回调用户的上下文里。 callback_awaitable::await_suspend -> user_lambda -> handle 那么,在 handle 的处理代码里,就标记一下,而不调用 coro_handle 的 resume 于是,等 user_lambda返回的时候,callback_awaitable::await_suspend 的代码通过检查 如果 handle 的处理代码发现自己的调用栈不是 callback_awaitable::await_suspend 过 检查的方式如下:
为啥 1. 要单独提出来呢? 因为 方法 2. 里有个隐含的条件,就是 handle 的处理代码, |
939b422
to
e18a6e6
Compare
e18a6e6
to
a39dadb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
主要几个问题
一,关于其中 thread
的判断,似乎是没有必要的,关于这一点,我在下面有详细的解释。
二,就是关于命名和注释应简洁,不应带过多的冗余信息导致阅读上的障碍。
三,ExecutorAwaiter
还有没有继续保留的必要?这需要讨论,从新修改的 CallbackAwaiter
来看,功能上是包含了 ExecutorAwaiter
,但根据 c++
的零开销原则,CallbackAwaiter
引入了 std::shared_ptr
而 ExecutorAwaiter
没有这个开销,但这并不是什么大的问题,我最终意见还是删除 ExecutorAwaiter
参考实现:
void call_detect(std::coroutine_handle<> handle, std::shared_ptr<std::atomic_flag> call_flag)
{
if (call_flag->test_and_set())
{
// 如果执行到这里,说明 call_detect 运行在 callback_function_ 返回之后,所以也就
// 是说运行在 executor 中。
handle.resume();
}
}
std::coroutine_handle<> await_suspend(std::coroutine_handle<> handle)
{
auto call_flag = std::make_shared<std::atomic_flag>();
if constexpr (std::is_void_v<T>)
{
callback_function_([this, handle, call_flag]() mutable
{
return call_detect(handle, call_flag);
});
}
else
{
callback_function_([this, call_flag, handle](T t) mutable
{
this->result_ = std::move(t);
return call_detect(handle, call_flag);
});
}
if (call_flag->test_and_set())
{
// 如果执行到这里,说明 call_detect 已经被执行,这里分 2 种情况:
//
// 第一种情况就是
// 在 executor 线程中执行了 call_detect,是由于 executor 线程快于当前线程。
//
// executor 线程快于当前线程的情况下,call_detect 什么都不会做,仅仅只设置 flag。
// 如果 executor 线程慢于当前线程,则上面的 flag.test_and_set() 会返回 false 并
// 设置 flag,然后执行 return std::noop_coroutine(); 在此后的 call_detect 中
// 因为 flag.test_and_set() 为 true 将会 resume 协程。
//
// 第二种情况就是 call_detect 直接被 callback_function_ 调用的,call_detect
// 也仅仅只设置 flag。
//
// 无论哪一种情况,我们都可以在这里直接返回 handle 让协程框架维护协程 resume。
return handle;
}
else
{
// 如果执行到这里,说明 call_detect 肯定没被执行,说明是由 executor 驱动.
// executor 驱动即返回 noop_coroutine 即可.
return std::noop_coroutine();
}
}
在上面的逻辑就是,无论 executor
线程快于当前线程,还是 executor
线程慢于当前线程,都将进入正确的逻辑当中。
通过 test_and_set
的原子性可以达到这一目标,当任何一线程抢先运行(即当前 call_flag
为 false
),则都会由抢先者 resume
协程。
因为 await_suspend
中的 if (call_flag->test_and_set())
是在 callback_function_
执行之后调用的,即使
call_detect
线程跑的更快(已经调用了 resume),那么在 await_suspend
中则直接返回 std::noop_coroutine
, 而不是重复返回 return handle
导致框架再次 resume
。
include/ucoro/awaitable.hpp
Outdated
constexpr bool await_ready() noexcept | ||
{ | ||
return false; | ||
} | ||
|
||
auto await_suspend(std::coroutine_handle<> handle) | ||
// 用户调用 handle( ret_value ) 就是在这里执行的. | ||
void resume_by_user(std::coroutine_handle<> handle, std::thread::id thread_id, std::shared_ptr<bool> handler_resume_once_flag) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
函数名 resume_by_user
不太具有指导性,鉴于它所实现的功能是用于检查是否为用户直接调用,建议命名为 dircet_call_detect
或更简短的 call_detect
,不必太过冗余。
handler_resume_once_flag
根据它的用途及含义,建议命名为 call_flag
会更简洁。
thread_id
似乎没有存在的必要,因为如果是被 executor
调用的话,无论是多线程还是单线程,handle 被调用的时机一定是在 await_suspend
返回之后,也或者说:
在有 executor
的情况下,handle
当在被调用 executor
调用的,那么也就是说 callback_function_
是在被调用完成之后 handle
再被调用的。
而没有 executor
的情况下,handle
则是在 callback_function_
中被调用的,此时 callback_function_
并未完成。
由此可见,我们似乎并不需要关心它是不是在不同 thread
中调用,因为上面2个条件包括了多线程中调用 handle
的情况。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread_id 是非常有必要的。因为多线程executor确实有可能,更快的执行 resume_by_user
我在 callback_function_ 后面插入一个小 sleep 就能很快模拟出来。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
使用 handle_return_value 这个名字如何?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@microcai 请刷新,我已经在上面给出了一段参考实现,并且我也通过添加 sleep 测试了模拟各种线程快慢的情况,包括你说的这个情况。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我最初就是用的原子。结果发现有问题。才改成判断线程 id.
这里的逻辑是,通过 谁 更快的插旗 来判断 executor 的存在。
在单线程环境下,只要 executor 存在,那必然是 await_suspend 里的代码更快的插旗。
但是,在多线程的环境下,await_suspend 里的代码有可能来不及先插旗,被 resume_by_user 抢先。
于是 await_suspend 里的代码就判断 executor 不存在了。
于是协程被原地 resume. 但是 executor 存在的情况下我们知道这种原地 resume 是错误的。必须等 executor 调度去执行 resume. 所以说线程id的判断是非常有必要的。哪怕用了原子flag操作也是必须的。
include/ucoro/awaitable.hpp
Outdated
// 说明被多线程 executor 投递执行了 | ||
// 既然是投递执行的,显然 await_suspend 得返回 noop_coroutine | ||
// 因此这里就需要调用 resume 来恢复协程 | ||
if (thread_id != std::this_thread::get_id()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我们似乎并不需要关心它是不是在不同 thread
中调用,上面已经表达过原因了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里在多线程下有竞争的。去掉线程id 判断会导致下面的逻辑会有竞争从而判断错误。而这里判断了 线程 id 那么下面的代码就肯定是不会有竞争的。
include/ucoro/awaitable.hpp
Outdated
{ | ||
auto thread_id = std::this_thread::get_id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同上
include/ucoro/awaitable.hpp
Outdated
{ | ||
auto thread_id = std::this_thread::get_id(); | ||
|
||
auto handler_resume_once_flag = std::make_shared<bool>(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
建议命名为 call_flag 会更简洁
include/ucoro/awaitable.hpp
Outdated
|
||
// 如果 resume_by_user 在用户代码直接执行了 | ||
// 则 这里要 return handle, 让协程框架进行切换。这样就不会爆栈了 | ||
// 如果 handler_resume_once_flag == false, 说明用户使用了 executor 对 handler 进行了延后调用 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
建议 comment
重新组织语言,用清晰简洁的措词描述这个 flag
相关的机制即可
include/ucoro/awaitable.hpp
Outdated
if (*handler_resume_once_flag) | ||
{ | ||
// 这里 handler_resume_once_flag == true | ||
// 说明 resume_by_user 在 callback_function_ 里面就已经被执行了 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果 if
前面的 comment
已经描述清楚,则这里可以不再写相关的 comment
重复赘述。
include/ucoro/awaitable.hpp
Outdated
callback_function_([this](T t) mutable { this->result_ = std::move(t); }); | ||
// 如果这里是 false,说明 callback_function_ 里的用户代码没有立即调用 resume_by_user() | ||
// 而是进行了投递操作 | ||
// 那么 await_suspend 就必须要返回 noop_coroutine |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同上,如果 if
前面的 comment
已经描述清楚,则这里可以不再写相关的 comment
重复赘述。
ExecutorAwaiter 暂时要留着。因为它 0 开销的传 coroutine_handle 是目前 libevent/libcurl 几个例子所必须的 |
其实我最初就是这个版本。用的 atomic_flag . 后来发现多线程下有问题。 |
因为你目前的版本对于多线程的判断多于复杂,根据我梳理的思路,是用不到通过 现在可以假设当前线程和 仔细想了下,确实没问题。 无非就是最终协程没有被 executor 的线程调度。 |
咋 quota 变成了 edit 你的 comment 了。。。。 |
仔细想了下,确实没问题。 无非就是最终协程没有被 executor 的线程调度。 |
假如 最终协程没有被 |
a39dadb
to
fd27cd1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
以下几个问题
1,修复注释中符号名称与项目中的保持一致。
2,是否应该考虑单独写一个 test 用于测试有无 executor 的两种不同的情况。
3,随着功能越来越丰富,我们是否需要添加一个文档,用于说明 ucoro 的 API 特点和使用方法,注意事项等等,不过这个可以在将来去做,不必在本 patch 中去修改。
include/ucoro/awaitable.hpp
Outdated
|
||
if (executor_detect_flag->test_and_set()) | ||
{ | ||
// 如果执行到这里,说明 call_detect 已经被执行,这里分 2 种情况: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
请修复 comment 中的提到的变量和函数名称,保持与代码中一致。
include/ucoro/awaitable.hpp
Outdated
} | ||
else | ||
{ | ||
callback_function_([this](T t) mutable { this->result_ = std::move(t); }); | ||
// 如果执行到这里,说明 call_detect 肯定没被执行,说明是由 executor 驱动. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同上
原先为了避免协程循环导致的爆栈, callback_awaitable 实现成了两种。 一种是在非 executor 环境下调用 另一种是在 executor 环境下调用的 executor_awaitable 非 executor 环境下使用的 callback_awaitable 使用了新的 await_suspend 签名 通过直接返回 coroutine_handle 的方式避免对 .resume() 的直接调用 从而避免了爆栈问题 但是这也导致, callback_awaitable无法在 executor 环境下使用。 现在更新一下 callback_awaitable, 它可以自动判断出来 callback_awaitable 传给你 的 handle 有没有被投递给 executor。如果投递给了 executor 它就 让 await_suspend 返回 noop_coroutine, 等你调用 handle 的时候,它内部再调用对应协程的 resume 来恢 复协程。而如果你没有投递 handle, 而是在 callback_awaitable 传你 handle 的时候立 马调用, 则 await_suspend 就会通过向协程框架返回 协程句柄的方式避免嵌套resume导致的 爆栈。 下面简述原理: callback_awaitable 调用 用户的回调函数的时候,会传入一个 handle 用户通过调用这个 handle 实现 恢复协程。让 co_await callback_awaitable 得以返回。 实现原理就是检测 handle 被调用的时候,是否是在 callback_awaitable 回调用户的上下文里。 也就是说,如果调用栈是 callback_awaitable::await_suspend -> user_lambda -> handle 那么,在 handle 的处理代码里,就标记一下,而不调用 coro_handle 的 resume 于是,等 user_lambda返回的时候,callback_awaitable::await_suspend 的代码通过检查 标记,就可以知道 handle 是不是被直接调用了。如果是,就 返回coro_handle, 否则返回 noop_coroutine. 如果 handle 的处理代码发现自己的调用栈不是 callback_awaitable::await_suspend 过 来的,则不做这个标记。 检查的方式如下: 1. 如果它发现自己运行的线程甚至不是 callback_awaitable::await_suspend 所运行的线程,则必然不在 callback_awaitable::await_suspend 的上下文里。 2. 通过检查一个共享的变量判断自己是否在 callback_awaitable::await_suspend 里面。 为啥 1. 要单独提出来呢? 因为 方法 2. 里有个隐含的条件,就是 handle 的处理代码, 和 callback_awaitable::await_suspend 调用 user_lambda 后的后续代码,是串行执行的。 如果只依赖 2. 这个方法,则可能判断出错。
fd27cd1
to
395fb31
Compare
test1 和 test_executor 就分别使用的是 有/冇 executor 的情况。无需写新的 test 。 注释已修正 |
callback_awaitable 自适应executor模式
原先为了避免协程循环导致的爆栈, callback_awaitable 实现成了两种。
一种是在非 executor 环境下调用
另一种是在 executor 环境下调用的 executor_awaitable
非 executor 环境下使用的 callback_awaitable 使用了新的 await_suspend 签名
通过直接返回 coroutine_handle 的方式避免对 .resume() 的直接调用
从而避免了爆栈问题
但是这也导致, callback_awaitable无法在 executor 环境下使用。
现在更新一下 callback_awaitable, 它可以自动判断出来 callback_awaitable 传给你
的 handle 有没有被投递给 executor。如果投递给了 executor 它就 让 await_suspend
返回 noop_coroutine, 等你调用 handle 的时候,它内部再调用对应协程的 resume 来恢
复协程。而如果你没有投递 handle, 而是在 callback_awaitable 传你 handle 的时候立
马调用, 则 await_suspend 就会通过向协程框架返回 协程句柄的方式避免嵌套resume导致的
爆栈。
下面简述原理:
callback_awaitable 调用 用户的回调函数的时候,会传入一个 handle
用户通过调用这个 handle 实现 恢复协程。让 co_await callback_awaitable 得以返回。
实现原理就是检测 handle 被调用的时候,是否是在 callback_awaitable 回调用户的上下文里。
也就是说,如果调用栈是
callback_awaitable::await_suspend -> user_lambda -> handle
那么,在 handle 的处理代码里,就标记一下,而不调用 coro_handle 的 resume
于是,等 user_lambda返回的时候,callback_awaitable::await_suspend 的代码通过检查
标记,就可以知道 handle 是不是被直接调用了。如果是,就 返回coro_handle,
否则返回 noop_coroutine.
如果 handle 的处理代码发现自己的调用栈不是 callback_awaitable::await_suspend 过
来的,则不做这个标记。
检查的方式如下:
如果它发现自己运行的线程甚至不是 callback_awaitable::await_suspend
所运行的线程,则必然不在 callback_awaitable::await_suspend 的上下文里。
通过检查一个共享的变量判断自己是否在 callback_awaitable::await_suspend 里面。
为啥 1. 要单独提出来呢? 因为 方法 2. 里有个隐含的条件,就是 handle 的处理代码,
和 callback_awaitable::await_suspend 调用 user_lambda 后的后续代码,是串行执行的。
如果只依赖 2. 这个方法,则可能判断出错。