Redis主从同步源码浅析-Master端
关于Redis的主从同步的基本介绍这里有:Replication, 不多介绍了。本文只涉及到主库的代码,从库的相关代码改天补上。
这里主要介绍redis 2.6.13版本代码,目前2.8新增了一些功能,比如增量同步功能等,不过到目前2013-10-05还没有正式上线。总结一下几点跟下面相关的:
- 同步采用类似mysql的操作日志重放方式,将写操作分发到从库重放。
- 每次从库启动必须从主库重新同步一份全量RDB数据文件,因此不能随便停止从库;
- 数据同步采用异步将写操作指令发送给从库的方式进行。
总体来说,redis的同步原理是:slave启动时下载所有数据快照,下载快照过程中产生的新写操作日志会不断累积记录起来,发送完快照后就发送这部分增量日志,日志在slave端进行重放。下面分步讲。
零、从库发送同步指令
redis slave启动需要再从库运行"sync"指令,告诉master需要其进行后台进程保存数据快照,也就是RDB文件以及在这个过程中保存的增量日志发送到slave,SYNC指令的处理函数为syncCommand, 函数首先判断一些基本的条件比如是否允许从库,是否当前还有数据没有发送给slave。
redis为了尽量减少增加slave的时候需要做RDB文件保存的操作,会在SYNC指令处理的时候,判断一下当前是否已经有从库刚刚发送了SYNC指令而启动RDB BGSAVE,是否已经启动了BGSAVE,分如下几种情况:
- 如果有RDB进程在做快照,并且有slave在等待快照完成,那么这个新的slave不需要重新进行做快照,只需要复制前述slave就行;
- 如果有RDB进程在做快照,但没有slave在等待快照完成,也就是所有的slave都已经完成RDB文件保存操作,则需要等待当前这次RDB快照完成后,自己重新启动一次RDB快照,因为这次没来得及保存增量写操作日志。
- 如果没有RDB进程在做快照,那么当前是可以安全启动RDB快照的,那么就调用rdbSaveBackground启动快照。同时会自动记录后续写操作日志。
相关代码比较简单:
void syncCommand(redisClient *c) {
//响应redis的sync指令
//········
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if (server.rdb_child_pid != -1) {
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save */
redisClient *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
//正好现在有RDB程序在保存日志,那么我需要查看一下是否正好有从库在等待这个RDB结束的操作
//如果有,我就可以偷偷的将其尚未发送,累积起来的写操作日志拷贝一份,插队到后面作为从库。
//等待全部RDB文件发送完后将这部分累积的日志发送给这个新从库,其实就是类似插队的意思,插上对后,顺便打一碗饭
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}
if (ln) {//正好有其他slave出发了BGSAVE操作,那么就正好可以通过copy同伴的缓冲区的方式追上。
//这些buffer是一个客户端注册成为从库之后,主库会主动的在replicationFeedSlaves
//里面将新写入日志追加到这里的。只是那个时候还不能发送,没注册写事件
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);//将要发送给之前的slave的数据拷贝一份,也就是也要发送给新的这个从库。
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
//由于之前正好没有客户端在后台保存过程中发送sync,所以只能老老实话等待下一个bgsave了。
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {//没办法了,老老实实进行BGSAVE,
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
}
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;//成功,标记为slave
c->slaveseldb = 0;
listAddNodeTail(server.slaves,c);
return;
}
这就是SYNC指令的代码了,挺简单的,rdbSaveBackground 用fork()的方式启动数据库RDB文件快照保存进程,不是这里的重点,来看看后面的工作。
一、master分发RDB文件给slave
当RDB快照生成完成后,就会由serverCron函数定期轮询检测到,进而调用backgroundSaveDoneHandler函数,后者主要工作就是调用updateSlavesWaitingBgsave去处理SYNC指令发送后,快照生成完成后的事情:分发RDB文件给slave。
updateSlavesWaitingBgsave 函数主要有2个工作:
- 启动RDB文件发送任务;
- 启动之前由于在RDB过程中欠下的SYNC指令,为新的slave再次生成RDB快照。
第一个启动RDB文件发送任务是通过用aeCreateFileEvent注册slave连接的可写事件为sendBulkToSlave函数达到的。如下代码:
void updateSlavesWaitingBgsave(int bgsaveerr) {
//backgroundSaveDoneHandler在BGSAVE操作完成后,调用这里来处理可能的从库事件。
listNode *ln;
int startbgsave = 0;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {//循环遍历每一个从库,查看其状态进行相应的处理。
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;//刚才在做bgsave的时候,有客户端来请求sync同步,但是我没有理他,现在得给他准备了。
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;//修改这个状态后,新的写入操作会记录到这个连接的缓存里
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
//后台保存完成,下面需要发送rdb文件了,丫的够大的
struct redis_stat buf;
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
//打开这个rdb_filename,要准备给这个slave发送数据了。
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
//记住此时slave->repldbfd没有关闭,可写事件的时候就不需要打开了。
slave->replstate = REDIS_REPL_SEND_BULK;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);//删掉之前的可写回调,注册为sendBulkToSlave
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
剩下的工作就是上面第一部分提到的,如果SYNC命令的时候已经有RDB后台快照进程在工作了,而且没有slave在等待这个快照,那么只能老老实实等待这个RDB保存完毕,之后再启动一个RDB任务。所以在上面一部分轮训所有的slave的时候,会检测到是否有slave在REDIS_REPL_WAIT_BGSAVE_START状态,如果有,会设置startbgsave标记,从而用rdbSaveBackground进行RDB后台快照。
不过目前我拿到的代码中有个bug,就是如下的“if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)”这一行永远也没法成立,因为在上面一部分已经将slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;了,这应该是错误处理没有测试到的原因,毕竟该代码的条件为:SYNC指令正好碰到已有RDB快照,并且没有其他slave等待, 并且rdbSaveBackground返回ERROR。而后者返回ERROR的情形基本只有fork()失败返回-1的情况。太少见了。
对于这个bug在github已经提了个Issue #1308, https://github.com/antirez/redis/issues/1308, 希望作者看到修复吧。ps: 感谢 huangz1990帮忙确认这个bug。
if (startbgsave) {//悲剧,又有要sync的,还得保存一次。
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
//这下面似乎有问题,replstate已经在上面被设置为了_END。https://github.com/antirez/redis/issues/1308
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
关于slave连接的可写事件处理句柄sendBulkToSlave就不啰嗦了,主要就是读取RDB文件,然后发送内容给slave的工作。先发送一行总RDB文件大小给slave。
然后就是文件内容,发送文件内容是一次16K的方式发送,也就是每个可写事件一次只发送16K数据,避免太多阻塞其他连接。发送完毕后将可写事件处理句柄设置为sendReplyToClient,从而切换到发送这个过程中的累积的日志以及常规的同步。
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);//跳到未发送的位置,一次读取16K字节
if (buflen <= 0) {
redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);//出错都不给客户端一点错误的···不过还好,之前发送过长度了的,slave会超时了的。
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s", strerror(errno));
freeClient(slave);
return;
}
slave->repldboff += nwritten;//向后移动指针,可能没有全部发送完毕。
if (slave->repldboff == slave->repldbsize) {//发送完毕了,可以关闭RDB快照文件了。
close(slave->repldbfd);
slave->repldbfd = -1;
//删除当前的这个可写回调sendBulkToSlave,注册一个新的sendReplyToClient可写回调,这样就能将增量的写日志发送给slave了。
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
slave->replstate = REDIS_REPL_ONLINE;
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) {
freeClient(slave);
return;
}
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
二、增量数据同步给slave
slave初始化时,RDB快照文件发送给slave后,slave就可以处理query请求了,但是后续的主库写操作如何同步给slave呢?答案是类似AOF的方式:增量写操作分发给slave重放。
这里有2类增量写操作:
RDB快照过程中的写操作, RDB文件发送过程中的写操作 和 常规的分发后的写操作。
其实这三类处理方式都一样,都是通过在master上及时的将期间的写入操作指令保存起来,到时候发送给slave重新运行一下。
其实现方式是在redis的call()函数每处理完一条指令,如果是写入指令,就会调用propagate()函数,从其名称即可知道其功能是“分发”的意思。propagate有2个工作,一个是保存AOF的增量日志,另外一个就是尽心slave的增量日志保存。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{//call()等函数处理完指令后,调用这里将指令加入到AOF缓冲里面,以供后续追加到AOF文件
//如果aof_state开关打开了,并且flags置位了写入AOF文件的标志,那么久需要将本指令加入到AOF缓存去。
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
//将这条指令发送给所有的slaves,放到其缓冲里面以待发送。就当它是一个服务端一样。
if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
replicationFeedSlaves函数进行实际上的slave增量日志暂存操作,暂存的命令放在c->buf或者c->reply列表上,暂时不会发送给客户端,因为可写事件句柄不会发送这上面的代码的,知道RDB文件发送完毕。其内容比较简单,就是讲客户端发送的指令重新拼接成字符串命令,然后追加到slave连接的暂存缓冲区中,待机发送。
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
//propagateExpire删除超时key,或者propagate函数分发同步客户端写操作时会调用这里。
//将这条指令发送给所有的slaves,放到其缓冲里面以待发送。
listNode *ln;
listIter li;
int j;
listRewind(slaves,&li);//获取slaves的第一个节点
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
//下面不发送给这个slave,那么数据怎么办? 后面记着了的。这个状态的客户端是个从库,并且sync后rdb还没有开始,没必要为其追加写操作日志。
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
if (slave->slaveseldb != dictid) {
robj *selectcmd;
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
incrRefCount(selectcmd);
} else {
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),"select %d\r\n",dictid));
}
addReply(slave,selectcmd);//追加一条选择db的指令给当前这个slave
decrRefCount(selectcmd);
slave->slaveseldb = dictid;
}
addReplyMultiBulkLen(slave,argc);//增加参数数目指令
for (j = 0; j < argc; j++) //一个个将参数追加到slave上面去。
addReplyBulk(slave,argv[j]);
}
}
另外replicationFeedSlaves还可能在键过期的时候调用,这是因为redis为了保持一致性,所有键的过期都是通过master进行的。键过期的时候,master会组成一调DEL 指令,调用replicationFeedSlaves发送给所有slave删除键。
暂时写到这里,后面再写一个2.8版本的增量同步代码,然后写一个redis 集群化相关的代码学习吧。

近期评论