Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

callback_awaitable 自适应executor模式 #27

Merged
merged 1 commit into from
Oct 16, 2024

Conversation

microcai
Copy link
Member

@microcai microcai commented Oct 16, 2024

callback_awaitable 自适应executor模式

原先为了避免协程循环导致的爆栈, callback_awaitable 实现成了两种。
一种是在非 executor 环境下调用
另一种是在 executor 环境下调用的 executor_awaitable

非 executor 环境下使用的 callback_awaitable 使用了新的 await_suspend 签名
通过直接返回 coroutine_handle 的方式避免对 .resume() 的直接调用
从而避免了爆栈问题

但是这也导致, callback_awaitable无法在 executor 环境下使用。

现在更新一下 callback_awaitable, 它可以自动判断出来 callback_awaitable 传给你
的 handle 有没有被投递给 executor。如果投递给了 executor 它就 让 await_suspend
返回 noop_coroutine, 等你调用 handle 的时候,它内部再调用对应协程的 resume 来恢
复协程。而如果你没有投递 handle, 而是在 callback_awaitable 传你 handle 的时候立
马调用, 则 await_suspend 就会通过向协程框架返回 协程句柄的方式避免嵌套resume导致的
爆栈。

下面简述原理:

callback_awaitable 调用 用户的回调函数的时候,会传入一个 handle
用户通过调用这个 handle 实现 恢复协程。让 co_await callback_awaitable 得以返回。

实现原理就是检测 handle 被调用的时候,是否是在 callback_awaitable 回调用户的上下文里。
也就是说,如果调用栈是

callback_awaitable::await_suspend -> user_lambda -> handle

那么,在 handle 的处理代码里,就标记一下,而不调用 coro_handle 的 resume

于是,等 user_lambda返回的时候,callback_awaitable::await_suspend 的代码通过检查
标记,就可以知道 handle 是不是被直接调用了。如果是,就 返回coro_handle,
否则返回 noop_coroutine.

如果 handle 的处理代码发现自己的调用栈不是 callback_awaitable::await_suspend 过
来的,则不做这个标记。

检查的方式如下:

  1. 如果它发现自己运行的线程甚至不是 callback_awaitable::await_suspend
    所运行的线程,则必然不在 callback_awaitable::await_suspend 的上下文里。

  2. 通过检查一个共享的变量判断自己是否在 callback_awaitable::await_suspend 里面。

为啥 1. 要单独提出来呢? 因为 方法 2. 里有个隐含的条件,就是 handle 的处理代码,
和 callback_awaitable::await_suspend 调用 user_lambda 后的后续代码,是串行执行的。
如果只依赖 2. 这个方法,则可能判断出错。

@Jackarain
Copy link
Member

如何自动判定是不是在executor中执行的handle?请说明原理,这是最关键的部分,上述并没有提到。

@microcai microcai force-pushed the new_callback_awaitable branch 4 times, most recently from 681d789 to 7eb519a Compare October 16, 2024 04:23
@microcai
Copy link
Member Author

callback_awaitable 自适应executor模式

原先为了避免协程循环导致的爆栈, callback_awaitable 实现成了两种。
一种是在非 executor 环境下调用
另一种是在 executor 环境下调用的 executor_awaitable

非 executor 环境下使用的 callback_awaitable 使用了新的 await_suspend 签名
通过直接返回 coroutine_handle 的方式避免对 .resume() 的直接调用
从而避免了爆栈问题

但是这也导致, callback_awaitable无法在 executor 环境下使用。

现在更新一下 callback_awaitable, 它可以自动判断出来 callback_awaitable 传给你
的 handle 有没有被投递给 executor。如果投递给了 executor 它就 让 await_suspend
返回 noop_coroutine, 等你调用 handle 的时候,它内部再调用对应协程的 resume 来恢
复协程。而如果你没有投递 handle, 而是在 callback_awaitable 传你 handle 的时候立
马调用, 则 await_suspend 就会通过向协程框架返回 协程句柄的方式避免嵌套resume导致的
爆栈。

下面简述原理:

callback_awaitable 调用 用户的回调函数的时候,会传入一个 handle
用户通过调用这个 handle 实现 恢复协程。让 co_await callback_awaitable 得以返回。

实现原理就是检测 handle 被调用的时候,是否是在 callback_awaitable 回调用户的上下文里。
也就是说,如果调用栈是

callback_awaitable::await_suspend -> user_lambda -> handle

那么,在 handle 的处理代码里,就标记一下,而不调用 coro_handle 的 resume

于是,等 user_lambda返回的时候,callback_awaitable::await_suspend 的代码通过检查
标记,就可以知道 handle 是不是被直接调用了。如果是,就 返回coro_handle,
否则返回 noop_coroutine.

如果 handle 的处理代码发现自己的调用栈不是 callback_awaitable::await_suspend 过
来的,则不做这个标记。

检查的方式如下:

  1. 如果它发现自己运行的线程甚至不是 callback_awaitable::await_suspend
    所运行的线程,则必然不在 callback_awaitable::await_suspend 的上下文里。

  2. 通过检查一个共享的变量判断自己是否在 callback_awaitable::await_suspend 里面。

为啥 1. 要单独提出来呢? 因为 方法 2. 里有个隐含的条件,就是 handle 的处理代码,
和 callback_awaitable::await_suspend 调用 user_lambda 后的后续代码,是串行执行的。
如果只依赖 2. 这个方法,则可能判断出错。

@microcai microcai force-pushed the new_callback_awaitable branch 2 times, most recently from 939b422 to e18a6e6 Compare October 16, 2024 04:44
@microcai microcai self-assigned this Oct 16, 2024
@microcai microcai added the enhancement New feature or request label Oct 16, 2024
@microcai microcai force-pushed the new_callback_awaitable branch from e18a6e6 to a39dadb Compare October 16, 2024 05:41
Copy link
Member

@Jackarain Jackarain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主要几个问题

一,关于其中 thread 的判断,似乎是没有必要的,关于这一点,我在下面有详细的解释。
二,就是关于命名和注释应简洁,不应带过多的冗余信息导致阅读上的障碍。
三,ExecutorAwaiter 还有没有继续保留的必要?这需要讨论,从新修改的 CallbackAwaiter 来看,功能上是包含了 ExecutorAwaiter,但根据 c++ 的零开销原则,CallbackAwaiter 引入了 std::shared_ptrExecutorAwaiter 没有这个开销,但这并不是什么大的问题,我最终意见还是删除 ExecutorAwaiter

参考实现:

void call_detect(std::coroutine_handle<> handle, std::shared_ptr<std::atomic_flag> call_flag)
{
	if (call_flag->test_and_set())
	{
		// 如果执行到这里,说明 call_detect 运行在 callback_function_ 返回之后,所以也就
		// 是说运行在 executor 中。
		handle.resume();
	}
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> handle)
{
	auto call_flag = std::make_shared<std::atomic_flag>();

	if constexpr (std::is_void_v<T>)
	{
		callback_function_([this, handle, call_flag]() mutable
		{
			return call_detect(handle, call_flag);
		});
	}
	else
	{
		callback_function_([this, call_flag, handle](T t) mutable
		{
			this->result_ = std::move(t);
			return call_detect(handle, call_flag);
		});
	}

	if (call_flag->test_and_set())
	{
		// 如果执行到这里,说明 call_detect 已经被执行,这里分 2 种情况:
		//
		// 第一种情况就是
		// 在 executor 线程中执行了 call_detect,是由于 executor 线程快于当前线程。
		//
		// executor 线程快于当前线程的情况下,call_detect 什么都不会做,仅仅只设置 flag。
		// 如果 executor 线程慢于当前线程,则上面的 flag.test_and_set() 会返回 false 并
		// 设置 flag,然后执行 return std::noop_coroutine(); 在此后的 call_detect 中
		// 因为 flag.test_and_set() 为 true 将会 resume 协程。
		//
		// 第二种情况就是 call_detect 直接被 callback_function_ 调用的,call_detect
		// 也仅仅只设置 flag。
		//
		// 无论哪一种情况,我们都可以在这里直接返回 handle 让协程框架维护协程 resume。
		return handle;
	}
	else
	{
		// 如果执行到这里,说明 call_detect 肯定没被执行,说明是由 executor 驱动.
		// executor 驱动即返回 noop_coroutine 即可.
		return std::noop_coroutine();
	}
}

在上面的逻辑就是,无论 executor 线程快于当前线程,还是 executor 线程慢于当前线程,都将进入正确的逻辑当中。
通过 test_and_set 的原子性可以达到这一目标,当任何一线程抢先运行(即当前 call_flagfalse),则都会由抢先者 resume 协程。

因为 await_suspend 中的 if (call_flag->test_and_set()) 是在 callback_function_ 执行之后调用的,即使
call_detect 线程跑的更快(已经调用了 resume),那么在 await_suspend 中则直接返回 std::noop_coroutine, 而不是重复返回 return handle 导致框架再次 resume

constexpr bool await_ready() noexcept
{
return false;
}

auto await_suspend(std::coroutine_handle<> handle)
// 用户调用 handle( ret_value ) 就是在这里执行的.
void resume_by_user(std::coroutine_handle<> handle, std::thread::id thread_id, std::shared_ptr<bool> handler_resume_once_flag)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

函数名 resume_by_user 不太具有指导性,鉴于它所实现的功能是用于检查是否为用户直接调用,建议命名为 dircet_call_detect 或更简短的 call_detect,不必太过冗余。

handler_resume_once_flag 根据它的用途及含义,建议命名为 call_flag 会更简洁。

thread_id 似乎没有存在的必要,因为如果是被 executor 调用的话,无论是多线程还是单线程,handle 被调用的时机一定是在 await_suspend 返回之后,也或者说:

在有 executor 的情况下,handle 当在被调用 executor 调用的,那么也就是说 callback_function_ 是在被调用完成之后 handle 再被调用的。

而没有 executor 的情况下,handle 则是在 callback_function_ 中被调用的,此时 callback_function_ 并未完成。

由此可见,我们似乎并不需要关心它是不是在不同 thread 中调用,因为上面2个条件包括了多线程中调用 handle 的情况。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread_id 是非常有必要的。因为多线程executor确实有可能,更快的执行 resume_by_user
我在 callback_function_ 后面插入一个小 sleep 就能很快模拟出来。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用 handle_return_value 这个名字如何?

Copy link
Member

@Jackarain Jackarain Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@microcai 请刷新,我已经在上面给出了一段参考实现,并且我也通过添加 sleep 测试了模拟各种线程快慢的情况,包括你说的这个情况。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我最初就是用的原子。结果发现有问题。才改成判断线程 id.

这里的逻辑是,通过 谁 更快的插旗 来判断 executor 的存在。

在单线程环境下,只要 executor 存在,那必然是 await_suspend 里的代码更快的插旗。
但是,在多线程的环境下,await_suspend 里的代码有可能来不及先插旗,被 resume_by_user 抢先。
于是 await_suspend 里的代码就判断 executor 不存在了。

于是协程被原地 resume. 但是 executor 存在的情况下我们知道这种原地 resume 是错误的。必须等 executor 调度去执行 resume. 所以说线程id的判断是非常有必要的。哪怕用了原子flag操作也是必须的。

// 说明被多线程 executor 投递执行了
// 既然是投递执行的,显然 await_suspend 得返回 noop_coroutine
// 因此这里就需要调用 resume 来恢复协程
if (thread_id != std::this_thread::get_id())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我们似乎并不需要关心它是不是在不同 thread 中调用,上面已经表达过原因了

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里在多线程下有竞争的。去掉线程id 判断会导致下面的逻辑会有竞争从而判断错误。而这里判断了 线程 id 那么下面的代码就肯定是不会有竞争的。

{
auto thread_id = std::this_thread::get_id();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

{
auto thread_id = std::this_thread::get_id();

auto handler_resume_once_flag = std::make_shared<bool>(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议命名为 call_flag 会更简洁


// 如果 resume_by_user 在用户代码直接执行了
// 则 这里要 return handle, 让协程框架进行切换。这样就不会爆栈了
// 如果 handler_resume_once_flag == false, 说明用户使用了 executor 对 handler 进行了延后调用
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议 comment 重新组织语言,用清晰简洁的措词描述这个 flag 相关的机制即可

if (*handler_resume_once_flag)
{
// 这里 handler_resume_once_flag == true
// 说明 resume_by_user 在 callback_function_ 里面就已经被执行了
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果 if 前面的 comment 已经描述清楚,则这里可以不再写相关的 comment 重复赘述。

callback_function_([this](T t) mutable { this->result_ = std::move(t); });
// 如果这里是 false,说明 callback_function_ 里的用户代码没有立即调用 resume_by_user()
// 而是进行了投递操作
// 那么 await_suspend 就必须要返回 noop_coroutine
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上,如果 if 前面的 comment 已经描述清楚,则这里可以不再写相关的 comment 重复赘述。

@microcai
Copy link
Member Author

主要几个问题

一,关于其中 thread 的判断,似乎是没有必要的,关于这一点,我在下面有详细的解释。 二,就是关于命名和注释应简洁,不应带过多的冗余信息导致阅读上的障碍。 三,ExecutorAwaiter 还有没有继续保留的必要?这需要讨论,从新修改的 CallbackAwaiter 来看,功能上是包含了 ExecutorAwaiter,但根据 c++ 的零开销原则,CallbackAwaiter 引入了 std::shared_ptrExecutorAwaiter 没有这个开销,但这并不是什么大的问题,我最终意见还是删除 ExecutorAwaiter

参考实现:

void call_detect(std::coroutine_handle<> handle, std::shared_ptr<std::atomic_flag> call_flag)
{
	if (call_flag->test_and_set())
	{
		// 如果执行到这里,说明 call_detect 运行在 callback_function_ 返回之后,所以也就
		// 是说运行在 executor 中。
		handle.resume();
	}
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> handle)
{
	auto call_flag = std::make_shared<std::atomic_flag>();

	if constexpr (std::is_void_v<T>)
	{
		callback_function_([this, handle, call_flag]() mutable
		{
			return call_detect(handle, call_flag);
		});
	}
	else
	{
		callback_function_([this, call_flag, handle](T t) mutable
		{
			this->result_ = std::move(t);
			return call_detect(handle, call_flag);
		});
	}

	if (call_flag->test_and_set())
	{
		// 如果执行到这里,说明 call_detect 已经被执行,这里分 2 种情况:
		//
		// 第一种情况就是
		// 在 executor 线程中执行了 call_detect,是由于 executor 线程快于当前线程。
		//
		// executor 线程快于当前线程的情况下,call_detect 什么都不会做,仅仅只设置 flag。
		// 如果 executor 线程慢于当前线程,则上面的 flag.test_and_set() 会返回 false 并
		// 设置 flag,然后执行 return std::noop_coroutine(); 在此后的 call_detect 中
		// 因为 flag.test_and_set() 为 true 将会 resume 协程。
		//
		// 第二种情况就是 call_detect 直接被 callback_function_ 调用的,call_detect
		// 也仅仅只设置 flag。
		//
		// 无论哪一种情况,我们都可以在这里直接返回 handle 让协程框架维护协程 resume。
		return handle;
	}
	else
	{
		// 如果执行到这里,说明 call_detect 肯定没被执行,说明是由 executor 驱动.
		// executor 驱动即返回 noop_coroutine 即可.
		return std::noop_coroutine();
	}
}

ExecutorAwaiter 暂时要留着。因为它 0 开销的传 coroutine_handle 是目前 libevent/libcurl 几个例子所必须的
在这几个例子修正前,要留着 ExecutorAwaiter

@microcai
Copy link
Member Author

test_and_set

其实我最初就是这个版本。用的 atomic_flag . 后来发现多线程下有问题。

@Jackarain
Copy link
Member

Jackarain commented Oct 16, 2024

其实我最初就是这个版本。用的 atomic_flag . 后来发现多线程下有问题。

atomic_flag 确实是刚好可以解决此类多线程问题的,也许是你原来的版本哪里有疏忽了吧

因为你目前的版本对于多线程的判断多于复杂,根据我梳理的思路,是用不到通过 thread id 来处理这个多线程的问题的。

现在可以假设当前线程和 executor 线程分别快慢的各种情况来进行测试,并作梳理

仔细想了下,确实没问题。 无非就是最终协程没有被 executor 的线程调度。
之前有问题是别的问题导致的。参考我 force push 了几个。之前都是 红叉, ctest 跑挂了。
说明我最初的版本问题不是出在竞争上。^_^

@microcai
Copy link
Member Author

咋 quota 变成了 edit 你的 comment 了。。。。

@microcai
Copy link
Member Author

仔细想了下,确实没问题。 无非就是最终协程没有被 executor 的线程调度。
之前有问题是别的问题导致的。参考我 force push 了几个。之前都是 红叉, ctest 跑挂了。
说明我最初的版本问题不是出在竞争上。^_^

仔细想了下,确实没问题。 无非就是最终协程没有被 executor 的线程调度。
之前有问题是别的问题导致的。参考我 force push 了几个。之前都是 红叉, ctest 跑挂了。
说明我最初的版本问题不是出在竞争上。^_^

@Jackarain
Copy link
Member

Jackarain commented Oct 16, 2024

仔细想了下,确实没问题。 无非就是最终协程没有被 executor 的线程调度。 之前有问题是别的问题导致的。参考我 force push 了几个。之前都是 红叉, ctest 跑挂了。 说明我最初的版本问题不是出在竞争上。^_^

假如 executor 的线程调度过快的抢在前面,那么当前协程所在位置肯定还处于 await_suspend 中,但此时此刻,等于说异步已经有了结果,对吧?那么在 await_suspend 中按 return handle; 来让协程框架执行 resume 就可以了。

最终协程没有被 executor 的线程调度,这个问题啊,谁让它的异步已经提前有了结果啊,和同步拿到了结果已经没啥区别了,这里目前我认为,不需要规定协程保证在用户调用 handle 所在的线程,asio 都没作这方面的保证。

@microcai microcai force-pushed the new_callback_awaitable branch from a39dadb to fd27cd1 Compare October 16, 2024 10:23
Copy link
Member

@Jackarain Jackarain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

以下几个问题

1,修复注释中符号名称与项目中的保持一致。
2,是否应该考虑单独写一个 test 用于测试有无 executor 的两种不同的情况。
3,随着功能越来越丰富,我们是否需要添加一个文档,用于说明 ucoro 的 API 特点和使用方法,注意事项等等,不过这个可以在将来去做,不必在本 patch 中去修改。


if (executor_detect_flag->test_and_set())
{
// 如果执行到这里,说明 call_detect 已经被执行,这里分 2 种情况:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请修复 comment 中的提到的变量和函数名称,保持与代码中一致。

}
else
{
callback_function_([this](T t) mutable { this->result_ = std::move(t); });
// 如果执行到这里,说明 call_detect 肯定没被执行,说明是由 executor 驱动.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

原先为了避免协程循环导致的爆栈, callback_awaitable 实现成了两种。
一种是在非 executor 环境下调用
另一种是在 executor 环境下调用的 executor_awaitable

非 executor 环境下使用的 callback_awaitable 使用了新的 await_suspend 签名
通过直接返回 coroutine_handle 的方式避免对 .resume() 的直接调用
从而避免了爆栈问题

但是这也导致, callback_awaitable无法在 executor 环境下使用。

现在更新一下 callback_awaitable, 它可以自动判断出来 callback_awaitable 传给你
的 handle 有没有被投递给 executor。如果投递给了 executor 它就 让 await_suspend
返回 noop_coroutine, 等你调用 handle 的时候,它内部再调用对应协程的 resume 来恢
复协程。而如果你没有投递 handle, 而是在 callback_awaitable 传你 handle 的时候立
马调用, 则 await_suspend 就会通过向协程框架返回 协程句柄的方式避免嵌套resume导致的
爆栈。

下面简述原理:

callback_awaitable 调用 用户的回调函数的时候,会传入一个 handle
用户通过调用这个 handle 实现 恢复协程。让 co_await callback_awaitable 得以返回。

实现原理就是检测 handle 被调用的时候,是否是在 callback_awaitable 回调用户的上下文里。
也就是说,如果调用栈是

callback_awaitable::await_suspend -> user_lambda -> handle

那么,在 handle 的处理代码里,就标记一下,而不调用 coro_handle 的 resume

于是,等 user_lambda返回的时候,callback_awaitable::await_suspend 的代码通过检查
标记,就可以知道 handle 是不是被直接调用了。如果是,就 返回coro_handle,
否则返回 noop_coroutine.

如果 handle 的处理代码发现自己的调用栈不是 callback_awaitable::await_suspend 过
来的,则不做这个标记。

检查的方式如下:

1. 如果它发现自己运行的线程甚至不是 callback_awaitable::await_suspend
所运行的线程,则必然不在 callback_awaitable::await_suspend 的上下文里。

2. 通过检查一个共享的变量判断自己是否在 callback_awaitable::await_suspend 里面。

为啥 1. 要单独提出来呢? 因为 方法 2. 里有个隐含的条件,就是 handle 的处理代码,
和 callback_awaitable::await_suspend 调用 user_lambda 后的后续代码,是串行执行的。
如果只依赖 2. 这个方法,则可能判断出错。
@microcai microcai force-pushed the new_callback_awaitable branch from fd27cd1 to 395fb31 Compare October 16, 2024 10:46
@microcai
Copy link
Member Author

test1 和 test_executor 就分别使用的是 有/冇 executor 的情况。无需写新的 test 。

注释已修正

@microcai microcai merged commit d409284 into master Oct 16, 2024
6 checks passed
@microcai microcai deleted the new_callback_awaitable branch October 16, 2024 12:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants