首页 > C/C++, TCP/IP, UNIX/LINUX, 协程 > 修改libtask支持epoll处理大量并发连接

修改libtask支持epoll处理大量并发连接

2014年10月10日 发表评论 阅读评论 14274次阅读    

原生的libtask库不支持epoll, 这样无法处理大量并发协程的情况,所以小改了下,让它支持epoll,允许大量并发协程运行。代码在这里libtask_epoll

修改方法比较简单,就是把epoll_create, epoll_ctl, epoll_wait 几个函数的使用替换掉原来poll相关的函数,diff在这里先是epoll_create, 这个在fdwait等待的时候,会判断startedfdtask是不是第一次运行fdwait()函数,如果是第一次,就需要进行相应的初始化。这里增加一个新函数prepare_fdtask, 用来完成epoll_create初始化和之前的taskcreate()创建IO等待poll协程,后者将会变为epoll协程。代码如下:

void prepare_fdtask(){                                                                                                                                                                          

    g_epollfd = epoll_create(2);//Since Linux 2.6.8, the size argument is ignored, but must be greater 0 
    if(g_epollfd < 0){
        printf("epoll_create failed. errno:%d, errmsg:%s.\n", errno, strerror(errno));
        exit(errno);
    }
    taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等
}

然后是epoll_ctl。也就是如何处理新加,减少的epoll事件,实际上就是处理fdread等新加的可读,可写事件。这里为了最小改动之前的代码,但又要为每个fd记录一下其当前挂载的是可读还是可写抑或两者都有,一般得增加一个大数组,每个fd一个院所,于是采用了一个投机取巧的方式: 一个fd未64位,明显高位部分我们是不可能用到的,所以就将其最高位用来记录可读事件, 次高位记录可写事件。然后在真正epoll_ctl的时候,清除这些高位,从而取出真正的fd, 平时存储的都是带有高位标志的fd。

这样代码就带有不少位操作,具体如下:

void
fdwait(int *fd, int rw)
{//按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。
    int addedmask = 0;
    int oldmask = 0;
    struct epoll_event ee;

    if(!startedfdtask){
        startedfdtask = 1;
        prepare_fdtask();
    }
    taskstate("fdwait for %s", rw=='r' ? "read" : rw=='w' ? "write" : "error");

    oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;//最高位用来表示我已经epoll_ctl注册了可读事件
    oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;//次高位用来记录是否注册了可写事件
    addedmask = 0;
    switch(rw){
    case 'r':
        addedmask = EPOLLIN ;
        break;
    case 'w':
        addedmask = EPOLLOUT ;
        break;
    }

    ee.data.u64 = 0; /* avoid valgrind warning */
    //将这个FD挂入到epoll里面,这里面是由fdtask协程进行等待唤醒等管理的、
    //等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。
    if( (addedmask | oldmask) != oldmask ){//add it if need
        ee.events = oldmask|addedmask ;
        ee.data.ptr = taskrunning;
        int op = oldmask == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
        if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd , &ee) == -1){
            printf("epoll_ctl pre failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate());
            exit(errno);
        }
    }
    taskswitch();//注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来

    if( (addedmask | oldmask) != oldmask  ){ //说明刚才我增加过,那么这里需要从当前状态中,去掉刚刚加入的。 这里如果另外的协程加入了新的事件,就会出现这种情况.
        //最好是代码确认读取完成后,显示删除
        oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;
        oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;
        ee.events = oldmask & (~ addedmask ) ;
        //int op = oldmask == addedmask  ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
        int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
        if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd, &ee) == -1){
            printf("epoll_ctl post failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate());
            exit(errno);
        }
        if( addedmask == EPOLLIN) *fd = 0x7FFFFFFF&*fd ;
        if( addedmask == EPOLLOUT) *fd = 0xBFFFFFFF&*fd ;
    }
    /*
     不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU?

PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调度它运行的时候,使用了deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行,因此现在我要再fdwait里面直接调用taskswitch();,那
么当前这个协程是不会被加到taskrunqueue链表里面,也就没有机会得到执行。

那么什么时候得到执行呢?答案是:只有当有人主动将其加到taskrunqueue里面,才能执行,这个人就是fdtask I/O监听协程,这是唯一的机会。所以写代码的时候,如果要切换协程,一定得想清楚这一点,别知道怎么
切换出去了,不知道什么时候该切换回来就悲剧了。
     */
}

fdwait搞定了注册可读可写事件了,于是当前协程就会进入等待,也就是taskswitch()的函数了。然后就会把控制权交给I/O协程去处理了,后者专门做epoll_wait操作,用来监听刚才注册的这些fd是否有新事件,如果有,就进行上下文切换到目标协程的上下文中,让其继续运行,也就是刚才fdwait睡着的那个协程。这样就走通了。代码如下。

fdtask(void *v)
{
//。。。
    tasksystem();//把自己设置为系统级协程,不会taskexit退出
    taskname("fdtask");
    for(;;){
        /* let everyone else run */
        while(taskyield() > 0)
            ;
        /* we're the only one runnable - poll for i/o */
        errno = 0;
        taskstate("poll");
        if((t=sleeping.head) == nil){
            ms = -1;//没有人在sleep,所以就poll一直等待了,这个好危险啊,
            //如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了
        }else{
            /* sleep at most 5s */
            now = nsec();
            if(now >= t->alarmtime)
                ms = 0;
            else if(now+5*1000*1000*1000LL >= t->alarmtime)
                ms = (t->alarmtime - now)/1000000;
            else
                ms = 5000;
        }
        int retval = epoll_wait( g_epollfd, epoll_recv_events, MAXFD, ms) ;
        if( retval >= 0){
            for( i=0; i < retval; i++){
                taskready( epoll_recv_events[i].data.ptr) ;//变为可执行状态
            }
        } else if( retval == EINTR){
            continue ;
        } else if (retval < 0){
            fprint(2, "epoll: %s\n", strerror(errno));
            taskexitall(0);
        }
//。。。。。
    }
}

增加了epoll_recv_events用来存储系统返回的事件列表。g_epollfd就是最开始epoll_create返回的epoll句柄。 拿到有新事件通知的协程后,调用taskready切换状态为可运行状态,并且挂载到taskrunqueue运行队列上面。

到这里基本就可以了,余下还有些细节的地方不多说,代码在这里:libtask_epoll

Share
分类: C/C++, TCP/IP, UNIX/LINUX, 协程 标签:
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。