Redis主从同步源码浅析-Slave端
前一篇文章写了下redis主从同步的server端代码,这里补一下slave端的。
简单来讲,看了master端就知道slave端的代码大概流程了:
- 中断跟本slave的下一级slave的连接,强迫其重连SYNC;
- 给master发送PING确认其状态是否OK;
- 发送SYNC要求master做RDB快照(2.8版本以上会有PSYNC的指令,也就是部分同步,下回介绍。);
- 接收RDB文件大小;
- 接收RDB文件;
- emptyDb()清空当前数据库,rdbLoad()重新加载新的RDB文件;
- 按需startAppendOnly,然后接收master过来的累积和实时更新数据;
下面分别介绍这些步骤。
零、slave初始化-启动同步流程
redis搭建slave比较简单,有2种方式,第一种是在配置文件中指定:
slaveof 127.0.0.1 6379
这样在redis启动加载配置文件后,会设置server.masterhost等信息,同时会设置server.repl_state = REDIS_REPL_CONNECT; 这样redis会在serverCrond定时任务的后面会隔一秒调用replicationCron函数,从而开始跟master的连接;
第二种方式为启动后用上面一样的指令设置master信息,这格式化会中断跟之前的master的信息,重新跟新的master建立连接,重新SYNC数据。处理函数为:slaveofCommand。
void slaveofCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) {//已经是个slave了,需要关闭之 //····· } } else { long port; if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK)) return; /* Check if we are already attached to the specified slave */ if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); return; } /* There was no previous master or the user specified a different one, * we can continue. */ sdsfree(server.masterhost); server.masterhost = sdsdup(c->argv[1]->ptr); server.masterport = port; if (server.master) freeClient(server.master);//直接关闭之前的master连接,readSyncBulkPayload接收完RDB文件会设置这个的。 disconnectSlaves(); /* Force our slaves to resync with us as well. */ cancelReplicationHandshake(); //下面设置这个的状态为需要连接master, 这样在serverCron定时任务会每秒调用replicationCron,进而会调用connectWithMaster进行重连的。 server.repl_state = REDIS_REPL_CONNECT; redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", server.masterhost, server.masterport); } addReply(c,shared.ok); }
这里当redis收到slaveof命令后,会中断跟目前的master建立的连接,然后会调用disconnectSlaves中断我自己的下一级slave,因为redis支持树形slave机制,类似mysql。
redis主从支持树形结构,所以这里需要先断开跟本slave的slave们的连接,让他们重连.这里需要关注重连后,新的数据如何同步的问题,比如我拿到RDB文件后,我需要将其复制一份给我的从库们. 实现的方式是让从库们重新发起sync指令,当然此时估计他们sync后的数据为空。
这里有个疑问,如果是树形的架构,正在同步数据的从库连接被断开,1秒后重新尝试连接,然后重新发送PING,SYNC,同步RDB文件,又重新建立了连接,这样是不是就悲剧了 ?
- 如果我自己slave-serve-stale-data 设置为off了,那么此时断开连接的我的二级slave们给我发送PING,SYNC指令的时候,我是不会处理的,只有info,slaveof等命令会处理,这样我的slave们无法同步成功,会因为我拒绝而在syncWithMaster里面因为阻塞读取一行"+PONG\r\n"失败而失败,再次进入REDIS_REPL_CONNECT状态尝试跟我建立连接。直到我的状态切换为REDIS_REPL_CONNECTED为止 。这种情况下没啥大问题顶多slave无法服务而已。
- 如果我自己的slave-serve-stale-data设置为on了,也就是我在没有跟master同步完RDB文件的过程中,还可以接受各种命令的,这个在processCommand里面检测的。那么也就可以接受下面我断开的这些slave的重连请求,包括PING,SYNC ! 这样他们又要求我做RDB快照,而且我真的去做快照,做完还发送给他们是不是悲剧了? 问题在于我还没有跟我的master同步完RDB数据的时候,我是否应该叫我的slave们立即跟我重新同步。这种情况是不是就悲剧了.额,不对,刚才测试了一下,这种情况不会发生,因为syncCommand函数开头检查了一下我自己的状态是不是在REDIS_REPL_CONNECTED,不在的话我是不会接收SYNC命令的。所以我的slave们不能立即SYNC成功,直到我自己的同步搞定了为止。否则收到"Can't SYNC while not connected with my master"而一直报错。
上面slaveofCommand关键的代码是这一行:server.repl_state = REDIS_REPL_CONNECT;设置这个标志后replicationCron会每秒检查server.repl_state的状态进行相应的操作。如果是REDIS_REPL_CONNECT,就会调用connectWithMaster去异步连接master.
void replicationCron(void) { //serverCron调用这里,注意主库和从库都可能调用这里的。 //````` /* Check if we should connect to a MASTER */ if (server.repl_state == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (connectWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); } } //·····
来看看connectWithMaster的代码,挺简单的,就进行非阻塞的连接,设置连接的读写事件为syncWithMaster, 服务器状态server.repl_state 为 REDIS_REPL_CONNECTING。 这样如果连接成功,会调用syncWithMaster函数。
int connectWithMaster(void) { int fd; //非阻塞建立连接 fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); if (fd == -1) { redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return REDIS_ERR; } //绑定读写事件都为syncWithMaster if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); return REDIS_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; server.repl_state = REDIS_REPL_CONNECTING; return REDIS_OK; }
继续走syncWithMaster,syncWithMaster其实是个状态机,从发送PING,发送SYNC,等待结果,一个个处理。
一、发送PING消息
发送PING消息为了简单,redis是调用syncWrite同步阻塞发送的,发送完后将server.repl_state 设置为 REDIS_REPL_RECEIVE_PONG;也就是等待PING的状态。下次连接可读、写的时候会调用本函数去读取PING的结果。
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { //````` /* If we were connecting, it's time to send a non blocking PING, we want to * make sure the master is able to reply before going into the actual * replication process where we have long timeouts in the order of * seconds (in the meantime the slave would block). */ //如果之前正在REDIS_REPL_CONNECTING,现在有可读可写事件了,说明连接成功了,下一步就是需要发送PING请求 if (server.repl_state == REDIS_REPL_CONNECTING) { redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ aeDeleteFileEvent(server.el,fd,AE_WRITABLE); server.repl_state = REDIS_REPL_RECEIVE_PONG;//设置为PING命令已经发送完毕 /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ syncWrite(fd,"PING\r\n",6,100);//同步阻塞发送PING命令,这样服务端会返回"+PONG\r\n"字符串的 //这里实现的比较简单,就是发送完PING后,当连接可读可写时,再进syncWithMaster这个函数的时候 //下面的代码会判断PONG这个动作,然后就会阻塞去读取一行回复,看他是不是成功了。 return; } /* Receive the PONG command. */ if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { char buf[1024]; //上面在REDIS_REPL_CONNECTING状态的时候给master发送了一行PING指令,这样master会返回"+PONG\r\n"的, //现在连接可读或者可写了,所以我们阻塞读取这么多数据,判断返回是否OK。 /* Delete the readable event, we no longer need it now that there is * the PING reply to read. */ aeDeleteFileEvent(server.el,fd,AE_READABLE); /* Read the reply with explicit timeout. */ buf[0] = '\0'; if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING, "I/O error reading PING reply from master: %s", strerror(errno)); goto error; } //·····
后面如果需要进行AUTH验证,就会给服务器发送AUTH指令验证身份:sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
二、发送SYNC请求RDB 快照
这个通过syncWrite发送一条SYNC指令过去,然后准备一个临时文件打开接收数据,将连接的可读事件设置为readSyncBulkPayload就行了。然后守卫工作将server.repl_state 这个小状态机设置为REDIS_REPL_TRANSFER,也就是准备接收RDB文件过程中。
//到这里的话,连接肯定成功了,而且PING指令也都收到了回复。所以果断发送SYNC指令 /* Issue the SYNC command */ if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } /* Prepare a suitable temp file for bulk transfer */ while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); goto error; } //SYNC命令已经发送了,以后的可读可写事件就依靠readSyncBulkPayload来读取解析了。 /* Setup the non blocking download of the bulk file. */ if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { redisLog(REDIS_WARNING, "Can't create readable event for SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } server.repl_state = REDIS_REPL_TRANSFER;//进入数据传输阶段。
三、接收RDB文件
上面看到了,发送SYNC指令后,跟master的连接的可读事件设置为readSyncBulkPayload了,函数读取master发过来的RDB大小以及文件内容保存到本地文件中,如果读取完毕,那么调用rdbLoad加载文件内容。并考虑重新启动startAppendOnly。这个读取是异步的,所以如果需要,这个过程中redis还是可以处理请求。当然slave-serve-stale-data 得设置为on才行。
先读取RDB文件总大小:
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { //读取master发过来的RDB大小以及文件内容保存到本地文件中; //如果读取完毕,那么调用rdbLoad加载文件内容。并考虑重新启动startAppendOnly if (server.repl_transfer_size == -1) { if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { //````` //先获取RDB文件大小。 server.repl_transfer_size = strtol(buf+1,NULL,10);
然后就可以读取RDB文件内容了,其实就是一堆指令。注意redis为了避免阻塞,每次可读回调只读取16K的数据,然后写入RDB临时文件里面,写到一定大小,默认写死为REPL_MAX_WRITTEN_BEFORE_FSYNC 也就是8M,就进行一次刷磁盘的操作sync();避免到最后一次SYNC的时候直接卡死服务器。
/* Read bulk data */ left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); nread = read(fd,buf,readlen);//读取一次,有且仅有的读取一次。每次可读事件就读一次。 if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); replicationAbortSyncTransfer(); return; } server.repl_transfer_lastio = server.unixtime; if (write(server.repl_transfer_fd,buf,nread) != nread) {//写到临时文件里面去。 redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); goto error; } server.repl_transfer_read += nread;//更新读了的数目
如果读取完成了,那么就可以加载数据了。
四、rdbLoad()重新加载新的RDB文件
如果文件全部接收完毕,redis会先清空所有数据emptyDb,然后用rdbLoad加载RDB文件到内存中。设置连接为CONNECTED状态
/* Check if the transfer is now complete */ if (server.repl_transfer_read == server.repl_transfer_size) {//看看是否文件全部接收完毕,如果完毕,GOOD if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); return; } redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); signalFlushedDb(-1); emptyDb(); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);//上面注释说了,避免循环进入。 //开始加载RDB文件到内存数据结构中,这个要花费不少时间的。 if (rdbLoad(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); replicationAbortSyncTransfer(); return; } /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); server.master = createClient(server.repl_transfer_s);//重新注册可读事件毁掉为readQueryFromClient server.master->flags |= REDIS_MASTER; server.master->authenticated = 1; server.repl_state = REDIS_REPL_CONNECTED; redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
当切换server.repl_state 为 REDIS_REPL_CONNECTED的时候,新来的查询请求就能够被处理了,在processCommand里面就不会过滤非STALE请求,同时本slave也能接受下一级slave的SYNC指令了。
后面redis会附带启动AOF,如果需要的话。
五、总结
redis主从同步代码比较简练,不多,但功能该有的都有,很赞的。下面说点缺点:
- 不能支持增量同步(这个即将发布的2.8版本已经解决,采用backlog的形式);
- 如果系统很大,好几十G的RDB文件,靠一个连接发送RDB文件的话估计得把人耗死,而且更悲剧的问题是:在做RDB快照,以及发送RDB问的过程中,所有客户端的写操作都会记录在内存中,这个对本来内存要求高的redis又增加了负担;
- 另外redis的扩容是个问题,那么大的数据量,加载一次RDB文件得好几个小时,简直无法忍受。
不过关于扩容作者在其博客 里面介绍了可用的方法:presharding。不多说,绝对经典。
另外redis的集群化正在开发中,可用在这里看到redis集群化的进度和概况:Redis cluster Specification (work in progress),代码里面也有最新的相关实现了,过段时间看看。
近期评论