首页 > Redis > Redis主从同步源码浅析-Master端

Redis主从同步源码浅析-Master端

2013年10月5日 发表评论 阅读评论 12651次阅读    

关于Redis的主从同步的基本介绍这里有:Replication, 不多介绍了。本文只涉及到主库的代码,从库的相关代码改天补上。

这里主要介绍redis 2.6.13版本代码,目前2.8新增了一些功能,比如增量同步功能等,不过到目前2013-10-05还没有正式上线。总结一下几点跟下面相关的:

  1. 同步采用类似mysql的操作日志重放方式,将写操作分发到从库重放。
  2. 每次从库启动必须从主库重新同步一份全量RDB数据文件,因此不能随便停止从库;
  3. 数据同步采用异步将写操作指令发送给从库的方式进行。

总体来说,redis的同步原理是:slave启动时下载所有数据快照,下载快照过程中产生的新写操作日志会不断累积记录起来,发送完快照后就发送这部分增量日志,日志在slave端进行重放。下面分步讲。

零、从库发送同步指令

redis slave启动需要再从库运行"sync"指令,告诉master需要其进行后台进程保存数据快照,也就是RDB文件以及在这个过程中保存的增量日志发送到slave,SYNC指令的处理函数为syncCommand, 函数首先判断一些基本的条件比如是否允许从库,是否当前还有数据没有发送给slave。

redis为了尽量减少增加slave的时候需要做RDB文件保存的操作,会在SYNC指令处理的时候,判断一下当前是否已经有从库刚刚发送了SYNC指令而启动RDB BGSAVE,是否已经启动了BGSAVE,分如下几种情况:

  1. 如果有RDB进程在做快照,并且有slave在等待快照完成,那么这个新的slave不需要重新进行做快照,只需要复制前述slave就行;
  2. 如果有RDB进程在做快照,但没有slave在等待快照完成,也就是所有的slave都已经完成RDB文件保存操作,则需要等待当前这次RDB快照完成后,自己重新启动一次RDB快照,因为这次没来得及保存增量写操作日志。
  3. 如果没有RDB进程在做快照,那么当前是可以安全启动RDB快照的,那么就调用rdbSaveBackground启动快照。同时会自动记录后续写操作日志。

相关代码比较简单:

void syncCommand(redisClient *c) {
	//响应redis的sync指令
//········
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
		//正好现在有RDB程序在保存日志,那么我需要查看一下是否正好有从库在等待这个RDB结束的操作
		//如果有,我就可以偷偷的将其尚未发送,累积起来的写操作日志拷贝一份,插队到后面作为从库。
		//等待全部RDB文件发送完后将这部分累积的日志发送给这个新从库,其实就是类似插队的意思,插上对后,顺便打一碗饭
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {//正好有其他slave出发了BGSAVE操作,那么就正好可以通过copy同伴的缓冲区的方式追上。
        //这些buffer是一个客户端注册成为从库之后,主库会主动的在replicationFeedSlaves
        //里面将新写入日志追加到这里的。只是那个时候还不能发送,没注册写事件
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);//将要发送给之前的slave的数据拷贝一份,也就是也要发送给新的这个从库。
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
        //由于之前正好没有客户端在后台保存过程中发送sync,所以只能老老实话等待下一个bgsave了。
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {//没办法了,老老实实进行BGSAVE,
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;//成功,标记为slave
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}

这就是SYNC指令的代码了,挺简单的,rdbSaveBackground 用fork()的方式启动数据库RDB文件快照保存进程,不是这里的重点,来看看后面的工作。

一、master分发RDB文件给slave

当RDB快照生成完成后,就会由serverCron函数定期轮询检测到,进而调用backgroundSaveDoneHandler函数,后者主要工作就是调用updateSlavesWaitingBgsave去处理SYNC指令发送后,快照生成完成后的事情:分发RDB文件给slave。

updateSlavesWaitingBgsave 函数主要有2个工作:

  1. 启动RDB文件发送任务;
  2. 启动之前由于在RDB过程中欠下的SYNC指令,为新的slave再次生成RDB快照。

第一个启动RDB文件发送任务是通过用aeCreateFileEvent注册slave连接的可写事件为sendBulkToSlave函数达到的。如下代码:

void updateSlavesWaitingBgsave(int bgsaveerr) {
//backgroundSaveDoneHandler在BGSAVE操作完成后,调用这里来处理可能的从库事件。
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {//循环遍历每一个从库,查看其状态进行相应的处理。
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;//刚才在做bgsave的时候,有客户端来请求sync同步,但是我没有理他,现在得给他准备了。
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;//修改这个状态后,新的写入操作会记录到这个连接的缓存里
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
        //后台保存完成,下面需要发送rdb文件了,丫的够大的
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
			//打开这个rdb_filename,要准备给这个slave发送数据了。
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
//记住此时slave->repldbfd没有关闭,可写事件的时候就不需要打开了。
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);//删掉之前的可写回调,注册为sendBulkToSlave
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

剩下的工作就是上面第一部分提到的,如果SYNC命令的时候已经有RDB后台快照进程在工作了,而且没有slave在等待这个快照,那么只能老老实实等待这个RDB保存完毕,之后再启动一个RDB任务。所以在上面一部分轮训所有的slave的时候,会检测到是否有slave在REDIS_REPL_WAIT_BGSAVE_START状态,如果有,会设置startbgsave标记,从而用rdbSaveBackground进行RDB后台快照。
不过目前我拿到的代码中有个bug,就是如下的“if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)”这一行永远也没法成立,因为在上面一部分已经将slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;了,这应该是错误处理没有测试到的原因,毕竟该代码的条件为:SYNC指令正好碰到已有RDB快照,并且没有其他slave等待, 并且rdbSaveBackground返回ERROR。而后者返回ERROR的情形基本只有fork()失败返回-1的情况。太少见了。

对于这个bug在github已经提了个Issue #1308,  https://github.com/antirez/redis/issues/1308, 希望作者看到修复吧。ps: 感谢 huangz1990帮忙确认这个bug。

    if (startbgsave) {//悲剧,又有要sync的,还得保存一次。
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
				//这下面似乎有问题,replstate已经在上面被设置为了_END。https://github.com/antirez/redis/issues/1308
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

关于slave连接的可写事件处理句柄sendBulkToSlave就不啰嗦了,主要就是读取RDB文件,然后发送内容给slave的工作。先发送一行总RDB文件大小给slave。

然后就是文件内容,发送文件内容是一次16K的方式发送,也就是每个可写事件一次只发送16K数据,避免太多阻塞其他连接。发送完毕后将可写事件处理句柄设置为sendReplyToClient,从而切换到发送这个过程中的累积的日志以及常规的同步。

    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);//跳到未发送的位置,一次读取16K字节
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);//出错都不给客户端一点错误的···不过还好,之前发送过长度了的,slave会超时了的。
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s", strerror(errno));
        freeClient(slave);
        return;
    }
    slave->repldboff += nwritten;//向后移动指针,可能没有全部发送完毕。
    if (slave->repldboff == slave->repldbsize) {//发送完毕了,可以关闭RDB快照文件了。
        close(slave->repldbfd);
        slave->repldbfd = -1;
		//删除当前的这个可写回调sendBulkToSlave,注册一个新的sendReplyToClient可写回调,这样就能将增量的写日志发送给slave了。
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) {
            freeClient(slave);
            return;
        }
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }

二、增量数据同步给slave

slave初始化时,RDB快照文件发送给slave后,slave就可以处理query请求了,但是后续的主库写操作如何同步给slave呢?答案是类似AOF的方式:增量写操作分发给slave重放。
这里有2类增量写操作:
RDB快照过程中的写操作, RDB文件发送过程中的写操作 和 常规的分发后的写操作。
其实这三类处理方式都一样,都是通过在master上及时的将期间的写入操作指令保存起来,到时候发送给slave重新运行一下。
其实现方式是在redis的call()函数每处理完一条指令,如果是写入指令,就会调用propagate()函数,从其名称即可知道其功能是“分发”的意思。propagate有2个工作,一个是保存AOF的增量日志,另外一个就是尽心slave的增量日志保存。

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{//call()等函数处理完指令后,调用这里将指令加入到AOF缓冲里面,以供后续追加到AOF文件
//如果aof_state开关打开了,并且flags置位了写入AOF文件的标志,那么久需要将本指令加入到AOF缓存去。
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);

	//将这条指令发送给所有的slaves,放到其缓冲里面以待发送。就当它是一个服务端一样。
    if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

replicationFeedSlaves函数进行实际上的slave增量日志暂存操作,暂存的命令放在c->buf或者c->reply列表上,暂时不会发送给客户端,因为可写事件句柄不会发送这上面的代码的,知道RDB文件发送完毕。其内容比较简单,就是讲客户端发送的指令重新拼接成字符串命令,然后追加到slave连接的暂存缓冲区中,待机发送。

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
//propagateExpire删除超时key,或者propagate函数分发同步客户端写操作时会调用这里。
//将这条指令发送给所有的slaves,放到其缓冲里面以待发送。
    listNode *ln;
    listIter li;
    int j;

    listRewind(slaves,&li);//获取slaves的第一个节点
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

//下面不发送给这个slave,那么数据怎么办? 后面记着了的。这个状态的客户端是个从库,并且sync后rdb还没有开始,没必要为其追加写操作日志。
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */
        if (slave->slaveseldb != dictid) {
            robj *selectcmd;

            if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
                selectcmd = shared.select[dictid];
                incrRefCount(selectcmd);
            } else {
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),"select %d\r\n",dictid));
            }
            addReply(slave,selectcmd);//追加一条选择db的指令给当前这个slave
            decrRefCount(selectcmd);
            slave->slaveseldb = dictid;
        }
        addReplyMultiBulkLen(slave,argc);//增加参数数目指令
        for (j = 0; j < argc; j++) //一个个将参数追加到slave上面去。
			addReplyBulk(slave,argv[j]);
    }
}

另外replicationFeedSlaves还可能在键过期的时候调用,这是因为redis为了保持一致性,所有键的过期都是通过master进行的。键过期的时候,master会组成一调DEL 指令,调用replicationFeedSlaves发送给所有slave删除键。
暂时写到这里,后面再写一个2.8版本的增量同步代码,然后写一个redis 集群化相关的代码学习吧。

Share
分类: Redis 标签: , ,

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。