首页 > C/C++, Redis, TCP/IP > Redis主体事件循环流程介绍

Redis主体事件循环流程介绍

2013年8月25日 发表评论 阅读评论 8897次阅读    

回顾一下之前看的redis代码,脑子内存比较小,一会就忘了,所以备忘一下。
redis主体流程比较简单,init,listen, accept, read, write,基本就是这几步。下面简单介绍一下,当做备忘。

零、从main开始

main函数主体功能比较清晰,下面罗列一下:

  1. initServerConfig纯初始化redisServer server这个巨型结构的成员;
  2. loadServerConfig解析配置文件和命令行参数;
  3. daemonize进入守护进程模式,就fork了一下;
  4. initServer打开listen端口,加入epoll事件监听集合,这个待会详细介绍;
  5. loadDataFromDisk加载读入持久化文件里面的数据库数据;
  6. aeSetBeforeSleepProc设置一个在epoll等待之前调用的函数beforeSleep;
  7. aeMain(server.el)进入for循环不断监听listen,client句柄的事件,并处理之;
  8. 如果要退出,aeDeleteEventLoop删除各个事件,退出系统;

挺顺溜的,没有特别多的旁枝错节。下面具体介绍下listen,client连接的东西;


一、初始化listen端口

listen 端口的打开和初始化是在读取完系统配置之后做的事。initServer完成的工作如下:

  1. 设置各个错误处理信号的处理函数。
  2. createSharedObjects初始化shared全局变量上面的各个字符串成员。
  3. 调整打开文件数目限制。
  4. 初始化epoll_create,初始化fired,events等事件结构。
  5. socket() -> bind() -> listen(),绑定监听端口。
  6. 设置一个每秒触发的定时器,函数为serverCron。
  7. aeCreateFileEvent将监听fd加入epoll.
  8. scriptingInit初始化lua脚本相关的数据。
  9. bioInit初始化2个后端线程,函数为bioProcessBackgroundJobs,进行一些后台任务。

其中打开监听端口代码如下:

void initServer() {
    createSharedObjects();//初始化shared这个大全局结构的成员。基本都是字符串
    adjustOpenFilesLimit();//调整打开文件数目限制。如果可以的话。如果允许会设置为server.maxclients+32。否则以128步长减少直到成功。
    server.el = aeCreateEventLoop(server.maxclients+1024);//调用epoll_create,初始化epoll相关数据结构。如果使用epoll的话。
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    if (server.port != 0) {
		//依次调用socket() -> bind() -> listen(),绑定监听端口。从这里开始,客户端连接就可以成功了。
        server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
        if (server.ipfd == ANET_ERR) {
            redisLog(REDIS_WARNING, "Opening port %d: %s", server.port, server.neterr);
            exit(1);
        }
    }
//·····
}

int anetTcpServer(char *err, int port, char *bindaddr)
{//socket() -> bind() -> listen().调用这三个函数,绑定一个端口,监听端口。这样客户端的连接就已经可以进来了。
//为什么不等着load完数据再打开呢?不然上线的时候会瞬间累积很多连接,卡在那的。容易导致backlog满。
    int s;
    struct sockaddr_in sa;

	//socket的包装函数,还设置了SO_REUSEADDR。新建一个sock
    if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
        return ANET_ERR;

    memset(&sa,0,sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
        anetSetError(err, "invalid bind address");
        close(s);
        return ANET_ERR;
    }
	//下面就调用了bind,listen函数,其他的没有了。
    if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
        return ANET_ERR;
    return s;
}

然后便是将打开的listen端口加入epoll事件监听集中,进行监听,由aeCreateFileEvent完成,用epoll_ctl注册刻度事件。其注册的listen端口可读事件处理函数为acceptTcpHandler,这样在listen端口有新连接的时候会调用acceptTcpHandler,后者在accept这个新连接,然后就可以处理后续跟这个客户端连接相关的事件了。

void initServer() {
//``````
	//增加一个定时器,没毫秒触发。用于做一些定时任务,整理任务等。
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("create time event failed");
        exit(1);
    }
	//将这个sockfd和unix域的监听句柄加入epoll里面。设置其回调。
    if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
		redisPanic("Unrecoverable error creating server.ipfd file event.");
//``````
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,  aeFileProc *proc, void *clientData)
{//将参数fd加入epoll中,并保存相关回调,数据到eventLoop->events[fd]中。到时候可以根据fd取得数据。
//proc 为acceptTcpHandler 或者如果是unix域就是acceptUnixHandler
//客户端连接会设置为readQueryFromClient,用来读取客户端的query.
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];//直接拿fd当下标获取数组。
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
//······
    return AE_OK;
}
//增加一个fd到epoll监听事件里面。如果已经存在,就更改它,否则添加。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

二、事件监听循环

aeMain函数进入事件监听循环,循环终止条件为eventLoop->stop标记为1的时候。也就是aeStop的工作,但是可怜的这个函数在主程序不会被调用,也就是redis不会从这里退出。只有Redis-benchmark.c 里面进行测试的时候才会调用。那么,redis怎么退出的?答案是:shutdownCommand()函数,也就是shutdown命令。千万不要直接kill,否则数据没了就悲剧了。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {//stop ==1 停止服务
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);//调用了beforeSleep函数。

        aeProcessEvents(eventLoop, AE_ALL_EVENTS);//处理各种事件。
    }
}

beforeSleep函数会处理一些挂起的事件,并调用flushAppendOnlyFile将之前的指令的改动,写入到AOF文件中,内容在server.aof_buf里面了。
主循环重点在aeProcessEvents,aeProcessEvents函数首先处理一下链表里面的定时器事件,调用对应的prc,然后调用aeApiPoll等待读写事件,实际上调用的是epoll_wait函数。

int aeProcessEvents(aeEventLoop *eventLoop, int flags){
//·········
		//到这里后,tvp指向一个超时结构,代表epoll能够等待的时间。
		//调用epoll_wait,返回可读/写的事件数目,并且将其设置到fired数组上面,下面进行实际的事件处理。
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
			//一个个扫描eventLoop->fired数组里面的,已经fired的连接句柄,
			//也就是循环处理每个连接的读写事件。aeApiPoll只是返回是否有事件。
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {//处理读事件
                rfired = 1;
				//accept连接为acceptTcpHandler,initServer设置的。
				//设置的读句柄为readQueryFromClient,读取客户端的连接。
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {//处理写事件。
                if (!rfired || fe->wfileProc != fe->rfileProc)//如果没有处理可读事件 或者 读写句柄不相同,那么需要调用其函数。
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
//·········
}

至于具体的事件处理方式,见下面。


三、listen端口可读事件

上面介绍了,initServer打开listen端口后,会注册epoll可读事件,并记录处理函数为fe->rfileProc = acceptTcpHandler;看看这个函数内容,其分2步,第一步用acceptTcpHandler接受这个客户端连接,然第二部初始化这个客户端连接的相关数据,将clientfd加入epoll里面,设置的可读事件处理函数为readQueryFromClient,也就是读取客户端请求体的函数。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
	//aeProcessEvents函数做为rfileProc调用的可读事件结构,是监听端口的可读事件回调。
    int cport, cfd;
    char cip[128];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

	//accept接受一个客户端的连接。返回客户端IP,PORT,以及新fd
    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
	//将cfd加入epoll设置回调函数为readQueryFromClient,并初始化redisClient
    acceptCommonHandler(cfd,0);
}

accept接受一个客户端的连接。返回客户端IP,PORT,以及新fd 由anetTcpAccept完成,其很简单:

int anetTcpAccept(char *err, int s, char *ip, int *port) {
	//acceptTcpHandler调用这里,用来接受一个客户端的连接。返回客户端IP,PORT,以及新fd
    int fd;
    struct sockaddr_in sa;
    socklen_t salen = sizeof(sa);
	//简单调用accept函数接收一个连接,返回fd
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
        return ANET_ERR;

    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
    if (port) *port = ntohs(sa.sin_port);
    return fd;
}

第二步初始化clientfd,由acceptCommonHandler完成。

static void acceptCommonHandler(int fd, int flags) {
	//acceptTcpHandler调用accept接收完一个客户端连接后,
	//调用这里将这个连接的相关成员初始化,放入epoll,回调为readQueryFromClient
    redisClient *c;

	//设置客户端连接socket的标志,加入到epoll,设置readQueryFromClient为可读事件回调。
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}

listen端口接受一个客户端连接后,设置这个cfd的处理函数为readQueryFromClient,这样在主循环中,epoll_wait返回的句柄中如果这个cfd发送了数据到redis的话,就会返回这个fd,从而就会调用fe->rfileProc(eventLoop,fd,fe->clientData,mask);对于客户端连接,自然就是上面的readQueryFromClient函数。


四、读取客户端命令行数据

readQueryFromClient函数用来读取客户端的请求命令行数据。这部分涉及到redis命令行协议的文本处理,后面进行函数调用等。限于篇幅改天介绍吧。

五、给客户端返回数据

这部分跟读取客户端命令行数据类似,只是处理函数为sendReplyToClient,其是在prepareClientToWrite里面设置的,比如如果要给客户端发送数据了,调用addReplyString即可。后者先注册可写事件,然后将回复数据放到临时缓冲区中,等待fd可写的时候由sendReplyToClient发送数据给客户端。

void addReplyString(redisClient *c, char *s, size_t len) {
	//注册一个可读的epoll句柄,回调为sendReplyToClient函数。
    if (prepareClientToWrite(c) != REDIS_OK) return;
	//尝试把s内存数据拷贝到buf的bufpos后面,如果reply不为空,那么要追加到reply的后面。
    if (_addReplyToBuffer(c,s,len) != REDIS_OK)
        _addReplyStringToList(c,s,len);//放到reply列表的后面,以备后续进行发送。
}
int prepareClientToWrite(redisClient *c) {
//如果可能,给这个链接注册一个可读的epoll句柄,回调为sendReplyToClient函数。
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR)
        return REDIS_ERR;
    return REDIS_OK;
}

关于发送缓冲区,redis优先将数据存储在固定大小的buf中,也就是redisClient::buf[REDIS_REPLY_CHUNK_BYTES];里面,默认大小为16K,必须修改宏重新编译才可以更改。如果有数据没有发送完,c->buf放不下了,就会放到c->reply链表里面,链表每个节点都是内存buf,后来的数据放入最后面。这方面可以看_addReplyToBuffer和_addReplyStringToList两个函数。

int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
	//把s缓存拷贝到buf的bufpos后面。

    size_t available = sizeof(c->buf)-c->bufpos;
    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;//如果reply上面有数据,那么就必须将这块buf放到reply的后面,而不能插入到这里。

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    memcpy(c->buf+c->bufpos,s,len);//把s缓存拷贝到buf的bufpos后面。
    c->bufpos+=len;
    return REDIS_OK;
}
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
	//_addReplyToBuffer如果遇到reply列表不为空,就必须将s的内存数据追加到reply列表的后面。
	//将s加入到c->reply的后面,如果reply有元素,而且最后一个元素足够容纳现在的s字符串,长度len,则追加到后面就行了。
    robj *tail;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    if (listLength(c->reply) == 0) {//如果c->reply为空,可能有其他地方调用这里的,不是_addReplyToBuffer
        robj *o = createStringObject(s,len);
		//把刚申请的s放到c->reply的后面
        listAddNodeTail(c->reply,o);
        c->reply_bytes += zmalloc_size_sds(o->ptr);
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL && sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
        {//最后一个节点足够容纳这块内存,就将其追加到后面。
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
			//如果reply最后一个节点被其他地方共享了,就分配一个新的并且修改最后一个节点指针。
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,s,len);//可能会重新relloca
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        } else {//只能老老实实分配一个新的节点,放到reply的后面了。
            robj *o = createStringObject(s,len);

            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }
    }
	//下面判断一下客户端使用的输出buffer是否超过限制,如果是,
	//就设置一个标志REDIS_CLOSE_ASAP,然后打印客户端的所有状态信息。
    asyncCloseClientOnOutputBufferLimitReached(c);
}

sendReplyToClient函数比较简单,其先后将c->buf内存的数据发送给客户端,然后将c->reply链表的数据用write的方式发送出去。都懒得用writev了。

到这里介绍的差不多了,监听数据用acceptTcpHandler,读数据用readQueryFromClient,写数据用sendReplyToClient,简单可依赖,没做太多优化。

Share
分类: C/C++, Redis, TCP/IP 标签: ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。