修改libtask支持epoll处理大量并发连接
原生的libtask库不支持epoll, 这样无法处理大量并发协程的情况,所以小改了下,让它支持epoll,允许大量并发协程运行。代码在这里libtask_epoll。
修改方法比较简单,就是把epoll_create, epoll_ctl, epoll_wait 几个函数的使用替换掉原来poll相关的函数,diff在这里。先是epoll_create, 这个在fdwait等待的时候,会判断startedfdtask是不是第一次运行fdwait()函数,如果是第一次,就需要进行相应的初始化。这里增加一个新函数prepare_fdtask, 用来完成epoll_create初始化和之前的taskcreate()创建IO等待poll协程,后者将会变为epoll协程。代码如下:
1 | void prepare_fdtask(){ |
2 |
3 | g_epollfd = epoll_create(2); //Since Linux 2.6.8, the size argument is ignored, but must be greater 0 |
4 | if (g_epollfd < 0){ |
5 | printf ( "epoll_create failed. errno:%d, errmsg:%s.\n" , errno , strerror ( errno )); |
6 | exit ( errno ); |
7 | } |
8 | taskcreate(fdtask, 0, 32768); //这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等 |
9 | } |
然后是epoll_ctl。也就是如何处理新加,减少的epoll事件,实际上就是处理fdread等新加的可读,可写事件。这里为了最小改动之前的代码,但又要为每个fd记录一下其当前挂载的是可读还是可写抑或两者都有,一般得增加一个大数组,每个fd一个院所,于是采用了一个投机取巧的方式: 一个fd未64位,明显高位部分我们是不可能用到的,所以就将其最高位用来记录可读事件, 次高位记录可写事件。然后在真正epoll_ctl的时候,清除这些高位,从而取出真正的fd, 平时存储的都是带有高位标志的fd。
这样代码就带有不少位操作,具体如下:
1 | void |
2 | fdwait( int *fd, int rw) |
3 | { //按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。 |
4 | int addedmask = 0; |
5 | int oldmask = 0; |
6 | struct epoll_event ee; |
7 |
8 | if (!startedfdtask){ |
9 | startedfdtask = 1; |
10 | prepare_fdtask(); |
11 | } |
12 | taskstate( "fdwait for %s" , rw== 'r' ? "read" : rw== 'w' ? "write" : "error" ); |
13 |
14 | oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ; //最高位用来表示我已经epoll_ctl注册了可读事件 |
15 | oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ; //次高位用来记录是否注册了可写事件 |
16 | addedmask = 0; |
17 | switch (rw){ |
18 | case 'r' : |
19 | addedmask = EPOLLIN ; |
20 | break ; |
21 | case 'w' : |
22 | addedmask = EPOLLOUT ; |
23 | break ; |
24 | } |
25 |
26 | ee.data.u64 = 0; /* avoid valgrind warning */ |
27 | //将这个FD挂入到epoll里面,这里面是由fdtask协程进行等待唤醒等管理的、 |
28 | //等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。 |
29 | if ( (addedmask | oldmask) != oldmask ){ //add it if need |
30 | ee.events = oldmask|addedmask ; |
31 | ee.data.ptr = taskrunning; |
32 | int op = oldmask == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; |
33 | if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd , &ee) == -1){ |
34 | printf ( "epoll_ctl pre failed. errno:%d, errmsg:%s, state(%s)\n" , errno , strerror ( errno ), taskgetstate()); |
35 | exit ( errno ); |
36 | } |
37 | } |
38 | taskswitch(); //注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来 |
39 |
40 | if ( (addedmask | oldmask) != oldmask ){ //说明刚才我增加过,那么这里需要从当前状态中,去掉刚刚加入的。 这里如果另外的协程加入了新的事件,就会出现这种情况. |
41 | //最好是代码确认读取完成后,显示删除 |
42 | oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ; |
43 | oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ; |
44 | ee.events = oldmask & (~ addedmask ) ; |
45 | //int op = oldmask == addedmask ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ; |
46 | int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ; |
47 | if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd, &ee) == -1){ |
48 | printf ( "epoll_ctl post failed. errno:%d, errmsg:%s, state(%s)\n" , errno , strerror ( errno ), taskgetstate()); |
49 | exit ( errno ); |
50 | } |
51 | if ( addedmask == EPOLLIN) *fd = 0x7FFFFFFF&*fd ; |
52 | if ( addedmask == EPOLLOUT) *fd = 0xBFFFFFFF&*fd ; |
53 | } |
54 | /* |
55 | 不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU? |
56 |
57 | PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调度它运行的时候,使用了deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行,因此现在我要再fdwait里面直接调用taskswitch();,那 |
58 | 么当前这个协程是不会被加到taskrunqueue链表里面,也就没有机会得到执行。 |
59 |
60 | 那么什么时候得到执行呢?答案是:只有当有人主动将其加到taskrunqueue里面,才能执行,这个人就是fdtask I/O监听协程,这是唯一的机会。所以写代码的时候,如果要切换协程,一定得想清楚这一点,别知道怎么 |
61 | 切换出去了,不知道什么时候该切换回来就悲剧了。 |
62 | */ |
63 | } |
fdwait搞定了注册可读可写事件了,于是当前协程就会进入等待,也就是taskswitch()的函数了。然后就会把控制权交给I/O协程去处理了,后者专门做epoll_wait操作,用来监听刚才注册的这些fd是否有新事件,如果有,就进行上下文切换到目标协程的上下文中,让其继续运行,也就是刚才fdwait睡着的那个协程。这样就走通了。代码如下。
1 | fdtask( void *v) |
2 | { |
3 | //。。。 |
4 | tasksystem(); //把自己设置为系统级协程,不会taskexit退出 |
5 | taskname( "fdtask" ); |
6 | for (;;){ |
7 | /* let everyone else run */ |
8 | while (taskyield() > 0) |
9 | ; |
10 | /* we're the only one runnable - poll for i/o */ |
11 | errno = 0; |
12 | taskstate( "poll" ); |
13 | if ((t=sleeping.head) == nil){ |
14 | ms = -1; //没有人在sleep,所以就poll一直等待了,这个好危险啊, |
15 | //如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了 |
16 | } else { |
17 | /* sleep at most 5s */ |
18 | now = nsec(); |
19 | if (now >= t->alarmtime) |
20 | ms = 0; |
21 | else if (now+5*1000*1000*1000LL >= t->alarmtime) |
22 | ms = (t->alarmtime - now)/1000000; |
23 | else |
24 | ms = 5000; |
25 | } |
26 | int retval = epoll_wait( g_epollfd, epoll_recv_events, MAXFD, ms) ; |
27 | if ( retval >= 0){ |
28 | for ( i=0; i < retval; i++){ |
29 | taskready( epoll_recv_events[i].data.ptr) ; //变为可执行状态 |
30 | } |
31 | } else if ( retval == EINTR){ |
32 | continue ; |
33 | } else if (retval < 0){ |
34 | fprint(2, "epoll: %s\n" , strerror ( errno )); |
35 | taskexitall(0); |
36 | } |
37 | //。。。。。 |
38 | } |
39 | } |
增加了epoll_recv_events用来存储系统返回的事件列表。g_epollfd就是最开始epoll_create返回的epoll句柄。 拿到有新事件通知的协程后,调用taskready切换状态为可运行状态,并且挂载到taskrunqueue运行队列上面。
到这里基本就可以了,余下还有些细节的地方不多说,代码在这里:libtask_epoll。
近期评论