修改libtask支持epoll处理大量并发连接
原生的libtask库不支持epoll, 这样无法处理大量并发协程的情况,所以小改了下,让它支持epoll,允许大量并发协程运行。代码在这里libtask_epoll。
修改方法比较简单,就是把epoll_create, epoll_ctl, epoll_wait 几个函数的使用替换掉原来poll相关的函数,diff在这里。先是epoll_create, 这个在fdwait等待的时候,会判断startedfdtask是不是第一次运行fdwait()函数,如果是第一次,就需要进行相应的初始化。这里增加一个新函数prepare_fdtask, 用来完成epoll_create初始化和之前的taskcreate()创建IO等待poll协程,后者将会变为epoll协程。代码如下:
void prepare_fdtask(){ g_epollfd = epoll_create(2);//Since Linux 2.6.8, the size argument is ignored, but must be greater 0 if(g_epollfd < 0){ printf("epoll_create failed. errno:%d, errmsg:%s.\n", errno, strerror(errno)); exit(errno); } taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等 }
然后是epoll_ctl。也就是如何处理新加,减少的epoll事件,实际上就是处理fdread等新加的可读,可写事件。这里为了最小改动之前的代码,但又要为每个fd记录一下其当前挂载的是可读还是可写抑或两者都有,一般得增加一个大数组,每个fd一个院所,于是采用了一个投机取巧的方式: 一个fd未64位,明显高位部分我们是不可能用到的,所以就将其最高位用来记录可读事件, 次高位记录可写事件。然后在真正epoll_ctl的时候,清除这些高位,从而取出真正的fd, 平时存储的都是带有高位标志的fd。
这样代码就带有不少位操作,具体如下:
void fdwait(int *fd, int rw) {//按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。 int addedmask = 0; int oldmask = 0; struct epoll_event ee; if(!startedfdtask){ startedfdtask = 1; prepare_fdtask(); } taskstate("fdwait for %s", rw=='r' ? "read" : rw=='w' ? "write" : "error"); oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;//最高位用来表示我已经epoll_ctl注册了可读事件 oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;//次高位用来记录是否注册了可写事件 addedmask = 0; switch(rw){ case 'r': addedmask = EPOLLIN ; break; case 'w': addedmask = EPOLLOUT ; break; } ee.data.u64 = 0; /* avoid valgrind warning */ //将这个FD挂入到epoll里面,这里面是由fdtask协程进行等待唤醒等管理的、 //等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。 if( (addedmask | oldmask) != oldmask ){//add it if need ee.events = oldmask|addedmask ; ee.data.ptr = taskrunning; int op = oldmask == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd , &ee) == -1){ printf("epoll_ctl pre failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate()); exit(errno); } } taskswitch();//注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来 if( (addedmask | oldmask) != oldmask ){ //说明刚才我增加过,那么这里需要从当前状态中,去掉刚刚加入的。 这里如果另外的协程加入了新的事件,就会出现这种情况. //最好是代码确认读取完成后,显示删除 oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ; oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ; ee.events = oldmask & (~ addedmask ) ; //int op = oldmask == addedmask ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ; int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ; if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd, &ee) == -1){ printf("epoll_ctl post failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate()); exit(errno); } if( addedmask == EPOLLIN) *fd = 0x7FFFFFFF&*fd ; if( addedmask == EPOLLOUT) *fd = 0xBFFFFFFF&*fd ; } /* 不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU? PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调度它运行的时候,使用了deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行,因此现在我要再fdwait里面直接调用taskswitch();,那 么当前这个协程是不会被加到taskrunqueue链表里面,也就没有机会得到执行。 那么什么时候得到执行呢?答案是:只有当有人主动将其加到taskrunqueue里面,才能执行,这个人就是fdtask I/O监听协程,这是唯一的机会。所以写代码的时候,如果要切换协程,一定得想清楚这一点,别知道怎么 切换出去了,不知道什么时候该切换回来就悲剧了。 */ }
fdwait搞定了注册可读可写事件了,于是当前协程就会进入等待,也就是taskswitch()的函数了。然后就会把控制权交给I/O协程去处理了,后者专门做epoll_wait操作,用来监听刚才注册的这些fd是否有新事件,如果有,就进行上下文切换到目标协程的上下文中,让其继续运行,也就是刚才fdwait睡着的那个协程。这样就走通了。代码如下。
fdtask(void *v) { //。。。 tasksystem();//把自己设置为系统级协程,不会taskexit退出 taskname("fdtask"); for(;;){ /* let everyone else run */ while(taskyield() > 0) ; /* we're the only one runnable - poll for i/o */ errno = 0; taskstate("poll"); if((t=sleeping.head) == nil){ ms = -1;//没有人在sleep,所以就poll一直等待了,这个好危险啊, //如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了 }else{ /* sleep at most 5s */ now = nsec(); if(now >= t->alarmtime) ms = 0; else if(now+5*1000*1000*1000LL >= t->alarmtime) ms = (t->alarmtime - now)/1000000; else ms = 5000; } int retval = epoll_wait( g_epollfd, epoll_recv_events, MAXFD, ms) ; if( retval >= 0){ for( i=0; i < retval; i++){ taskready( epoll_recv_events[i].data.ptr) ;//变为可执行状态 } } else if( retval == EINTR){ continue ; } else if (retval < 0){ fprint(2, "epoll: %s\n", strerror(errno)); taskexitall(0); } //。。。。。 } }
增加了epoll_recv_events用来存储系统返回的事件列表。g_epollfd就是最开始epoll_create返回的epoll句柄。 拿到有新事件通知的协程后,调用taskready切换状态为可运行状态,并且挂载到taskrunqueue运行队列上面。
到这里基本就可以了,余下还有些细节的地方不多说,代码在这里:libtask_epoll。
近期评论