Redis主体事件循环流程介绍
回顾一下之前看的redis代码,脑子内存比较小,一会就忘了,所以备忘一下。
redis主体流程比较简单,init,listen, accept, read, write,基本就是这几步。下面简单介绍一下,当做备忘。
零、从main开始
main函数主体功能比较清晰,下面罗列一下:
- initServerConfig纯初始化redisServer server这个巨型结构的成员;
- loadServerConfig解析配置文件和命令行参数;
- daemonize进入守护进程模式,就fork了一下;
- initServer打开listen端口,加入epoll事件监听集合,这个待会详细介绍;
- loadDataFromDisk加载读入持久化文件里面的数据库数据;
- aeSetBeforeSleepProc设置一个在epoll等待之前调用的函数beforeSleep;
- aeMain(server.el)进入for循环不断监听listen,client句柄的事件,并处理之;
- 如果要退出,aeDeleteEventLoop删除各个事件,退出系统;
挺顺溜的,没有特别多的旁枝错节。下面具体介绍下listen,client连接的东西;
一、初始化listen端口
listen 端口的打开和初始化是在读取完系统配置之后做的事。initServer完成的工作如下:
- 设置各个错误处理信号的处理函数。
- createSharedObjects初始化shared全局变量上面的各个字符串成员。
- 调整打开文件数目限制。
- 初始化epoll_create,初始化fired,events等事件结构。
- socket() -> bind() -> listen(),绑定监听端口。
- 设置一个每秒触发的定时器,函数为serverCron。
- aeCreateFileEvent将监听fd加入epoll.
- scriptingInit初始化lua脚本相关的数据。
- bioInit初始化2个后端线程,函数为bioProcessBackgroundJobs,进行一些后台任务。
其中打开监听端口代码如下:
void initServer() { createSharedObjects();//初始化shared这个大全局结构的成员。基本都是字符串 adjustOpenFilesLimit();//调整打开文件数目限制。如果可以的话。如果允许会设置为server.maxclients+32。否则以128步长减少直到成功。 server.el = aeCreateEventLoop(server.maxclients+1024);//调用epoll_create,初始化epoll相关数据结构。如果使用epoll的话。 server.db = zmalloc(sizeof(redisDb)*server.dbnum); if (server.port != 0) { //依次调用socket() -> bind() -> listen(),绑定监听端口。从这里开始,客户端连接就可以成功了。 server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); if (server.ipfd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening port %d: %s", server.port, server.neterr); exit(1); } } //····· } int anetTcpServer(char *err, int port, char *bindaddr) {//socket() -> bind() -> listen().调用这三个函数,绑定一个端口,监听端口。这样客户端的连接就已经可以进来了。 //为什么不等着load完数据再打开呢?不然上线的时候会瞬间累积很多连接,卡在那的。容易导致backlog满。 int s; struct sockaddr_in sa; //socket的包装函数,还设置了SO_REUSEADDR。新建一个sock if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) return ANET_ERR; memset(&sa,0,sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(port); sa.sin_addr.s_addr = htonl(INADDR_ANY); if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) { anetSetError(err, "invalid bind address"); close(s); return ANET_ERR; } //下面就调用了bind,listen函数,其他的没有了。 if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) return ANET_ERR; return s; }
然后便是将打开的listen端口加入epoll事件监听集中,进行监听,由aeCreateFileEvent完成,用epoll_ctl注册刻度事件。其注册的listen端口可读事件处理函数为acceptTcpHandler,这样在listen端口有新连接的时候会调用acceptTcpHandler,后者在accept这个新连接,然后就可以处理后续跟这个客户端连接相关的事件了。
void initServer() { //`````` //增加一个定时器,没毫秒触发。用于做一些定时任务,整理任务等。 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("create time event failed"); exit(1); } //将这个sockfd和unix域的监听句柄加入epoll里面。设置其回调。 if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); //`````` } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {//将参数fd加入epoll中,并保存相关回调,数据到eventLoop->events[fd]中。到时候可以根据fd取得数据。 //proc 为acceptTcpHandler 或者如果是unix域就是acceptUnixHandler //客户端连接会设置为readQueryFromClient,用来读取客户端的query. if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd];//直接拿fd当下标获取数组。 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; //······ return AE_OK; } //增加一个fd到epoll监听事件里面。如果已经存在,就更改它,否则添加。 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
二、事件监听循环
aeMain函数进入事件监听循环,循环终止条件为eventLoop->stop标记为1的时候。也就是aeStop的工作,但是可怜的这个函数在主程序不会被调用,也就是redis不会从这里退出。只有Redis-benchmark.c 里面进行测试的时候才会调用。那么,redis怎么退出的?答案是:shutdownCommand()函数,也就是shutdown命令。千万不要直接kill,否则数据没了就悲剧了。
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) {//stop ==1 停止服务 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop);//调用了beforeSleep函数。 aeProcessEvents(eventLoop, AE_ALL_EVENTS);//处理各种事件。 } }
beforeSleep函数会处理一些挂起的事件,并调用flushAppendOnlyFile将之前的指令的改动,写入到AOF文件中,内容在server.aof_buf里面了。
主循环重点在aeProcessEvents,aeProcessEvents函数首先处理一下链表里面的定时器事件,调用对应的prc,然后调用aeApiPoll等待读写事件,实际上调用的是epoll_wait函数。
int aeProcessEvents(aeEventLoop *eventLoop, int flags){ //········· //到这里后,tvp指向一个超时结构,代表epoll能够等待的时间。 //调用epoll_wait,返回可读/写的事件数目,并且将其设置到fired数组上面,下面进行实际的事件处理。 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { //一个个扫描eventLoop->fired数组里面的,已经fired的连接句柄, //也就是循环处理每个连接的读写事件。aeApiPoll只是返回是否有事件。 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) {//处理读事件 rfired = 1; //accept连接为acceptTcpHandler,initServer设置的。 //设置的读句柄为readQueryFromClient,读取客户端的连接。 fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) {//处理写事件。 if (!rfired || fe->wfileProc != fe->rfileProc)//如果没有处理可读事件 或者 读写句柄不相同,那么需要调用其函数。 fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } //········· }
至于具体的事件处理方式,见下面。
三、listen端口可读事件
上面介绍了,initServer打开listen端口后,会注册epoll可读事件,并记录处理函数为fe->rfileProc = acceptTcpHandler;看看这个函数内容,其分2步,第一步用acceptTcpHandler接受这个客户端连接,然第二部初始化这个客户端连接的相关数据,将clientfd加入epoll里面,设置的可读事件处理函数为readQueryFromClient,也就是读取客户端请求体的函数。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { //aeProcessEvents函数做为rfileProc调用的可读事件结构,是监听端口的可读事件回调。 int cport, cfd; char cip[128]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); //accept接受一个客户端的连接。返回客户端IP,PORT,以及新fd cfd = anetTcpAccept(server.neterr, fd, cip, &cport); if (cfd == AE_ERR) { redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr); return; } redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); //将cfd加入epoll设置回调函数为readQueryFromClient,并初始化redisClient acceptCommonHandler(cfd,0); }
accept接受一个客户端的连接。返回客户端IP,PORT,以及新fd 由anetTcpAccept完成,其很简单:
int anetTcpAccept(char *err, int s, char *ip, int *port) { //acceptTcpHandler调用这里,用来接受一个客户端的连接。返回客户端IP,PORT,以及新fd int fd; struct sockaddr_in sa; socklen_t salen = sizeof(sa); //简单调用accept函数接收一个连接,返回fd if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) return ANET_ERR; if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); if (port) *port = ntohs(sa.sin_port); return fd; }
第二步初始化clientfd,由acceptCommonHandler完成。
static void acceptCommonHandler(int fd, int flags) { //acceptTcpHandler调用accept接收完一个客户端连接后, //调用这里将这个连接的相关成员初始化,放入epoll,回调为readQueryFromClient redisClient *c; //设置客户端连接socket的标志,加入到epoll,设置readQueryFromClient为可读事件回调。 if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; } /* If maxclient directive is set and this is one client more... close the * connection. Note that we create the client instead to check before * for this condition, since now the socket is already set in non-blocking * mode and we can send an error for free using the Kernel I/O */ if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; freeClient(c); return; } server.stat_numconnections++; c->flags |= flags; }
listen端口接受一个客户端连接后,设置这个cfd的处理函数为readQueryFromClient,这样在主循环中,epoll_wait返回的句柄中如果这个cfd发送了数据到redis的话,就会返回这个fd,从而就会调用fe->rfileProc(eventLoop,fd,fe->clientData,mask);对于客户端连接,自然就是上面的readQueryFromClient函数。
四、读取客户端命令行数据
readQueryFromClient函数用来读取客户端的请求命令行数据。这部分涉及到redis命令行协议的文本处理,后面进行函数调用等。限于篇幅改天介绍吧。
五、给客户端返回数据
这部分跟读取客户端命令行数据类似,只是处理函数为sendReplyToClient,其是在prepareClientToWrite里面设置的,比如如果要给客户端发送数据了,调用addReplyString即可。后者先注册可写事件,然后将回复数据放到临时缓冲区中,等待fd可写的时候由sendReplyToClient发送数据给客户端。
void addReplyString(redisClient *c, char *s, size_t len) { //注册一个可读的epoll句柄,回调为sendReplyToClient函数。 if (prepareClientToWrite(c) != REDIS_OK) return; //尝试把s内存数据拷贝到buf的bufpos后面,如果reply不为空,那么要追加到reply的后面。 if (_addReplyToBuffer(c,s,len) != REDIS_OK) _addReplyStringToList(c,s,len);//放到reply列表的后面,以备后续进行发送。 } int prepareClientToWrite(redisClient *c) { //如果可能,给这个链接注册一个可读的epoll句柄,回调为sendReplyToClient函数。 if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK; if (c->fd <= 0) return REDIS_ERR; /* Fake client */ if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return REDIS_ERR; return REDIS_OK; }
关于发送缓冲区,redis优先将数据存储在固定大小的buf中,也就是redisClient::buf[REDIS_REPLY_CHUNK_BYTES];里面,默认大小为16K,必须修改宏重新编译才可以更改。如果有数据没有发送完,c->buf放不下了,就会放到c->reply链表里面,链表每个节点都是内存buf,后来的数据放入最后面。这方面可以看_addReplyToBuffer和_addReplyStringToList两个函数。
int _addReplyToBuffer(redisClient *c, char *s, size_t len) { //把s缓存拷贝到buf的bufpos后面。 size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply) > 0) return REDIS_ERR;//如果reply上面有数据,那么就必须将这块buf放到reply的后面,而不能插入到这里。 /* Check that the buffer has enough space available for this string. */ if (len > available) return REDIS_ERR; memcpy(c->buf+c->bufpos,s,len);//把s缓存拷贝到buf的bufpos后面。 c->bufpos+=len; return REDIS_OK; } void _addReplyStringToList(redisClient *c, char *s, size_t len) { //_addReplyToBuffer如果遇到reply列表不为空,就必须将s的内存数据追加到reply列表的后面。 //将s加入到c->reply的后面,如果reply有元素,而且最后一个元素足够容纳现在的s字符串,长度len,则追加到后面就行了。 robj *tail; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; if (listLength(c->reply) == 0) {//如果c->reply为空,可能有其他地方调用这里的,不是_addReplyToBuffer robj *o = createStringObject(s,len); //把刚申请的s放到c->reply的后面 listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); } else { tail = listNodeValue(listLast(c->reply)); /* Append to this object when possible. */ if (tail->ptr != NULL && sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) {//最后一个节点足够容纳这块内存,就将其追加到后面。 c->reply_bytes -= zmalloc_size_sds(tail->ptr); //如果reply最后一个节点被其他地方共享了,就分配一个新的并且修改最后一个节点指针。 tail = dupLastObjectIfNeeded(c->reply); tail->ptr = sdscatlen(tail->ptr,s,len);//可能会重新relloca c->reply_bytes += zmalloc_size_sds(tail->ptr); } else {//只能老老实实分配一个新的节点,放到reply的后面了。 robj *o = createStringObject(s,len); listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); } } //下面判断一下客户端使用的输出buffer是否超过限制,如果是, //就设置一个标志REDIS_CLOSE_ASAP,然后打印客户端的所有状态信息。 asyncCloseClientOnOutputBufferLimitReached(c); }
sendReplyToClient函数比较简单,其先后将c->buf内存的数据发送给客户端,然后将c->reply链表的数据用write的方式发送出去。都懒得用writev了。
到这里介绍的差不多了,监听数据用acceptTcpHandler,读数据用readQueryFromClient,写数据用sendReplyToClient,简单可依赖,没做太多优化。
近期评论