首页 > Redis > Redis主从同步源码浅析-Slave端

Redis主从同步源码浅析-Slave端

2013年10月6日 发表评论 阅读评论 15520次阅读    

前一篇文章写了下redis主从同步的server端代码,这里补一下slave端的。

简单来讲,看了master端就知道slave端的代码大概流程了:

  1. 中断跟本slave的下一级slave的连接,强迫其重连SYNC;
  2. 给master发送PING确认其状态是否OK;
  3. 发送SYNC要求master做RDB快照(2.8版本以上会有PSYNC的指令,也就是部分同步,下回介绍。);
  4. 接收RDB文件大小;
  5. 接收RDB文件;
  6. emptyDb()清空当前数据库,rdbLoad()重新加载新的RDB文件;
  7. 按需startAppendOnly,然后接收master过来的累积和实时更新数据;

下面分别介绍这些步骤。

零、slave初始化-启动同步流程

redis搭建slave比较简单,有2种方式,第一种是在配置文件中指定:

slaveof 127.0.0.1 6379

这样在redis启动加载配置文件后,会设置server.masterhost等信息,同时会设置server.repl_state = REDIS_REPL_CONNECT; 这样redis会在serverCrond定时任务的后面会隔一秒调用replicationCron函数,从而开始跟master的连接;

第二种方式为启动后用上面一样的指令设置master信息,这格式化会中断跟之前的master的信息,重新跟新的master建立连接,重新SYNC数据。处理函数为:slaveofCommand。

void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {//已经是个slave了,需要关闭之
//·····
        }
    } else {
        long port;
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        sdsfree(server.masterhost);
        server.masterhost = sdsdup(c->argv[1]->ptr);
        server.masterport = port;
        if (server.master) freeClient(server.master);//直接关闭之前的master连接,readSyncBulkPayload接收完RDB文件会设置这个的。

        disconnectSlaves(); /* Force our slaves to resync with us as well. */
        cancelReplicationHandshake();

	//下面设置这个的状态为需要连接master, 这样在serverCron定时任务会每秒调用replicationCron,进而会调用connectWithMaster进行重连的。
        server.repl_state = REDIS_REPL_CONNECT;
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}

这里当redis收到slaveof命令后,会中断跟目前的master建立的连接,然后会调用disconnectSlaves中断我自己的下一级slave,因为redis支持树形slave机制,类似mysql。

redis主从支持树形结构,所以这里需要先断开跟本slave的slave们的连接,让他们重连.这里需要关注重连后,新的数据如何同步的问题,比如我拿到RDB文件后,我需要将其复制一份给我的从库们. 实现的方式是让从库们重新发起sync指令,当然此时估计他们sync后的数据为空。

这里有个疑问,如果是树形的架构,正在同步数据的从库连接被断开,1秒后重新尝试连接,然后重新发送PING,SYNC,同步RDB文件,又重新建立了连接,这样是不是就悲剧了 ?

  1. 如果我自己slave-serve-stale-data 设置为off了,那么此时断开连接的我的二级slave们给我发送PING,SYNC指令的时候,我是不会处理的,只有info,slaveof等命令会处理,这样我的slave们无法同步成功,会因为我拒绝而在syncWithMaster里面因为阻塞读取一行"+PONG\r\n"失败而失败,再次进入REDIS_REPL_CONNECT状态尝试跟我建立连接。直到我的状态切换为REDIS_REPL_CONNECTED为止 。这种情况下没啥大问题顶多slave无法服务而已。
  2. 如果我自己的slave-serve-stale-data设置为on了,也就是我在没有跟master同步完RDB文件的过程中,还可以接受各种命令的,这个在processCommand里面检测的。那么也就可以接受下面我断开的这些slave的重连请求,包括PING,SYNC ! 这样他们又要求我做RDB快照,而且我真的去做快照,做完还发送给他们是不是悲剧了? 问题在于我还没有跟我的master同步完RDB数据的时候,我是否应该叫我的slave们立即跟我重新同步。这种情况是不是就悲剧了.额,不对,刚才测试了一下,这种情况不会发生,因为syncCommand函数开头检查了一下我自己的状态是不是在REDIS_REPL_CONNECTED,不在的话我是不会接收SYNC命令的。所以我的slave们不能立即SYNC成功,直到我自己的同步搞定了为止。否则收到"Can't SYNC while not connected with my master"而一直报错。

上面slaveofCommand关键的代码是这一行:server.repl_state = REDIS_REPL_CONNECT;设置这个标志后replicationCron会每秒检查server.repl_state的状态进行相应的操作。如果是REDIS_REPL_CONNECT,就会调用connectWithMaster去异步连接master.

void replicationCron(void) {
//serverCron调用这里,注意主库和从库都可能调用这里的。
//`````
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
//·····

来看看connectWithMaster的代码,挺简单的,就进行非阻塞的连接,设置连接的读写事件为syncWithMaster, 服务器状态server.repl_state 为 REDIS_REPL_CONNECTING。 这样如果连接成功,会调用syncWithMaster函数。

int connectWithMaster(void) {
    int fd;
//非阻塞建立连接
    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }
//绑定读写事件都为syncWithMaster
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

继续走syncWithMaster,syncWithMaster其实是个状态机,从发送PING,发送SYNC,等待结果,一个个处理。

一、发送PING消息

发送PING消息为了简单,redis是调用syncWrite同步阻塞发送的,发送完后将server.repl_state 设置为 REDIS_REPL_RECEIVE_PONG;也就是等待PING的状态。下次连接可读、写的时候会调用本函数去读取PING的结果。

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
//`````
    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
//如果之前正在REDIS_REPL_CONNECTING,现在有可读可写事件了,说明连接成功了,下一步就是需要发送PING请求
	if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;//设置为PING命令已经发送完毕
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);//同步阻塞发送PING命令,这样服务端会返回"+PONG\r\n"字符串的
        //这里实现的比较简单,就是发送完PING后,当连接可读可写时,再进syncWithMaster这个函数的时候
        //下面的代码会判断PONG这个动作,然后就会阻塞去读取一行回复,看他是不是成功了。
        return;
    }
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];
//上面在REDIS_REPL_CONNECTING状态的时候给master发送了一行PING指令,这样master会返回"+PONG\r\n"的,
//现在连接可读或者可写了,所以我们阻塞读取这么多数据,判断返回是否OK。
        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING, "I/O error reading PING reply from master: %s", strerror(errno));
            goto error;
        }
//·····

后面如果需要进行AUTH验证,就会给服务器发送AUTH指令验证身份:sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);

二、发送SYNC请求RDB 快照

这个通过syncWrite发送一条SYNC指令过去,然后准备一个临时文件打开接收数据,将连接的可读事件设置为readSyncBulkPayload就行了。然后守卫工作将server.repl_state 这个小状态机设置为REDIS_REPL_TRANSFER,也就是准备接收RDB文件过程中。

//到这里的话,连接肯定成功了,而且PING指令也都收到了回复。所以果断发送SYNC指令
    /* Issue the SYNC command */
    if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
        redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
            strerror(errno));
        goto error;
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }
//SYNC命令已经发送了,以后的可读可写事件就依靠readSyncBulkPayload来读取解析了。
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;//进入数据传输阶段。

三、接收RDB文件

上面看到了,发送SYNC指令后,跟master的连接的可读事件设置为readSyncBulkPayload了,函数读取master发过来的RDB大小以及文件内容保存到本地文件中,如果读取完毕,那么调用rdbLoad加载文件内容。并考虑重新启动startAppendOnly。这个读取是异步的,所以如果需要,这个过程中redis还是可以处理请求。当然slave-serve-stale-data 得设置为on才行。

先读取RDB文件总大小:

void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
//读取master发过来的RDB大小以及文件内容保存到本地文件中;
//如果读取完毕,那么调用rdbLoad加载文件内容。并考虑重新启动startAppendOnly
    if (server.repl_transfer_size == -1) {
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
//`````
		//先获取RDB文件大小。
        server.repl_transfer_size = strtol(buf+1,NULL,10);

然后就可以读取RDB文件内容了,其实就是一堆指令。注意redis为了避免阻塞,每次可读回调只读取16K的数据,然后写入RDB临时文件里面,写到一定大小,默认写死为REPL_MAX_WRITTEN_BEFORE_FSYNC 也就是8M,就进行一次刷磁盘的操作sync();避免到最后一次SYNC的时候直接卡死服务器。

    /* Read bulk data */
    left = server.repl_transfer_size - server.repl_transfer_read;
    readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    nread = read(fd,buf,readlen);//读取一次,有且仅有的读取一次。每次可读事件就读一次。
    if (nread <= 0) {
        redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        replicationAbortSyncTransfer();
        return;
    }
    server.repl_transfer_lastio = server.unixtime;
    if (write(server.repl_transfer_fd,buf,nread) != nread) {//写到临时文件里面去。
        redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
        goto error;
    }
    server.repl_transfer_read += nread;//更新读了的数目

如果读取完成了,那么就可以加载数据了。

四、rdbLoad()重新加载新的RDB文件

如果文件全部接收完毕,redis会先清空所有数据emptyDb,然后用rdbLoad加载RDB文件到内存中。设置连接为CONNECTED状态

    /* Check if the transfer is now complete */
    if (server.repl_transfer_read == server.repl_transfer_size) {//看看是否文件全部接收完毕,如果完毕,GOOD
        if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
            redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
            replicationAbortSyncTransfer();
            return;
        }
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
        signalFlushedDb(-1);
        emptyDb();
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);//上面注释说了,避免循环进入。
        //开始加载RDB文件到内存数据结构中,这个要花费不少时间的。
        if (rdbLoad(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            replicationAbortSyncTransfer();
            return;
        }
        /* Final setup of the connected slave <- master link */
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        server.master = createClient(server.repl_transfer_s);//重新注册可读事件毁掉为readQueryFromClient
        server.master->flags |= REDIS_MASTER;
        server.master->authenticated = 1;
        server.repl_state = REDIS_REPL_CONNECTED;
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");

当切换server.repl_state 为 REDIS_REPL_CONNECTED的时候,新来的查询请求就能够被处理了,在processCommand里面就不会过滤非STALE请求,同时本slave也能接受下一级slave的SYNC指令了。

后面redis会附带启动AOF,如果需要的话。

五、总结

redis主从同步代码比较简练,不多,但功能该有的都有,很赞的。下面说点缺点:

  1. 不能支持增量同步(这个即将发布的2.8版本已经解决,采用backlog的形式);
  2. 如果系统很大,好几十G的RDB文件,靠一个连接发送RDB文件的话估计得把人耗死,而且更悲剧的问题是:在做RDB快照,以及发送RDB问的过程中,所有客户端的写操作都会记录在内存中,这个对本来内存要求高的redis又增加了负担;
  3. 另外redis的扩容是个问题,那么大的数据量,加载一次RDB文件得好几个小时,简直无法忍受。

不过关于扩容作者在其博客 里面介绍了可用的方法:presharding。不多说,绝对经典。

另外redis的集群化正在开发中,可用在这里看到redis集群化的进度和概况:Redis cluster Specification (work in progress)代码里面也有最新的相关实现了,过段时间看看。

Share
分类: Redis 标签: , ,

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。