Burger实现的是非抢占式的非对称 N : 1 协程
在特定的位置(比如阻塞I/O的调用点。协程通常都以I/O作为调度基础),由当前协程自己主动出让CPU(Yield), 如何阻塞后切出是有Hook函数决定的(下面讲)
Thread -> main_co <-----------> sub_co
|
|
sub_co
因为我们是非对称协程,所以我们拥有两种协程
main_co, 只负责切换,回收, 不分配栈空间,每个线程拥有一个,通过无参构造函数构造
sub_co, 分配默认128k栈空间,包含一个callBack,是一个执行体Task
我们支持的是1:N模式,每一个线程对应一个协程队列,可以完全无锁编写同步风格代码,对于IO密集型友好
现有的 C++ 协程库均基于两种方案来实现协程上下文切换:
- 利用汇编代码
- 利用操作系统提供的
API
我们此处采用Boost.context
的两个API`来作为协程的创建和切换
intptr_t jump_fcontext( fcontext_t * ofc, fcontext_t nfc, intptr_t vp, bool preserve_fpu = false);
fcontext_t make_fcontext( void * sp, std::size_t size, void (* fn)( intptr_t) );
// 构造
ctx_ = make_fcontext(static_cast<char*>(stack_) + stackSize_, stackSize_, &Coroutine::RunInCo);
// 切换
jump_fcontext(&curCo->ctx_, t_main_co->ctx_, 0);
// 一共 72 + bytes
/****************************************************************************************
* *
* ---------------------------------------------------------------------------------- *
* | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | *
* ---------------------------------------------------------------------------------- *
* | 0x0 | 0x4 | 0x8 | 0xc | 0x10 | 0x14 | 0x18 | 0x1c | *
* ---------------------------------------------------------------------------------- *
* | fc_mxcsr|fc_x87_cw| R12 | R13 | R14 | *
* ---------------------------------------------------------------------------------- *
* ---------------------------------------------------------------------------------- *
* | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | *
* ---------------------------------------------------------------------------------- *
* | 0x20 | 0x24 | 0x28 | 0x2c | 0x30 | 0x34 | 0x38 | 0x3c | *
* ---------------------------------------------------------------------------------- *
* | R15 | RBX | RBP | RIP | *
* ---------------------------------------------------------------------------------- *
* ---------------------------------------------------------------------------------- *
* | 16 | 17 | | *
* ---------------------------------------------------------------------------------- *
* | 0x40 | 0x44 | | *
* ---------------------------------------------------------------------------------- *
* | EXIT | | *
* ---------------------------------------------------------------------------------- *
* *
****************************************************************************************/
*
* -------------------------------------------------------------------------------
* context: | r12 | r13 | r14 | r15 | rbx | rbp | rip | end | ...
* -------------------------------------------------------------------------------
* 8 16 24 32 40 48 56 64 |
* 0x8 0x10 0x18 0x20 0x28 0x30 0x38 0x40 | 16-align for macosx
* |
* rsp when jump to function
### make_fcontext
/*
* @param stackdata the stack data (rdi)
* @param stacksize the stack size (rsi)
* @param func the entry function (rdx)
*
* @return the context pointer (rax)
*/
// notes: 当整形参数少于7个时, 参数从左到右放入寄存器: rdi, rsi, rdx, rcx, r8, r9。
// rax用于第一个返回寄存器
fcontext_t make_fcontext( void * sp, std::size_t size, void (* fn)( intptr_t) );
// 代码实现
make_fcontext:
// 保存栈顶指针到rax
movq %rdi, %rax
/* 先对栈指针进行16字节对齐
*
*
* ------------------------------
* context: | retaddr | padding ... |
* ------------------------------
* | |
* | 此处16字节对齐
* |
* esp到此处时,会进行ret
*
* 这么做,主要是因为macosx下,对调用栈布局进行了优化,在保存调用函数返回地址的堆栈处,需要进行16字节对齐,方便利用SIMD进行优化
*/
// andq $-16, %rax 表示低4位取0。 -16 的补码表示为0xfffffffff0.
andq $-16, %rax
// 保留context需要的一些空间,因为context和stack是在一起的,stack底指针就是context
leaq -0x48(%rax), %rax
// 保存func函数地址到context.rip
movq %rdx, 0x38(%rax)
// 保存fpu 总大小8字节。
// 操作系统使用这些专用寄存器在任务切换时保存状态信息。
stmxcsr (%rax)
fnstcw 0x4(%rax)
// 计算finish的绝对地址,保存到栈的0x40位置。
// leaq finish(%rip), %rcx 表示finish是相对位置+rip 就是finish的函数的地址。
leaq finish(%rip), %rcx
movq %rcx, 0x40(%rax)
// 返回,rax 作为返回值,目前的指向可以当做新栈的栈顶,相当于rsp
ret /* return pointer to context-data */
finish:
/* exit code is zero */
xorq %rdi, %rdi
/* exit application */
call _exit@PLT
hlt
// 为什么会预留72字节大小。首先知道jump_fcontext 在新栈需要 pop 的大小为,fpu(8字节)+ rbp rbx r12 ~ r15 (8*6 = 48 字节) = 56 字节。还会继续POP rip 8 字节,所以可以看到第二步中 movq %rdx, 0x38(%rax),就是将rip 保存到这个位置。
目前已经64字节了,栈还有存储什么呢,协程(fn 函数)运行完成后会退出调用ret,其实就是POP到 rip.所以保存是finish 函数指针 大小8字节。总共 72 字节。
jump_fcontext:
// 实际上在call本函数时,就已经执行了pushq %rip,保存了返回后的下一条指令的地址
// 保存寄存器
pushq %rbp /* save RBP */
pushq %rbx /* save RBX */
pushq %r15 /* save R15 */
pushq %r14 /* save R14 */
pushq %r13 /* save R13 */
pushq %r12 /* save R12 */
// 预留fpu 8个字节空间
leaq -0x8(%rsp), %rsp
// 判断是否保存fpu
// rcx是第四个参数,判断是否等于0。如果为0,跳转到1标示的位置。也就是preserve_fpu 。
// 当preserve_fpu = true 的时候,需要执行2个指令是将浮点型运算的2个32位寄存器数据保存到第2步中预留的8字节空间。
cmp $0, %rcx
je 1f
stmxcsr (%rsp)
fnstcw 0x4(%rsp)
1:
// 将rsp 保存到第一参数(第一个参数保存在rdi)指向的内存。
// fcontext_t *ofc 第一参数ofc指向的内存中保存是 rsp 的指针
movq %rsp, (%rdi)
// 实现了将第二个参数复制到 rsp.(此时通过改变%rsp,已经完成了运行栈的切换)
movq %rsi, %rsp
// 判断是否保存了fpu,如果保存了就恢复保存在nfx 栈上的 fpu相关数据到响应的寄存器。
cmp $0, %rcx
je 2f
ldmxcsr (%rsp)
fldcw 0x4(%rsp)
2:
//将rsp 存储的地址+8(8字节fpu),按顺序将栈中数据恢复到寄存器中。
leaq 0x8(%rsp), %rsp
popq %r12 /* restrore R12 */
popq %r13 /* restrore R13 */
popq %r14 /* restrore R14 */
popq %r15 /* restrore R15 */
popq %rbx /* restrore RBX */
popq %rbp /* restrore RBP */
// 设置返回值,实现指令跳转。(刚刚提到调用前执行了push %rip, 所以这里就得到了下一条指令的地址)
popq %r8
/* use third arg as return-value after jump */
movq %rdx, %rax
/* use third arg as first arg in context function */
movq %rdx, %rdi
/* indirect jump to context */(从下一条指令处开始执行, %rsp依然在协程的运行栈中)
jmp *%r8
.size jump_fcontext,.-jump_fcontext
// todo
// https://segmentfault.com/a/1190000019154852
我们的协程拥有三种状态,
enum class State {
EXEC, // 运行状态
HOLD, // 挂起状态
TERM // 未执行状态
};
我们通过resume()
从main_co
切换到当前协程co
执行
通过Yield()
挂起当前正在执行的协程,切换到主协程执行
void Coroutine::RunInCo(intptr_t vp) {
Coroutine::ptr cur = GetCurCo();
DEBUG("Co : {} - {} running", cur->getCoId(), cur->getName());
cur->cb_();
cur->cb_ = nullptr;
cur->setState(State::TERM);
DEBUG("Co : {} - {} run end", cur->getCoId(), cur->getName());
cur.reset(); // 防止无法析构
Coroutine::Yield(); //重新返回主协程
}
我们在创建协程的时候设置了这个函数为上下文环境,当resume到当前协程时候,跳转到此上下文执行,去调用callBack执行,执行结束后注意此处将cur.reset(),因为这里我们执行完Yield切换出去,这里协程就算完成结束了,而此处的上下文环境还保存着,这里局部智能指针还在其中有一个引用计数
1 -- N 1 -- M
scheduler --> thread --> co
内部带有一个线程池, 每一个IO线程都有一个独一无二的执行器Processor
协程调度器将协程分配到线程(Processor
)上去执行
我们对上层用户不暴露协程,上层只需要传入callBack
,我们内部将callBack
交给一个工作Processo
r,并且调用的是proc->addPendingTask()
,而不是直接addTask
(下面Processor讲解为何)
void Scheduler::addTask(const Coroutine::Callback& task, const std::string& name) {
Processor* proc = pickOneWorkProcessor();
assert(proc != nullptr);
proc->addPendingTask(task, name);
}
总的来说,Scheduler
与上层交互去增加任务,在内部将其分发给各个Processor
处理,隐藏底层怎么处理的细节。
多线程情况下Scheduler
有一个mainProc
和多个workProc
, mainProc
的职责相当于一个包工头,因为mainProc
所在线程主要负责accept
连接(和客户见面交接)
// https://github.com/BurgerGroup/Burger/blob/main/burger/net/CoTcpServer.cc
void CoTcpServer::start() {
if(started_.getAndSet(1) == 0) {
sched_->startAsync();
listenSock_->listen();
sched_->addMainTask(std::bind(&CoTcpServer::startAccept, this), "Accept");
}
}
然后将连接好的connFd
生成conn
, 和connHandler
交给workProc
去处理(打工人干活)
// https://github.com/BurgerGroup/Burger/blob/main/burger/net/CoTcpServer.cc
void CoTcpServer::startAccept() {
while(started_.get()) {
InetAddress peerAddr;
int connfd = listenSock_->accept(peerAddr);
if(connfd > 0) {
...
// 将conn交给一个sub processor
Processor* proc = sched_->pickOneWorkProcessor();
CoTcpConnection::ptr conn = std::make_shared<CoTcpConnection>(proc, connfd,
listenAddr_, peerAddr, connName);
proc->addTask(std::bind(connHandler_, conn), "connHandler");
} else {
...
}
}
当然如果整个公司只有这么一个包工头,那么连接和工作的事都由mainProc来干。
Processor
名为运行器,是和协程的运行相关,其最核心功能在于run()
和addTask()
。
addTask
线程安全
void Processor::addTask(const Coroutine::Callback& cb, const std::string& name) {
if(isInProcThread())
addTask(resetAndGetCo(cb, name));
else
addPendingTask(cb, name);
}
其中我们本线程内会调用resetAndGetCo()
来包装callback
成协程,我们这里有两个queue
,一个是等待执行队列 runnableCoQue_
,一个是执行完后放入的idleCoQue_
,等待我们复用这些已经执行的co,避免重复malloc, free。
跨线程调用addPendingTask()
, 在run中由该线程来将其包装成co。
void Processor::run() {
assertInProcThread();
TRACE("Processor {} start running", fmt::ptr(this));
stop_ = false;
setHookEnabled(true);
//没有可以执行协程时调用epoll协程
Coroutine::ptr epollCo = std::make_shared<Coroutine>(std::bind(&CoEpoll::poll, &epoll_, kEpollTimeMs), "Epoll");
epollCo->setState(Coroutine::State::EXEC);
Coroutine::ptr cur;
while (!stop_ || !runnableCoQue_.empty()) {
//没有协程时执行epoll协程
if (runnableCoQue_.empty()) {
cur = epollCo;
epoll_.setEpolling(true);
} else {
cur = runnableCoQue_.front();
runnableCoQue_.pop();
}
cur->resume();
if (cur->getState() == Coroutine::State::TERM) {
--load_;
idleCoQue_.push(cur);
}
// 避免在其他线程添加任务时错误地创建多余的协程(确保协程只在processor中)
addPendingTasksIntoQueue();
}
TRACE("Processor {} stop running", fmt::ptr(this));
epollCo->resume(); // epoll进去把cb执行完
}
我们在run()
里创建一个epoll co
,如果没其他协程等待执行就进入epoll::poll()
等待新事件到来,然后将跨线程和Scheduler
添加的cb
包装成co
装入执行队列。
最后我们epollCo->resume()
进入epoll协程跳出循环执行完(否则其在析构时还在挂起状态,而不是正常结束状态)。
epoll的职责是关注事件,而我们此处都是加入队列等待执行co,并无直接和epoll交互,我们如何添加事件关注,详细见 epoll
我们在创建时,processor会创建三个co,一个是每个线程所有的main_co, 一个是Wake co, 用来唤醒epoll的,还有一个是处理超时事件的timerQue。详见coTimerQueue
https://www.bbsmax.com/A/nAJvOaG85r/