libtask协程库实现源码学习
协程的概念不多说了,轻量级线程,其最大的优势就是协程之间的切换代价非常低,理论上是单线程运行,只是在应用层进行了上下文手动切换。
其最重要的实现函数是makecontext, getcontext, swapcontext 这一组函数,具体的协程上下文切换由他们完成。具体就不多说了,这些函数在glibc里面是已汇编的形式提供的,实际上做的工作就是讲各个寄存器,堆栈指针,指令指针等全部保存起来,或者进行切换,从而达到协程切换的目的。我们知道程序在CPU上运行的时候,注意依赖2个重要的东西:
- 1.堆栈指针ESP寄存器用来找到堆栈的顶部位置,从而实现去参数的目的。 这个告诉CPU堆栈在哪,也就是数据在哪。
- 2.指令指针EIP寄存器用来存放下一次CPU将要执行的代码段指令的偏移量。这个是CPU将要执行的代码。
程序的运行主要依靠上面2个变量,他们代表了一个线程的上下文环境。改变他们就可以达到执行其他位置的代码的目的。
0.基本原理
这里稍微介绍一下makecontext就可以理解其含义了。代码在sysdeps/unix/sysv/linux/i386/makecontext.S, 但是是汇编代码,在libtask里面提供了一份C语言版本的简化代码,注释如下:
#ifdef NEEDX86MAKECONTEXT void makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...) { int *sp; sp = (int*)ucp->uc_stack.ss_sp+ucp->uc_stack.ss_size/4;//移动堆栈到末尾,然后从后面开始将参数拷贝上去 sp -= argc; sp = (void*)((uintptr_t)sp - (uintptr_t)sp%16); /* 16-align for OS X */ memmove(sp, &argc+1, argc*sizeof(int)); *--sp = 0; /* return address */ ucp->uc_mcontext.mc_eip = (long)func;//设置指令指针寄存器的位置为参数的func ucp->uc_mcontext.mc_esp = (int)sp;//堆栈寄存器。 } #endif
上面需要注意的是堆栈是向下生长的,makecontext需要用户提供堆栈空间,这个我们自己按实际需求设置就行,只要不会造成堆栈溢出就可以了。设置的堆栈空间会在makecontext里面进行初始化,比如设置参数,返回值等。进行swapcontext的时候就会用到mc_eip, mc_esp等。
1.使用libtask协程
下面看一下官方的httpload.c的协程例子,实现HTTP下载或者说压力测试吧。
#include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <task.h> #include <stdlib.h> enum{ STACK = 32768 }; char *server; char *url; void fetchtask(void*); void taskmain(int argc, char **argv){ int i, n; if(argc != 4){ fprintf(stderr, "usage: httpload n server url\n"); taskexitall(1); } n = atoi(argv[1]); server = argv[2]; url = argv[3]; for(i=0; i<n; i++){ taskcreate(fetchtask, 0, STACK);//创建协程,设置为可以运行的状态. //其实fetchtask不是上下文切换的第一个函数,taskstart才是,后者立即调用fetchtask while(taskyield() > 1)//主动释放CPU,这里循环其实是为了给其他协程足够的机会 ; sleep(1); } } void fetchtask(void *v) { int fd, n; char buf[512]; fprintf(stderr, "starting...\n"); for(;;){ if((fd = netdial(TCP, server, 80)) < 0){//异步连接服务器,会造成协程切换 fprintf(stderr, "dial %s: %s (%s)\n", server, strerror(errno), taskgetstate()); continue; } snprintf(buf, sizeof buf, "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n", url, server); fdwrite(fd, buf, strlen(buf)); while((n = fdread(fd, buf, sizeof buf)) > 0){ //buf[n] = '\0'; //printf("buf:%s", buf); } close(fd); write(1, ".", 1); } }
从上面可以看出,使用libtask库后,程序的开始函数变为了taskmain, 原先的main被接管了。调用taskcreate创建协程,执行函数设置为fetchtask, 堆栈大小32K,其工作就是建立几个协程,然后发送HTTP请求。具体不多介绍啦,待会围绕这个例子来介绍下其原理。
2. 创建协程taskcreate
协程创建方法为 taskcreate(fetchtask, 0, STACK);,参数为执行函数和参数,还有堆栈大小。其为协程分配了Task数据结构,并且仅限初始化,将协程设置为可运行状态,这样就可以执行了。
Task结构比较简单,一些链表指针,加上一个ucontext_t 的成员,堆栈指针地址,大小,以及执行函数和参数,如下:
struct Task { char name[256]; // offset known to acid char state[256]; Task *next;//协程的双向链表 Task *prev; Task *allnext; Task *allprev; Context context;//实际上就是ucontext_t的包装,后者就是系统里面的ucontext结构 uvlong alarmtime; uint id; uchar *stk;//堆栈起始位置,使用的时候是按向下使用的方式的 uint stksize;//堆栈长度 int exiting; int alltaskslot;//alltask[]的当前占用最高槽位数,总数能到alltaskslot+ 64 - alltaskslot%64 int system; int ready;//为1代表在运行状态 void (*startfn)(void*); void *startarg; void *udata; }; int taskcreate(void (*fn)(void*), void *arg, uint stack) { int id; Task *t; t = taskalloc(fn, arg, stack);//初始化申请Task数据结构,设置堆栈内容等 taskcount++;//当前OK的协程数目,不包括系统级别的协程 id = t->id; if(nalltask%64 == 0){//一次申请64个槽位,只能往后放 alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0])); if(alltask == nil){ fprint(2, "out of memory\n"); abort(); } } t->alltaskslot = nalltask; alltask[nalltask++] = t; taskready(t);//加入taskrunqueue的运行队列 return id; }
其中taskalloc函数进行了内存分配,数据初始化,主要通过makecontext设置了协程的上下文结构context,将协程的初始化运行函数设置为taskstart,后者会直接调用应用层的执行函数的。详见代码注释:
static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) {//申请Task结构,初始化信号,堆栈,运行函数等,调用makecontext模拟堆栈内容,设置堆栈指针esp位置灯 Task *t; sigset_t zero; uint x, y; ulong z; /* allocate the task and stack together */ t = malloc(sizeof *t+stack);//存放Task结构和后面的堆栈 if(t == nil){ fprint(2, "taskalloc malloc: %r\n"); abort(); } memset(t, 0, sizeof *t); t->stk = (uchar*)(t+1);//注意这里加1,实际上加的是一个Task结构,也就是移动了sizeof(Task). t->stksize = stack; t->id = ++taskidgen;//就是个全局静态变量自增的id t->startfn = fn; t->startarg = arg; /* do a reasonable initialization */ memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);//这里获取一下现在的信号屏蔽字到uc_sigmask存起来,切换协程的时候会设 置的 /* must initialize with current context */ if(getcontext(&t->context.uc) < 0){//初始化获取一下当前上下文状态,下面在修改堆栈,指令等地址 fprint(2, "getcontext: %r\n"); abort(); } /* call makecontext to do the real work. */ /* leave a few words open on both ends */ t->context.uc.uc_stack.ss_sp = t->stk+8;//堆栈起始位置,使用的时候会从结束位置开始向下使用的 t->context.uc.uc_stack.ss_size = t->stksize-64; /* * All this magic is because you have to pass makecontext a * function that takes some number of word-sized variables, * and on 64-bit machines pointers are bigger than words. */ //print("make %p\n", t); z = (ulong)t; y = z; z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */ x = z>>16; makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x); //由于makecontext需要参数必须是4个字节的,所以这里讲t的指针转为64位8个字节,然后分y,x 2个四字节传参数 //以备makecontext能够模拟设置堆栈传参,并设置sp等堆栈指针位置 return t; }
协程创建后,所有的协程都会放到alltask数组中,每隔64个realloc扩容一下, 其中可以被调度运行的协程放在taskrunqueue双向链表里面。新协程放在链表尾部。
3.协程的调度/运行
协程创建后,是怎么调度运行的呢?这里需要看一下真正的Main函数了,libtask的main接管函数首先自己调用taskcreate 创建了taskmainstart协程,其实就是我们应用层编写的taskmain函数了。
然后调用taskscheduler进行协程调度。
static void taskmainstart(void *v) { taskname("taskmain"); taskmain(taskargc, taskargv); } int main(int argc, char **argv) { struct sigaction sa, osa; memset(&sa, 0, sizeof sa); sa.sa_handler = taskinfo; sa.sa_flags = SA_RESTART; sigaction(SIGQUIT, &sa, &osa); #ifdef SIGINFO sigaction(SIGINFO, &sa, &osa); #endif argv0 = argv[0]; taskargc = argc; taskargv = argv; if(mainstacksize == 0) mainstacksize = 256*1024; taskcreate(taskmainstart, nil, mainstacksize);//创建初始协程,调用应用层的taskmain函数,此时taskmainstart协程还没有运行,等待 调度 taskscheduler();//进行协程调度 fprint(2, "taskscheduler returned in main!\n"); abort(); return 0; }
下面重点在taskscheduler,所有要进行协程切换的地方,都会讲协程切换到这个上下文:taskschedcontext, 实际上就是我们的主线程所在的协程上下文。下面代码比较简单,重在理解:
static void taskscheduler(void) {//协程调度函数 int i; Task *t; taskdebug("scheduler enter"); for(;;){ if(taskcount == 0) exit(taskexitval); t = taskrunqueue.head;//从头部开始唤起 if(t == nil){ fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount); exit(1); } deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行 t->ready = 0; taskrunning = t;//标记正在执行的协程 tasknswitch++; taskdebug("run %d (%s)", t->id, t->name); contextswitch(&taskschedcontext, &t->context);//真正进行上下文切换,这样就切换到了其他协程运行,比如taskmainstart //print("back in scheduler\n"); taskrunning = nil; if(t->exiting){ if(!t->system) taskcount--; i = t->alltaskslot; alltask[i] = alltask[--nalltask]; alltask[i]->alltaskslot = i; free(t); } } }
上面contextswitch进行了实际的上下文切换,比如切换到了taskmainstart 协程,这样当前的上下文执行环境会保存到taskschedcontext里面, contextswitch函数不会返回,直到再次有主动切换到这里为止。
那么,什么时候能够返回来呢?答案是:对方主动放弃CPU的时候,比如最开头的taskyield函数,taskyield函数会将当前运行的taskrunning协程设置为可运行状态,挂到taskrunqueue后面, 然后将其切换到taskschedcontext运行,也就是上面的taskscheduler运行的协程,实际上是taskscheduler里面的contextswitch函数里面,随后contextswitch将返回到taskscheduler里面继续进行调度运行。
void taskswitch(void) {//主动进行协程切换,其实就是切换到main协程,到那里去进行实际的切换 needstack(0);//检测堆栈溢出 contextswitch(&taskrunning->context, &taskschedcontext); } void taskready(Task *t) { t->ready = 1;//状态设置为可以运行的状态,然后加到运行 任务队列末尾里面去 addtask(&taskrunqueue, t); } int taskyield(void) { int n; n = tasknswitch;//用来计算自愿放弃协程后,到恢复所发生的切换次数 taskready(taskrunning);//挂到taskrunqueue后面 taskstate("yield"); taskswitch(); return tasknswitch - n - 1; }
介绍的差不多了,这里也许能想到,如果某个协程不小心执行了一行sleep(1000)的代码,那么会真的整个进程就sleep进去的,因为这里没有任何机会进行协程调度,所以在协程程序里面,需要换为taskdelay进行睡眠,其里面的实现下次写一下。
到这里差不多了,其实只要理解了协程的运行上下文概念,就差不多了。后面有时间再稍微记录一下libtask里面对于异步IO的代码。
协程主要适合于一些IO比较频繁的系统,在这样的系统中,使用协程跟多线程的优缺点比较如下:
- 1. 单线程异步IO: 代码难度比较大,需要自己处理异步I/O, epoll等, 优点是性能高,代码执行是顺序的,不需要关心锁,竞争等情况;
- 2.协程: 比单线程异步I/O容易编程,代码更好写,协程里面是顺序编程的,但协程之间是独立栈,共享堆内存,单线程执行环境,在一个CPU上运行。协程切换代价比线程少多了,只需要十几条汇编指令切换寄存器。每秒据说能达到上百万次切换。
- 3.多线程同步I/O: 代码相对也好写,跟协程一样独立栈,共享堆内存。 需要处理同步禁止问题,但线程切换代价特别大,linux里面没有原生的线程,是用进程实现的。
- 4.多进程: 这个得看具体应用了,需要共享数据的应用不适合用多进程,否则得共享数据。但代码相对容易编写;
这里只是简单介绍下协程的实现方式之一,具体使用哪一种看具体应用,灵活使用即可。
总之在使用libtask的时候,需要随时心里想着我这code是在一个协程里面运行的,数据都是共享的,什么时候放弃CPU,什么时候可能会发生切换,码农们需要知晓,代码且写且珍惜,不然容易出错造就悲剧,好东西总会有代价的。
@克莱尔孙
谢谢提醒,缺少review环节~
赞,可以弄成golang了,加一个GPM调度了^.^
@sdbc
恩,是的,反正线程也是一个执行上下文。linux里面更是一个进程的实现。
@克莱尔孙
谢谢指正了!确实错别字有点多^.^,打得太快没注意check
ucontext已经deprecated了,能不用还是最好别用
就是说。协程,不一定就基于单线程,完全可以多线程的。
为线程池系统提供更好的并行性。
最近实现了线程池+epoll+ucontext的服务器,可以多个线程+AIO。
contextswitch函数不会返回,知道再次有主动切换到这里为止。
如果taskmain中逻辑执行完了,没有涉及切换的逻辑,也是可以返回得。楼主是不是想表达别的意思。
另外,每篇文章的错别字都比较多,希望能改改 - -