Redis主从同步源码浅析-Master端
关于Redis的主从同步的基本介绍这里有:Replication, 不多介绍了。本文只涉及到主库的代码,从库的相关代码改天补上。
这里主要介绍redis 2.6.13版本代码,目前2.8新增了一些功能,比如增量同步功能等,不过到目前2013-10-05还没有正式上线。总结一下几点跟下面相关的:
- 同步采用类似mysql的操作日志重放方式,将写操作分发到从库重放。
- 每次从库启动必须从主库重新同步一份全量RDB数据文件,因此不能随便停止从库;
- 数据同步采用异步将写操作指令发送给从库的方式进行。
总体来说,redis的同步原理是:slave启动时下载所有数据快照,下载快照过程中产生的新写操作日志会不断累积记录起来,发送完快照后就发送这部分增量日志,日志在slave端进行重放。下面分步讲。
零、从库发送同步指令
redis slave启动需要再从库运行"sync"指令,告诉master需要其进行后台进程保存数据快照,也就是RDB文件以及在这个过程中保存的增量日志发送到slave,SYNC指令的处理函数为syncCommand, 函数首先判断一些基本的条件比如是否允许从库,是否当前还有数据没有发送给slave。
redis为了尽量减少增加slave的时候需要做RDB文件保存的操作,会在SYNC指令处理的时候,判断一下当前是否已经有从库刚刚发送了SYNC指令而启动RDB BGSAVE,是否已经启动了BGSAVE,分如下几种情况:
- 如果有RDB进程在做快照,并且有slave在等待快照完成,那么这个新的slave不需要重新进行做快照,只需要复制前述slave就行;
- 如果有RDB进程在做快照,但没有slave在等待快照完成,也就是所有的slave都已经完成RDB文件保存操作,则需要等待当前这次RDB快照完成后,自己重新启动一次RDB快照,因为这次没来得及保存增量写操作日志。
- 如果没有RDB进程在做快照,那么当前是可以安全启动RDB快照的,那么就调用rdbSaveBackground启动快照。同时会自动记录后续写操作日志。
相关代码比较简单:
void syncCommand(redisClient *c) { //响应redis的sync指令 //········ /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ if (server.rdb_child_pid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ redisClient *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); //正好现在有RDB程序在保存日志,那么我需要查看一下是否正好有从库在等待这个RDB结束的操作 //如果有,我就可以偷偷的将其尚未发送,累积起来的写操作日志拷贝一份,插队到后面作为从库。 //等待全部RDB文件发送完后将这部分累积的日志发送给这个新从库,其实就是类似插队的意思,插上对后,顺便打一碗饭 while((ln = listNext(&li))) { slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; } if (ln) {//正好有其他slave出发了BGSAVE操作,那么就正好可以通过copy同伴的缓冲区的方式追上。 //这些buffer是一个客户端注册成为从库之后,主库会主动的在replicationFeedSlaves //里面将新写入日志追加到这里的。只是那个时候还不能发送,没注册写事件 /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ copyClientOutputBuffer(c,slave);//将要发送给之前的slave的数据拷贝一份,也就是也要发送给新的这个从库。 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences */ //由于之前正好没有客户端在后台保存过程中发送sync,所以只能老老实话等待下一个bgsave了。 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } } else {//没办法了,老老实实进行BGSAVE, /* Ok we don't have a BGSAVE in progress, let's start one */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } if (server.repl_disable_tcp_nodelay) anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ c->repldbfd = -1; c->flags |= REDIS_SLAVE;//成功,标记为slave c->slaveseldb = 0; listAddNodeTail(server.slaves,c); return; }
这就是SYNC指令的代码了,挺简单的,rdbSaveBackground 用fork()的方式启动数据库RDB文件快照保存进程,不是这里的重点,来看看后面的工作。
一、master分发RDB文件给slave
当RDB快照生成完成后,就会由serverCron函数定期轮询检测到,进而调用backgroundSaveDoneHandler函数,后者主要工作就是调用updateSlavesWaitingBgsave去处理SYNC指令发送后,快照生成完成后的事情:分发RDB文件给slave。
updateSlavesWaitingBgsave 函数主要有2个工作:
- 启动RDB文件发送任务;
- 启动之前由于在RDB过程中欠下的SYNC指令,为新的slave再次生成RDB快照。
第一个启动RDB文件发送任务是通过用aeCreateFileEvent注册slave连接的可写事件为sendBulkToSlave函数达到的。如下代码:
void updateSlavesWaitingBgsave(int bgsaveerr) { //backgroundSaveDoneHandler在BGSAVE操作完成后,调用这里来处理可能的从库事件。 listNode *ln; int startbgsave = 0; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) {//循环遍历每一个从库,查看其状态进行相应的处理。 redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { startbgsave = 1;//刚才在做bgsave的时候,有客户端来请求sync同步,但是我没有理他,现在得给他准备了。 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;//修改这个状态后,新的写入操作会记录到这个连接的缓存里 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { //后台保存完成,下面需要发送rdb文件了,丫的够大的 struct redis_stat buf; if (bgsaveerr != REDIS_OK) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); continue; } //打开这个rdb_filename,要准备给这个slave发送数据了。 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || redis_fstat(slave->repldbfd,&buf) == -1) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); continue; } slave->repldboff = 0; slave->repldbsize = buf.st_size; //记住此时slave->repldbfd没有关闭,可写事件的时候就不需要打开了。 slave->replstate = REDIS_REPL_SEND_BULK; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);//删掉之前的可写回调,注册为sendBulkToSlave if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } } }
剩下的工作就是上面第一部分提到的,如果SYNC命令的时候已经有RDB后台快照进程在工作了,而且没有slave在等待这个快照,那么只能老老实实等待这个RDB保存完毕,之后再启动一个RDB任务。所以在上面一部分轮训所有的slave的时候,会检测到是否有slave在REDIS_REPL_WAIT_BGSAVE_START状态,如果有,会设置startbgsave标记,从而用rdbSaveBackground进行RDB后台快照。
不过目前我拿到的代码中有个bug,就是如下的“if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)”这一行永远也没法成立,因为在上面一部分已经将slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;了,这应该是错误处理没有测试到的原因,毕竟该代码的条件为:SYNC指令正好碰到已有RDB快照,并且没有其他slave等待, 并且rdbSaveBackground返回ERROR。而后者返回ERROR的情形基本只有fork()失败返回-1的情况。太少见了。
对于这个bug在github已经提了个Issue #1308, https://github.com/antirez/redis/issues/1308, 希望作者看到修复吧。ps: 感谢 huangz1990帮忙确认这个bug。
if (startbgsave) {//悲剧,又有要sync的,还得保存一次。 if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { listIter li; listRewind(server.slaves,&li); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); while((ln = listNext(&li))) { redisClient *slave = ln->value; //这下面似乎有问题,replstate已经在上面被设置为了_END。https://github.com/antirez/redis/issues/1308 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) freeClient(slave); } } } }
关于slave连接的可写事件处理句柄sendBulkToSlave就不啰嗦了,主要就是读取RDB文件,然后发送内容给slave的工作。先发送一行总RDB文件大小给slave。
然后就是文件内容,发送文件内容是一次16K的方式发送,也就是每个可写事件一次只发送16K数据,避免太多阻塞其他连接。发送完毕后将可写事件处理句柄设置为sendReplyToClient,从而切换到发送这个过程中的累积的日志以及常规的同步。
lseek(slave->repldbfd,slave->repldboff,SEEK_SET); buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);//跳到未发送的位置,一次读取16K字节 if (buflen <= 0) { redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", (buflen == 0) ? "premature EOF" : strerror(errno)); freeClient(slave);//出错都不给客户端一点错误的···不过还好,之前发送过长度了的,slave会超时了的。 return; } if ((nwritten = write(fd,buf,buflen)) == -1) { redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s", strerror(errno)); freeClient(slave); return; } slave->repldboff += nwritten;//向后移动指针,可能没有全部发送完毕。 if (slave->repldboff == slave->repldbsize) {//发送完毕了,可以关闭RDB快照文件了。 close(slave->repldbfd); slave->repldbfd = -1; //删除当前的这个可写回调sendBulkToSlave,注册一个新的sendReplyToClient可写回调,这样就能将增量的写日志发送给slave了。 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); slave->replstate = REDIS_REPL_ONLINE; if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { freeClient(slave); return; } redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); }
二、增量数据同步给slave
slave初始化时,RDB快照文件发送给slave后,slave就可以处理query请求了,但是后续的主库写操作如何同步给slave呢?答案是类似AOF的方式:增量写操作分发给slave重放。
这里有2类增量写操作:
RDB快照过程中的写操作, RDB文件发送过程中的写操作 和 常规的分发后的写操作。
其实这三类处理方式都一样,都是通过在master上及时的将期间的写入操作指令保存起来,到时候发送给slave重新运行一下。
其实现方式是在redis的call()函数每处理完一条指令,如果是写入指令,就会调用propagate()函数,从其名称即可知道其功能是“分发”的意思。propagate有2个工作,一个是保存AOF的增量日志,另外一个就是尽心slave的增量日志保存。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) {//call()等函数处理完指令后,调用这里将指令加入到AOF缓冲里面,以供后续追加到AOF文件 //如果aof_state开关打开了,并且flags置位了写入AOF文件的标志,那么久需要将本指令加入到AOF缓存去。 if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); //将这条指令发送给所有的slaves,放到其缓冲里面以待发送。就当它是一个服务端一样。 if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves)) replicationFeedSlaves(server.slaves,dbid,argv,argc); }
replicationFeedSlaves函数进行实际上的slave增量日志暂存操作,暂存的命令放在c->buf或者c->reply列表上,暂时不会发送给客户端,因为可写事件句柄不会发送这上面的代码的,知道RDB文件发送完毕。其内容比较简单,就是讲客户端发送的指令重新拼接成字符串命令,然后追加到slave连接的暂存缓冲区中,待机发送。
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { //propagateExpire删除超时key,或者propagate函数分发同步客户端写操作时会调用这里。 //将这条指令发送给所有的slaves,放到其缓冲里面以待发送。 listNode *ln; listIter li; int j; listRewind(slaves,&li);//获取slaves的第一个节点 while((ln = listNext(&li))) { redisClient *slave = ln->value; //下面不发送给这个slave,那么数据怎么办? 后面记着了的。这个状态的客户端是个从库,并且sync后rdb还没有开始,没必要为其追加写操作日志。 /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ if (slave->slaveseldb != dictid) { robj *selectcmd; if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; incrRefCount(selectcmd); } else { selectcmd = createObject(REDIS_STRING, sdscatprintf(sdsempty(),"select %d\r\n",dictid)); } addReply(slave,selectcmd);//追加一条选择db的指令给当前这个slave decrRefCount(selectcmd); slave->slaveseldb = dictid; } addReplyMultiBulkLen(slave,argc);//增加参数数目指令 for (j = 0; j < argc; j++) //一个个将参数追加到slave上面去。 addReplyBulk(slave,argv[j]); } }
另外replicationFeedSlaves还可能在键过期的时候调用,这是因为redis为了保持一致性,所有键的过期都是通过master进行的。键过期的时候,master会组成一调DEL 指令,调用replicationFeedSlaves发送给所有slave删除键。
暂时写到这里,后面再写一个2.8版本的增量同步代码,然后写一个redis 集群化相关的代码学习吧。
近期评论