libtask协程库实现源码学习
协程的概念不多说了,轻量级线程,其最大的优势就是协程之间的切换代价非常低,理论上是单线程运行,只是在应用层进行了上下文手动切换。
其最重要的实现函数是makecontext, getcontext, swapcontext 这一组函数,具体的协程上下文切换由他们完成。具体就不多说了,这些函数在glibc里面是已汇编的形式提供的,实际上做的工作就是讲各个寄存器,堆栈指针,指令指针等全部保存起来,或者进行切换,从而达到协程切换的目的。我们知道程序在CPU上运行的时候,注意依赖2个重要的东西:
- 1.堆栈指针ESP寄存器用来找到堆栈的顶部位置,从而实现去参数的目的。 这个告诉CPU堆栈在哪,也就是数据在哪。
- 2.指令指针EIP寄存器用来存放下一次CPU将要执行的代码段指令的偏移量。这个是CPU将要执行的代码。
程序的运行主要依靠上面2个变量,他们代表了一个线程的上下文环境。改变他们就可以达到执行其他位置的代码的目的。
0.基本原理
这里稍微介绍一下makecontext就可以理解其含义了。代码在sysdeps/unix/sysv/linux/i386/makecontext.S, 但是是汇编代码,在libtask里面提供了一份C语言版本的简化代码,注释如下:
#ifdef NEEDX86MAKECONTEXT
void
makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...)
{
int *sp;
sp = (int*)ucp->uc_stack.ss_sp+ucp->uc_stack.ss_size/4;//移动堆栈到末尾,然后从后面开始将参数拷贝上去
sp -= argc;
sp = (void*)((uintptr_t)sp - (uintptr_t)sp%16); /* 16-align for OS X */
memmove(sp, &argc+1, argc*sizeof(int));
*--sp = 0; /* return address */
ucp->uc_mcontext.mc_eip = (long)func;//设置指令指针寄存器的位置为参数的func
ucp->uc_mcontext.mc_esp = (int)sp;//堆栈寄存器。
}
#endif
上面需要注意的是堆栈是向下生长的,makecontext需要用户提供堆栈空间,这个我们自己按实际需求设置就行,只要不会造成堆栈溢出就可以了。设置的堆栈空间会在makecontext里面进行初始化,比如设置参数,返回值等。进行swapcontext的时候就会用到mc_eip, mc_esp等。
1.使用libtask协程
下面看一下官方的httpload.c的协程例子,实现HTTP下载或者说压力测试吧。
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <task.h>
#include <stdlib.h>
enum{ STACK = 32768 };
char *server;
char *url;
void fetchtask(void*);
void taskmain(int argc, char **argv){
int i, n;
if(argc != 4){
fprintf(stderr, "usage: httpload n server url\n");
taskexitall(1);
}
n = atoi(argv[1]);
server = argv[2];
url = argv[3];
for(i=0; i<n; i++){
taskcreate(fetchtask, 0, STACK);//创建协程,设置为可以运行的状态.
//其实fetchtask不是上下文切换的第一个函数,taskstart才是,后者立即调用fetchtask
while(taskyield() > 1)//主动释放CPU,这里循环其实是为了给其他协程足够的机会
;
sleep(1);
}
}
void fetchtask(void *v) {
int fd, n;
char buf[512];
fprintf(stderr, "starting...\n");
for(;;){
if((fd = netdial(TCP, server, 80)) < 0){//异步连接服务器,会造成协程切换
fprintf(stderr, "dial %s: %s (%s)\n", server, strerror(errno), taskgetstate());
continue;
}
snprintf(buf, sizeof buf, "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n", url, server);
fdwrite(fd, buf, strlen(buf));
while((n = fdread(fd, buf, sizeof buf)) > 0){
//buf[n] = '\0';
//printf("buf:%s", buf);
}
close(fd);
write(1, ".", 1);
}
}
从上面可以看出,使用libtask库后,程序的开始函数变为了taskmain, 原先的main被接管了。调用taskcreate创建协程,执行函数设置为fetchtask, 堆栈大小32K,其工作就是建立几个协程,然后发送HTTP请求。具体不多介绍啦,待会围绕这个例子来介绍下其原理。
2. 创建协程taskcreate
协程创建方法为 taskcreate(fetchtask, 0, STACK);,参数为执行函数和参数,还有堆栈大小。其为协程分配了Task数据结构,并且仅限初始化,将协程设置为可运行状态,这样就可以执行了。
Task结构比较简单,一些链表指针,加上一个ucontext_t 的成员,堆栈指针地址,大小,以及执行函数和参数,如下:
struct Task {
char name[256]; // offset known to acid
char state[256];
Task *next;//协程的双向链表
Task *prev;
Task *allnext;
Task *allprev;
Context context;//实际上就是ucontext_t的包装,后者就是系统里面的ucontext结构
uvlong alarmtime;
uint id;
uchar *stk;//堆栈起始位置,使用的时候是按向下使用的方式的
uint stksize;//堆栈长度
int exiting;
int alltaskslot;//alltask[]的当前占用最高槽位数,总数能到alltaskslot+ 64 - alltaskslot%64
int system;
int ready;//为1代表在运行状态
void (*startfn)(void*);
void *startarg;
void *udata;
};
int
taskcreate(void (*fn)(void*), void *arg, uint stack)
{
int id;
Task *t;
t = taskalloc(fn, arg, stack);//初始化申请Task数据结构,设置堆栈内容等
taskcount++;//当前OK的协程数目,不包括系统级别的协程
id = t->id;
if(nalltask%64 == 0){//一次申请64个槽位,只能往后放
alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0]));
if(alltask == nil){
fprint(2, "out of memory\n");
abort();
}
}
t->alltaskslot = nalltask;
alltask[nalltask++] = t;
taskready(t);//加入taskrunqueue的运行队列
return id;
}
其中taskalloc函数进行了内存分配,数据初始化,主要通过makecontext设置了协程的上下文结构context,将协程的初始化运行函数设置为taskstart,后者会直接调用应用层的执行函数的。详见代码注释:
static Task*
taskalloc(void (*fn)(void*), void *arg, uint stack)
{//申请Task结构,初始化信号,堆栈,运行函数等,调用makecontext模拟堆栈内容,设置堆栈指针esp位置灯
Task *t;
sigset_t zero;
uint x, y;
ulong z;
/* allocate the task and stack together */
t = malloc(sizeof *t+stack);//存放Task结构和后面的堆栈
if(t == nil){
fprint(2, "taskalloc malloc: %r\n");
abort();
}
memset(t, 0, sizeof *t);
t->stk = (uchar*)(t+1);//注意这里加1,实际上加的是一个Task结构,也就是移动了sizeof(Task).
t->stksize = stack;
t->id = ++taskidgen;//就是个全局静态变量自增的id
t->startfn = fn;
t->startarg = arg;
/* do a reasonable initialization */
memset(&t->context.uc, 0, sizeof t->context.uc);
sigemptyset(&zero);
sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);//这里获取一下现在的信号屏蔽字到uc_sigmask存起来,切换协程的时候会设
置的
/* must initialize with current context */
if(getcontext(&t->context.uc) < 0){//初始化获取一下当前上下文状态,下面在修改堆栈,指令等地址
fprint(2, "getcontext: %r\n");
abort();
}
/* call makecontext to do the real work. */
/* leave a few words open on both ends */
t->context.uc.uc_stack.ss_sp = t->stk+8;//堆栈起始位置,使用的时候会从结束位置开始向下使用的
t->context.uc.uc_stack.ss_size = t->stksize-64;
/*
* All this magic is because you have to pass makecontext a
* function that takes some number of word-sized variables,
* and on 64-bit machines pointers are bigger than words.
*/
//print("make %p\n", t);
z = (ulong)t;
y = z;
z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */
x = z>>16;
makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
//由于makecontext需要参数必须是4个字节的,所以这里讲t的指针转为64位8个字节,然后分y,x 2个四字节传参数
//以备makecontext能够模拟设置堆栈传参,并设置sp等堆栈指针位置
return t;
}
协程创建后,所有的协程都会放到alltask数组中,每隔64个realloc扩容一下, 其中可以被调度运行的协程放在taskrunqueue双向链表里面。新协程放在链表尾部。
3.协程的调度/运行
协程创建后,是怎么调度运行的呢?这里需要看一下真正的Main函数了,libtask的main接管函数首先自己调用taskcreate 创建了taskmainstart协程,其实就是我们应用层编写的taskmain函数了。
然后调用taskscheduler进行协程调度。
static void
taskmainstart(void *v)
{
taskname("taskmain");
taskmain(taskargc, taskargv);
}
int
main(int argc, char **argv)
{
struct sigaction sa, osa;
memset(&sa, 0, sizeof sa);
sa.sa_handler = taskinfo;
sa.sa_flags = SA_RESTART;
sigaction(SIGQUIT, &sa, &osa);
#ifdef SIGINFO
sigaction(SIGINFO, &sa, &osa);
#endif
argv0 = argv[0];
taskargc = argc;
taskargv = argv;
if(mainstacksize == 0)
mainstacksize = 256*1024;
taskcreate(taskmainstart, nil, mainstacksize);//创建初始协程,调用应用层的taskmain函数,此时taskmainstart协程还没有运行,等待
调度
taskscheduler();//进行协程调度
fprint(2, "taskscheduler returned in main!\n");
abort();
return 0;
}
下面重点在taskscheduler,所有要进行协程切换的地方,都会讲协程切换到这个上下文:taskschedcontext, 实际上就是我们的主线程所在的协程上下文。下面代码比较简单,重在理解:
static void
taskscheduler(void)
{//协程调度函数
int i;
Task *t;
taskdebug("scheduler enter");
for(;;){
if(taskcount == 0)
exit(taskexitval);
t = taskrunqueue.head;//从头部开始唤起
if(t == nil){
fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);
exit(1);
}
deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行
t->ready = 0;
taskrunning = t;//标记正在执行的协程
tasknswitch++;
taskdebug("run %d (%s)", t->id, t->name);
contextswitch(&taskschedcontext, &t->context);//真正进行上下文切换,这样就切换到了其他协程运行,比如taskmainstart
//print("back in scheduler\n");
taskrunning = nil;
if(t->exiting){
if(!t->system)
taskcount--;
i = t->alltaskslot;
alltask[i] = alltask[--nalltask];
alltask[i]->alltaskslot = i;
free(t);
}
}
}
上面contextswitch进行了实际的上下文切换,比如切换到了taskmainstart 协程,这样当前的上下文执行环境会保存到taskschedcontext里面, contextswitch函数不会返回,直到再次有主动切换到这里为止。
那么,什么时候能够返回来呢?答案是:对方主动放弃CPU的时候,比如最开头的taskyield函数,taskyield函数会将当前运行的taskrunning协程设置为可运行状态,挂到taskrunqueue后面, 然后将其切换到taskschedcontext运行,也就是上面的taskscheduler运行的协程,实际上是taskscheduler里面的contextswitch函数里面,随后contextswitch将返回到taskscheduler里面继续进行调度运行。
void
taskswitch(void)
{//主动进行协程切换,其实就是切换到main协程,到那里去进行实际的切换
needstack(0);//检测堆栈溢出
contextswitch(&taskrunning->context, &taskschedcontext);
}
void
taskready(Task *t)
{
t->ready = 1;//状态设置为可以运行的状态,然后加到运行 任务队列末尾里面去
addtask(&taskrunqueue, t);
}
int
taskyield(void)
{
int n;
n = tasknswitch;//用来计算自愿放弃协程后,到恢复所发生的切换次数
taskready(taskrunning);//挂到taskrunqueue后面
taskstate("yield");
taskswitch();
return tasknswitch - n - 1;
}
介绍的差不多了,这里也许能想到,如果某个协程不小心执行了一行sleep(1000)的代码,那么会真的整个进程就sleep进去的,因为这里没有任何机会进行协程调度,所以在协程程序里面,需要换为taskdelay进行睡眠,其里面的实现下次写一下。
到这里差不多了,其实只要理解了协程的运行上下文概念,就差不多了。后面有时间再稍微记录一下libtask里面对于异步IO的代码。
协程主要适合于一些IO比较频繁的系统,在这样的系统中,使用协程跟多线程的优缺点比较如下:
- 1. 单线程异步IO: 代码难度比较大,需要自己处理异步I/O, epoll等, 优点是性能高,代码执行是顺序的,不需要关心锁,竞争等情况;
- 2.协程: 比单线程异步I/O容易编程,代码更好写,协程里面是顺序编程的,但协程之间是独立栈,共享堆内存,单线程执行环境,在一个CPU上运行。协程切换代价比线程少多了,只需要十几条汇编指令切换寄存器。每秒据说能达到上百万次切换。
- 3.多线程同步I/O: 代码相对也好写,跟协程一样独立栈,共享堆内存。 需要处理同步禁止问题,但线程切换代价特别大,linux里面没有原生的线程,是用进程实现的。
- 4.多进程: 这个得看具体应用了,需要共享数据的应用不适合用多进程,否则得共享数据。但代码相对容易编写;
这里只是简单介绍下协程的实现方式之一,具体使用哪一种看具体应用,灵活使用即可。
总之在使用libtask的时候,需要随时心里想着我这code是在一个协程里面运行的,数据都是共享的,什么时候放弃CPU,什么时候可能会发生切换,码农们需要知晓,代码且写且珍惜,不然容易出错造就悲剧,好东西总会有代价的。

@克莱尔孙
谢谢提醒,缺少review环节~
赞,可以弄成golang了,加一个GPM调度了^.^
@sdbc
恩,是的,反正线程也是一个执行上下文。linux里面更是一个进程的实现。
@克莱尔孙
谢谢指正了!确实错别字有点多^.^,打得太快没注意check
ucontext已经deprecated了,能不用还是最好别用
就是说。协程,不一定就基于单线程,完全可以多线程的。
为线程池系统提供更好的并行性。
最近实现了线程池+epoll+ucontext的服务器,可以多个线程+AIO。
contextswitch函数不会返回,知道再次有主动切换到这里为止。
如果taskmain中逻辑执行完了,没有涉及切换的逻辑,也是可以返回得。楼主是不是想表达别的意思。
另外,每篇文章的错别字都比较多,希望能改改 - -