libtask协程库实现源码学习-异步I/O
上篇文章写了libtask协程库实现的基本原理,最后说道协程编程一个很大的关键点是,程序员需要知道什么时候应该进行协程切换,什么地方需要异步I/O,什么地方的代码是顺序运行的等。
这里回顾一下具体哪些地方需要做协程切换,异步I/O: 所有可能会造成阻塞的操作,都必须进行异步IO处理,及时切换协程,绝对不能在协程里面做阻塞操作,因为阻塞了大家都阻塞了,就黄了。
顺藤摸瓜,上次的http压力测试小程序里面,协程执行函数fetchtask就是协程运行的主要函数,看下其实现:
void fetchtask(void *v) { int fd, n; char buf[512]; fprintf(stderr, "starting...\n"); for(;;){ if((fd = netdial(TCP, server, 80)) < 0){//异步连接服务器,会造成协程切换 fprintf(stderr, "dial %s: %s (%s)\n", server, strerror(errno), taskgetstate()); continue; } snprintf(buf, sizeof buf, "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n", url, server); fdwrite(fd, buf, strlen(buf));//异步数据读写,这里可能会造成协程切换,因为一定有阻塞操作 while((n = fdread(fd, buf, sizeof buf)) > 0){///异步读取 //buf[n] = '\0'; //printf("buf:%s", buf); } close(fd); write(1, ".", 1); } }
上面的netdial用来连接服务器,其实就是socket, connect等函数的封装,需要注意的是其调用了fcntl(fd, F_SETFL, fcntl(fd, F_GETFL)|O_NONBLOCK);函数,将这个SOCK设置为非阻塞状态,所以才能异步IO了。
connect调用之后,实际上SOCK不一定连接成功了,所以就需要进行事件监听,看看是否真的连接成功了,代码里调用了fdwait(fd, 'w');
int netdial(int istcp, char *server, int port) {//非 阻塞连接server,connect后会切换协程到IO fdtask //`````` taskstate("netdial"); proto = istcp ? SOCK_STREAM : SOCK_DGRAM; if((fd = socket(AF_INET, proto, 0)) < 0){ taskstate("socket failed"); return -1; } fdnoblock(fd); //1······· if(connect(fd, (struct sockaddr*)&sa, sizeof sa) < 0 && errno != EINPROGRESS){ taskstate("connect failed"); close(fd); return -1; } /* wait for finish */ fdwait(fd, 'w');//等待处理完成,并且会释放CPU的。 //······
下面来看一下fdwait函数的实现, 这个函数类似于epoll_ctl的封装函数,只是libtask这里使用的是poll,而不是epoll。这个函数按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。
void fdwait(int fd, int rw) {//按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。 int bits; if(!startedfdtask){ startedfdtask = 1; taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等 } if(npollfd >= MAXFD){ fprint(2, "too many poll file descriptors\n"); abort(); } taskstate("fdwait for %s", rw=='r' ? "read" : rw=='w' ? "write" : "error"); bits = 0; switch(rw){ case 'r': bits |= POLLIN; break; case 'w': bits |= POLLOUT; break; } //将这个FD挂入到pollfd里面,这里面是由fdtask协程进行等待唤醒等管理的、 //等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。 polltask[npollfd] = taskrunning; pollfd[npollfd].fd = fd; pollfd[npollfd].events = bits; pollfd[npollfd].revents = 0; npollfd++; taskswitch();//注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来,而不是由IO协程唤起,这里是否是bug? ps: 见下面描述。 }
从上面的注释中可以看到,fdwait就是让协程在rw的可写或者可读事件上等待,所谓的等待,其实是协程切换,切换到其他协程运行。这里有点疑问的是taskswitch并没有修改当前协程的状态为不可执行,所以下次它还是可能会被自然唤醒,而不是由I/O协程唤醒的。这里是否会造成重复操作?知道的同学告诉我下。
不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU?
PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调度它运行的时候,使用了deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行,因此现在我要再fdwait里面直接调用taskswitch();,那么当前这个协程是不会被加到taskrunqueue链表里面,也就没有机会得到执行。
那么什么时候得到执行呢?答案是:只有当有人主动将其加到taskrunqueue里面,才能执行,这个人就是fdtask I/O监听协程,这是唯一的机会。所以写代码的时候,如果要切换协程,一定得想清楚这一点,别知道怎么切换出去了,不知道什么时候该切换回来就悲剧了。
下面最后一点,来看一下taskcreate 这个后台的I/O事件通知协程的工作。执行函数为fdtask,,代码注释如下:
void fdtask(void *v) { int i, ms; Task *t; uvlong now; tasksystem();//把自己设置为系统级协程,不会taskexit退出 taskname("fdtask"); for(;;){ /* let everyone else run */ while(taskyield() > 0) ; /* we're the only one runnable - poll for i/o */ errno = 0; taskstate("poll"); if((t=sleeping.head) == nil){ ms = -1;//没有人在sleep,所以就poll一直等待了,这个好危险啊, //如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了 }else{ /* sleep at most 5s */ now = nsec(); if(now >= t->alarmtime) ms = 0; else if(now+5*1000*1000*1000LL >= t->alarmtime) ms = (t->alarmtime - now)/1000000; else ms = 5000; } if(poll(pollfd, npollfd, ms) < 0){ if(errno == EINTR) continue; fprint(2, "poll: %s\n", strerror(errno)); taskexitall(0); } /* wake up the guys who deserve it */ for(i=0; i<npollfd; i++){ while(i < npollfd && pollfd[i].revents){ taskready(polltask[i]);//将这些有情况的协程设置为可运行状态,这样这个for下一轮的时候就会调用taskyield主动让出CPU --npollfd; pollfd[i] = pollfd[npollfd]; polltask[i] = polltask[npollfd]; } } now = nsec(); while((t=sleeping.head) && now >= t->alarmtime){ deltask(&sleeping, t);//看看定时器有没有到时间的 if(!t->system && --sleepingcounted == 0) taskcount--; taskready(t); } } }
从上面可以看出,这个协程主要是做守护工作,帮助其他协程监听I/O可读可写事件,然后唤起对方;另外还有一个工作就是,处理sleep操作,在这里面叫delay,应用调用taskdelay就可以进行睡眠了。
实现用poll,因此写code时要注意,这里性能有问题的,最好改为epoll, 有空我试试吧。
上面函数唤起的方式实际上为:对有事件的协程调用taskready(polltask[i]);,设置可运行状态,然后主动调用while(taskyield() > 0) ;让其他协程得到运行机会,也就是从fdwait的taskswitch里面重新得到执行机会。
下面看下读写的时候的封装函数:
int fdwrite(int fd, void *buf, int n) { int m, tot; for(tot=0; tot<n; tot+=m){ while((m=write(fd, (char*)buf+tot, n-tot)) < 0 && errno == EAGAIN) fdwait(fd, 'w');//关键:如果写入时返回EAGAIN说明差不多了,得过会才能写入。那么这里需要放入epoll,把本协程挂起 if(m < 0) return m; if(m == 0) break; } return tot; }
由此可以想到,实际上做I/O请求的时候,是类似epoll里面的不断读、写,直到eagain了,就放入poll里面等待事件。不同的是这里是放到fdtask专门的协程里面去等待的,而不是当前的I/O协程,这样可以换到其他协程执行CPU。
"没有人在sleep,所以就poll一直等待了,这个好危险啊,如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了"
这个是不会有问题的。taskyield只是暂时让出cpu,这时当前task会被加到taskrunqueue的最后。fdtask中,会优先执行完所有taskrunqueue的task后才进行后续的poll+timeout处理