修改libtask支持epoll处理大量并发连接
原生的libtask库不支持epoll, 这样无法处理大量并发协程的情况,所以小改了下,让它支持epoll,允许大量并发协程运行。代码在这里libtask_epoll。
修改方法比较简单,就是把epoll_create, epoll_ctl, epoll_wait 几个函数的使用替换掉原来poll相关的函数,diff在这里。先是epoll_create, 这个在fdwait等待的时候,会判断startedfdtask是不是第一次运行fdwait()函数,如果是第一次,就需要进行相应的初始化。这里增加一个新函数prepare_fdtask, 用来完成epoll_create初始化和之前的taskcreate()创建IO等待poll协程,后者将会变为epoll协程。代码如下:
void prepare_fdtask(){
g_epollfd = epoll_create(2);//Since Linux 2.6.8, the size argument is ignored, but must be greater 0
if(g_epollfd < 0){
printf("epoll_create failed. errno:%d, errmsg:%s.\n", errno, strerror(errno));
exit(errno);
}
taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等
}
然后是epoll_ctl。也就是如何处理新加,减少的epoll事件,实际上就是处理fdread等新加的可读,可写事件。这里为了最小改动之前的代码,但又要为每个fd记录一下其当前挂载的是可读还是可写抑或两者都有,一般得增加一个大数组,每个fd一个院所,于是采用了一个投机取巧的方式: 一个fd未64位,明显高位部分我们是不可能用到的,所以就将其最高位用来记录可读事件, 次高位记录可写事件。然后在真正epoll_ctl的时候,清除这些高位,从而取出真正的fd, 平时存储的都是带有高位标志的fd。
这样代码就带有不少位操作,具体如下:
void
fdwait(int *fd, int rw)
{//按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。
int addedmask = 0;
int oldmask = 0;
struct epoll_event ee;
if(!startedfdtask){
startedfdtask = 1;
prepare_fdtask();
}
taskstate("fdwait for %s", rw=='r' ? "read" : rw=='w' ? "write" : "error");
oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;//最高位用来表示我已经epoll_ctl注册了可读事件
oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;//次高位用来记录是否注册了可写事件
addedmask = 0;
switch(rw){
case 'r':
addedmask = EPOLLIN ;
break;
case 'w':
addedmask = EPOLLOUT ;
break;
}
ee.data.u64 = 0; /* avoid valgrind warning */
//将这个FD挂入到epoll里面,这里面是由fdtask协程进行等待唤醒等管理的、
//等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。
if( (addedmask | oldmask) != oldmask ){//add it if need
ee.events = oldmask|addedmask ;
ee.data.ptr = taskrunning;
int op = oldmask == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd , &ee) == -1){
printf("epoll_ctl pre failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate());
exit(errno);
}
}
taskswitch();//注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来
if( (addedmask | oldmask) != oldmask ){ //说明刚才我增加过,那么这里需要从当前状态中,去掉刚刚加入的。 这里如果另外的协程加入了新的事件,就会出现这种情况.
//最好是代码确认读取完成后,显示删除
oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;
oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;
ee.events = oldmask & (~ addedmask ) ;
//int op = oldmask == addedmask ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd, &ee) == -1){
printf("epoll_ctl post failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate());
exit(errno);
}
if( addedmask == EPOLLIN) *fd = 0x7FFFFFFF&*fd ;
if( addedmask == EPOLLOUT) *fd = 0xBFFFFFFF&*fd ;
}
/*
不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU?
PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调度它运行的时候,使用了deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行,因此现在我要再fdwait里面直接调用taskswitch();,那
么当前这个协程是不会被加到taskrunqueue链表里面,也就没有机会得到执行。
那么什么时候得到执行呢?答案是:只有当有人主动将其加到taskrunqueue里面,才能执行,这个人就是fdtask I/O监听协程,这是唯一的机会。所以写代码的时候,如果要切换协程,一定得想清楚这一点,别知道怎么
切换出去了,不知道什么时候该切换回来就悲剧了。
*/
}
fdwait搞定了注册可读可写事件了,于是当前协程就会进入等待,也就是taskswitch()的函数了。然后就会把控制权交给I/O协程去处理了,后者专门做epoll_wait操作,用来监听刚才注册的这些fd是否有新事件,如果有,就进行上下文切换到目标协程的上下文中,让其继续运行,也就是刚才fdwait睡着的那个协程。这样就走通了。代码如下。
fdtask(void *v)
{
//。。。
tasksystem();//把自己设置为系统级协程,不会taskexit退出
taskname("fdtask");
for(;;){
/* let everyone else run */
while(taskyield() > 0)
;
/* we're the only one runnable - poll for i/o */
errno = 0;
taskstate("poll");
if((t=sleeping.head) == nil){
ms = -1;//没有人在sleep,所以就poll一直等待了,这个好危险啊,
//如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了
}else{
/* sleep at most 5s */
now = nsec();
if(now >= t->alarmtime)
ms = 0;
else if(now+5*1000*1000*1000LL >= t->alarmtime)
ms = (t->alarmtime - now)/1000000;
else
ms = 5000;
}
int retval = epoll_wait( g_epollfd, epoll_recv_events, MAXFD, ms) ;
if( retval >= 0){
for( i=0; i < retval; i++){
taskready( epoll_recv_events[i].data.ptr) ;//变为可执行状态
}
} else if( retval == EINTR){
continue ;
} else if (retval < 0){
fprint(2, "epoll: %s\n", strerror(errno));
taskexitall(0);
}
//。。。。。
}
}
增加了epoll_recv_events用来存储系统返回的事件列表。g_epollfd就是最开始epoll_create返回的epoll句柄。 拿到有新事件通知的协程后,调用taskready切换状态为可运行状态,并且挂载到taskrunqueue运行队列上面。
到这里基本就可以了,余下还有些细节的地方不多说,代码在这里:libtask_epoll。

近期评论