Redis(九):主從複製的設計與實現解析
前面幾篇我們已經完全理解了redis的基本功能的實現了。
但單靠基本功能實現,往往還是稱不上優秀的專案的。畢竟,我們現在面對的都是複雜的環境,高併發的場景,大資料量的可能。
簡而言之,現在的系統一般都需要支援分散式部署,不存在單點問題,才算是一個合格的系統。
而redis作為一個儲存系統,單點問題肯定是不行的。
最簡單的,就是起碼得支援讀寫分離功能,因為我們面臨的許多問題,一般是面對大量的查詢問題。而要做到讀寫分離功能,就是要把主節點的資料同步到從節點上。從而可以讓從節點接受讀請求,以減輕主節點的讀壓力。
就讓我們來分析下 Redis 是如何進行主從同步資料的吧!主從同步,換個名稱也就是資料複製。
0. 主從複製的作用
資料冗餘:主從複製實現了資料的熱備份,是持久化之外的一種資料冗餘方式。
故障恢復:當主節點出現問題時,可以由從節點提供服務,實現快速的故障恢復;實際上是一種服務的冗餘。
負載均衡:在主從複製的基礎上,配合讀寫分離,可以由主節點提供寫服務,由從節點提供讀服務(即寫Redis資料時應用連線主節點,讀Redis資料時應用連線從節點),分擔伺服器負載;尤其是在寫少讀多的場景下,通過多個從節點分擔讀負載,可以大大提高Redis伺服器的併發量。
讀寫分離:可以用於實現讀寫分離,主庫寫、從庫讀,讀寫分離不僅可以提高伺服器的負載能力,同時可根據需求的變化,改變從庫的數量;
高可用基石:除了上述作用以外,主從複製還是哨兵和叢集能夠實施的基礎,因此說主從複製是Redis高可用的基礎。
1. Redis 主從複製簡介
在主從複製中,資料庫分為兩類,一類是主庫(master),另一類是同步主庫資料的從庫(slave)。主庫可以進行讀寫操作,當寫操作導致資料變化時會自動同步到從庫。而從庫一般是隻讀的(特定情況也可以寫,通過引數slave-read-only指定),並接受來自主庫的資料,一個主庫可擁有多個從庫,而一個從庫只能有一個主庫。這樣就使得redis的主從架構有了兩種模式:一類是一主多從如下圖1,二類是“鏈式主從複製”--主->從->主-從如下圖2。
2. Redis 主從複製的操作步驟簡略說明
1. 首先,你得有至少2個redis server 例項,單機多例項或者多機多例項皆可。
2. 配置主從關係,使用 slaveof master_host master_port; (config rewrite 可直接寫入配置檔案,避免每次都重新寫)
3. 驗證主從配置,使用 info Replication;
上面的操作步驟是進行實時操作的,也可以直接將 master/slave 配置放到 redis.conf 中,啟動時直接載入。
當master需要使用密碼進行訪問時,可以使用命令 masterauth 進行授權。
masterauth 123456 # 寫到redis.conf配置檔案中 config set masterauth 123456 # 通過命令列進行授權
3. 主要同步的實現原理
主從複製大致流程為:
1. slaveof 是我們的開啟方法,它會將master資訊寫入到從節點;
2. 然後與master進行建立連線;
3. 然後master決定複製方式是全量同步還是部分同步;
4. master進行資料準備;
5. 將需要同步的傳送給slave節點;
6. 從節點執行傳送過來的資料;
但是,我們需要進行深入理解。
3.1. slaveof 命令原始碼解析
slaveof 為我們操作開啟主從複製開啟了入口,其介面定義如下:
{"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
// 用法 slaveof <master_host> <master_port> 建立主從關係 // slaveof no one 取消主從同步 // replication.c void slaveofCommand(client *c) { /* SLAVEOF is not allowed in cluster mode as replication is automatically * configured using the current address of the master node. */ if (server.cluster_enabled) { addReplyError(c,"SLAVEOF not allowed in cluster mode."); return; } /* The special host/port combination "NO" "ONE" turns the instance * into a master. Otherwise the new master address is set. */ // slaveof no one, 取消主從同步 if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { // 取消當前的master關聯,返回客戶端目前狀態資訊,結束 replicationUnsetMaster(); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); } } else { long port; if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) return; /* Check if we are already attached to the specified slave */ // 只能和一個 master 建立主從關係 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); return; } /* There was no previous master or the user specified a different one, * we can continue. */ // 設定master資訊 replicationSetMaster(c->argv[1]->ptr, port); // 輸出client狀態資訊 sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')", server.masterhost, server.masterport, client); sdsfree(client); } addReply(c,shared.ok); } // 繫結新的master關聯 /* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) { sdsfree(server.masterhost); server.masterhost = sdsnew(ip); server.masterport = port; if (server.master) freeClient(server.master); // slave 不進行阻塞客戶端 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ // 斷開所有 slave 連線 disconnectSlaves(); /* Force our slaves to resync with us as well. */ // cacheMaster 丟棄 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ // 鏈式主從複製刪除 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ // 斷開正在連線slave請求 cancelReplicationHandshake(); server.repl_state = REPL_STATE_CONNECT; server.master_repl_offset = 0; server.repl_down_since = 0; } // 取消master關聯 /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) { if (listLength(server.slaves) == 0) { /* If this instance is turned into a master and there are no * slaves, it inherits the replication offset from the master. * Under certain conditions this makes replicas comparable by * replication offset to understand what is the most updated. */ server.master_repl_offset = server.master->reploff; freeReplicationBacklog(); } freeClient(server.master); } replicationDiscardCachedMaster(); cancelReplicationHandshake(); server.repl_state = REPL_STATE_NONE; } // blocked.c, 解除所有的阻塞客戶端 /* Mass-unblock clients because something changed in the instance that makes * blocking no longer safe. For example clients blocked in list operations * in an instance which turns from master to slave is unsafe, so this function * is called when a master turns into a slave. * * The semantics is to send an -UNBLOCKED error to the client, disconnecting * it at the same time. */ void disconnectAllBlockedClients(void) { listNode *ln; listIter li; listRewind(server.clients,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED) { addReplySds(c,sdsnew( "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> slave?)\r\n")); unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } } // networking.c, 斷開所有的 slave 連線 /* Close all the slaves connections. This is useful in chained replication * when we resync with our own master and want to force all our slaves to * resync with us as well. */ void disconnectSlaves(void) { while (listLength(server.slaves)) { listNode *ln = listFirst(server.slaves); freeClient((client*)ln->value); } } // replication.c /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ void replicationDiscardCachedMaster(void) { if (server.cached_master == NULL) return; serverLog(LL_NOTICE,"Discarding previously cached master state."); server.cached_master->flags &= ~CLIENT_MASTER; freeClient(server.cached_master); server.cached_master = NULL; } // replication.c void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); zfree(server.repl_backlog); server.repl_backlog = NULL; } // replication.c /* This function aborts a non blocking replication attempt if there is one * in progress, by canceling the non-blocking connect attempt or * the initial bulk transfer. * * If there was a replication handshake in progress 1 is returned and * the replication state (server.repl_state) set to REPL_STATE_CONNECT. * * Otherwise zero is returned and no operation is perforemd at all. */ int cancelReplicationHandshake(void) { if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; } else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) { undoConnectWithMaster(); server.repl_state = REPL_STATE_CONNECT; } else { return 0; } return 1; } // networking.c /* Concatenate a string representing the state of a client in an human * readable format, into the sds string 's'. */ sds catClientInfoString(sds s, client *client) { char flags[16], events[3], *p; int emask; p = flags; if (client->flags & CLIENT_SLAVE) { if (client->flags & CLIENT_MONITOR) *p++ = 'O'; else *p++ = 'S'; } if (client->flags & CLIENT_MASTER) *p++ = 'M'; if (client->flags & CLIENT_MULTI) *p++ = 'x'; if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u'; if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; if (client->flags & CLIENT_READONLY) *p++ = 'r'; if (p == flags) *p++ = 'N'; *p++ = '\0'; emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd); p = events; if (emask & AE_READABLE) *p++ = 'r'; if (emask & AE_WRITABLE) *p++ = 'w'; *p = '\0'; // 可變引數定義: sds sdscatfmt(sds s, char const *fmt, ...) return sdscatfmt(s, "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s", (unsigned long long) client->id, getClientPeerId(client), client->fd, client->name ? (char*)client->name->ptr : "", (long long)(server.unixtime - client->ctime), (long long)(server.unixtime - client->lastinteraction), flags, client->db->id, (int) dictSize(client->pubsub_channels), (int) listLength(client->pubsub_patterns), (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) getClientOutputBufferMemoryUsage(client), events, client->lastcmd ? client->lastcmd->name : "NULL"); }
所以,slaveof 只是做簡單的驗證,然後設定了下 master 資訊,然後就返回了。那麼是誰在做同步的工作呢?
其實同步任務是由 cron 任務執行的。
3.2. 如何執行同步任務?
因為複製是比較耗效能的東西,如果和使用者執行緒共享處理過程的話,將可能引起併發效能的。所以,redis使用非同步 cron 任務的形式實現主從複製功能。
// server.c, 初始化server,註冊 cron void initServer(void) { ... /* Create out timers, that's our main way to process background * operations. */ // 新增 serverCron 到 eventLoop 中,以便後續可以執行定時指令碼 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } ... } // ae.c, 新增時間事件 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; } // server.c, 主指令碼執行入口, 每1秒執行1次 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ... /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ // 主從複製,連線 master,我們的入口 run_with_period(1000) replicationCron(); ... server.cronloops++; return 1000/server.hz; } // 重點入口: replicationCron() // replication.c, 主從複製定時指令碼 /* Replication cron function, called 1 time per second. */ void replicationCron(void) { static long long replication_cron_loops = 0; /* Non blocking connection timeout? */ // 連線超時處理,取消重連 if (server.masterhost && (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); cancelReplicationHandshake(); } /* Bulk transfer I/O timeout? */ // 傳輸資料超時,取消重連 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); cancelReplicationHandshake(); } /* Timed out master when we are an already connected slave? */ // slave 會話超時 if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && (time(NULL)-server.master->lastinteraction) > server.repl_timeout) { serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); freeClient(server.master); } /* Check if we should connect to a MASTER */ // 3.2.1. 初次設定master時,一定會進行連線處理 if (server.repl_state == REPL_STATE_CONNECT) { serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); if (connectWithMaster() == C_OK) { serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started"); } } /* Send ACK to master from time to time. * Note that we do not send periodic acks to masters that don't * support PSYNC and replication offsets. */ // 3.2.2. 每次定時任務執行,都會發生 ACK 給master if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC)) replicationSendAck(); /* If we have attached slaves, PING them from time to time. * So slaves can implement an explicit timeout to masters, and will * be able to detect a link disconnection even if the TCP connection * will not actually go down. */ listIter li; listNode *ln; robj *ping_argv[1]; /* First, send PING according to ping_slave_period. */ // 3.2.3. 傳送 PING 請求 // 預設 repl_ping_slave_period: 10 if ((replication_cron_loops % server.repl_ping_slave_period) == 0) { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); } /* Second, send a newline to all the slaves in pre-synchronization * stage, that is, slaves waiting for the master to create the RDB file. * The newline will be ignored by the slave but will refresh the * last-io timer preventing a timeout. In this case we ignore the * ping period and refresh the connection once per second since certain * timeouts are set at a few seconds (example: PSYNC response). */ // 3.2.4. 向以當前節點為master的slaves 傳送空行資料 listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)) { if (write(slave->fd, "\n", 1) == -1) { /* Don't worry, it's just a ping. */ } } } /* Disconnect timedout slaves. */ // 斷開連線超時的 slaves if (listLength(server.slaves)) { listIter li; listNode *ln; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (slave->flags & CLIENT_PRE_PSYNC) continue; if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { serverLog(LL_WARNING, "Disconnecting timedout slave: %s", replicationGetSlaveName(slave)); freeClient(slave); } } } /* If we have no attached slaves and there is a replication backlog * using memory, free it after some (configured) time. */ // 如果沒有slave 跟隨當前節點,一段時間後將backlog 釋放掉 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && server.repl_backlog) { time_t idle = server.unixtime - server.repl_no_slaves_since; if (idle > server.repl_backlog_time_limit) { freeReplicationBacklog(); serverLog(LL_NOTICE, "Replication backlog freed after %d seconds " "without connected slaves.", (int) server.repl_backlog_time_limit); } } /* If AOF is disabled and we no longer have attached slaves, we can * free our Replication Script Cache as there is no need to propagate * EVALSHA at all. */ if (listLength(server.slaves) == 0 && server.aof_state == AOF_OFF && listLength(server.repl_scriptcache_fifo) != 0) { replicationScriptCacheFlush(); } /* If we are using diskless replication and there are slaves waiting * in WAIT_BGSAVE_START state, check if enough seconds elapsed and * start a BGSAVE. * * This code is also useful to trigger a BGSAVE if the diskless * replication was turned off with CONFIG SET, while there were already * slaves in WAIT_BGSAVE_START state. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; int slaves_waiting = 0; int mincapa = -1; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { idle = server.unixtime - slave->lastinteraction; if (idle > max_idle) max_idle = idle; slaves_waiting++; mincapa = (mincapa == -1) ? slave->slave_capa : (mincapa & slave->slave_capa); } } // 3.2.5. 如果有等待同步的slave, 且等待時間超過 server.repl_diskless_sync_delay, 預設是: 5s if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { /* Start a BGSAVE. Usually with socket target, or with disk target * if there was a recent socket -> disk config change. */ startBgsaveForReplication(mincapa); } } /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ // 重新整理本節點的 從健康節點 數量,以便在需要確保多少節點時才進行寫入的場景判定 refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ }
以上,就是整個主從複製的主體框架了。且以上程式碼包含了兩種角色的執行機制。1: master 的執行; 2. slave 的執行;
slave 的執行過程如下:
1. 從節點每秒執行一次定時任務;
2. 當定時任務發現存在新的主節點後,會呼叫 connectWithMaster() 嘗試與master節點建立網路連線;
3. 建立連線後,由 syncWithMaster() 進行處理後續同步事務;
4. 各種連線超時釋放處理;
master 的執行過程如下:
1. 各種連線超時釋放處理;
2. 定期進行 PING slave 操作;
3. 向slave寫入一個空行,相當於ping操作與slave續租期;
4. 清理連線超時的slaves, 如果一個slave也沒有, 則直接把backlog釋放掉;
5. 如果未開啟磁碟持久化操作,且有等待同步的slaves, 則主動開啟一個 bgsave;
從上面的框架中,可以說大部分時候都是在處理各種異常問題和續期問題,但是實際最重要的一個連線master操作卻只有一行程式碼。那麼slave連線master之後,是如何進行後續的同步的呢?好像這個定時任務的執行並沒有太大的作用呢!
3.3. 從節點如何處理同步操作?
從節點是整個同步操作的操控者,整個同步可以說都是其主導的。從上一節的過程,我們可以看到,只有一個連線master的只剩,所以必定許多工作要這裡完成。
實際上,slave連線到master的請求實現,基於 epoll 模型的非同步操作,所以,在主框架中,我們只看到一個連線操作。因為連線完成後的操作,是非同步執行的。
// replication.c, 連線請求到 master 節點 int connectWithMaster(void) { int fd; // 建立socket fd fd = anetTcpNonBlockBestEffortBindConnect(NULL, server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); if (fd == -1) { serverLog(LL_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return C_ERR; } // 使用epoll模型進行非同步連線 // 連線成功後,由 syncWithMaster 進行事件處理 // 關注 讀寫事件 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); serverLog(LL_WARNING,"Can't create readable event for SYNC"); return C_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; // 狀態變更,以便下次不會再進行連線 server.repl_state = REPL_STATE_CONNECTING; return C_OK; } // anet.c, 建立一個非阻塞的socket連線 int anetTcpNonBlockBestEffortBindConnect(char *err, char *addr, int port, char *source_addr) { // ANET_CONNECT_BE_BINDING 代表將進行重試儘可能建立連線 return anetTcpGenericConnect(err,addr,port,source_addr, ANET_CONNECT_NONBLOCK|ANET_CONNECT_BE_BINDING); } // 與master連線成功後,由 syncWithMaster 進行處理後續事務 // replication.c void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err = NULL; int dfd, maxtries = 5; int sockerr = 0, psync_result; socklen_t errlen = sizeof(sockerr); UNUSED(el); UNUSED(privdata); UNUSED(mask); /* If this event fired after the user turned the instance into a master * with SLAVEOF NO ONE we must just return ASAP. */ if (server.repl_state == REPL_STATE_NONE) { close(fd); return; } /* Check for errors in the socket. */ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) sockerr = errno; if (sockerr) { serverLog(LL_WARNING,"Error condition on socket for SYNC: %s", strerror(sockerr)); goto error; } /* Send a PING to check the master is able to reply without errors. */ if (server.repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ aeDeleteFileEvent(server.el,fd,AE_WRITABLE); server.repl_state = REPL_STATE_RECEIVE_PONG; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ // 傳送一個 PING 出去,檢查 master 是否可以響應 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); if (err) goto write_error; return; } /* Receive the PONG command. */ if (server.repl_state == REPL_STATE_RECEIVE_PONG) { // 同步讀取PING結果 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* We accept only two replies as valid, a positive +PONG reply * (we just check for "+") or an authentication error. * Note that older versions of Redis replied with "operation not * permitted" instead of using a proper error code, so we test * both. */ // 沒有許可權且提示不是請授權類的提示,則發生錯誤 // 沒有呼叫 auth 前 // -NOAUTH, 代表未授權, 可以進入下一步授權操作 if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) { serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); sdsfree(err); goto error; } else { serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); } sdsfree(err); server.repl_state = REPL_STATE_SEND_AUTH; } /* AUTH with the master if required. */ // 需要輸入master密碼狀態 if (server.repl_state == REPL_STATE_SEND_AUTH) { if (server.masterauth) // 傳送授權命令 // AUTH master_password err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); if (err) goto write_error; server.repl_state = REPL_STATE_RECEIVE_AUTH; return; } else { server.repl_state = REPL_STATE_SEND_PORT; } } /* Receive AUTH reply. */ if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { // 授權響應,讀取結果 // 授權成功響應 +OK, 其他授權失敗 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); server.repl_state = REPL_STATE_SEND_PORT; } /* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. */ // 傳送埠號給master, 以便master可以列舉出所有slave的埠號 if (server.repl_state == REPL_STATE_SEND_PORT) { sds port = sdsfromlonglong(server.port); // 傳送本節點的埠給 master // 命令: REPLCONF listening-port port err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "listening-port",port, NULL); sdsfree(port); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_PORT; return; } /* Receive REPLCONF listening-port reply. */ if (server.repl_state == REPL_STATE_RECEIVE_PORT) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ // 忽略失敗情況,影響不大,只是個展示問題,且並非所有版本都支援該命令 if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF listening-port: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_SEND_CAPA; } /* Inform the master of our capabilities. While we currently send * just one capability, it is possible to chain new capabilities here * in the form of REPLCONF capa X capa Y capa Z ... * The master will ignore capabilities it does not understand. */ if (server.repl_state == REPL_STATE_SEND_CAPA) { // 傳送命令: REPLCONF capa eof err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "capa","eof",NULL); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA; return; } /* Receive CAPA reply. */ if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF capa. */ if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); } sdsfree(err); // 可以進行資料同步了 PSYNC server.repl_state = REPL_STATE_SEND_PSYNC; } /* Try a partial resynchonization. If we don't have a cached master * slaveTryPartialResynchronization() will at least try to use PSYNC * to start a full resynchronization so that we get the master run id * and the global offset, to try a partial resync at the next * reconnection attempt. */ if (server.repl_state == REPL_STATE_SEND_PSYNC) { // 嘗試進行部分同步, 可能為 全量同步、部分同步、或者命令不支援 // PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC; return; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { serverLog(LL_WARNING,"syncWithMaster(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); goto error; } // 讀取 PSYNC 結果 // PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED psync_result = slaveTryPartialResynchronization(fd,1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Note: if PSYNC does not return WAIT_REPLY, it will take care of * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); return; } /* PSYNC failed or is not supported: we want our slaves to resync with us * as well, if we have any (chained replication case). The mater may * transfer us an entirely different data set and we have no way to * incrementally feed our slaves after that. */ // 不能使用 PSYNC 進行同步,斷開當前節點的 slaves // 不允許鏈式主從 disconnectSlaves(); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the server.repl_master_runid and repl_master_initial_offset are * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE,"Retrying with SYNC..."); // 不支援 PSYNC, 降級為 SYNC if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } } /* Prepare a suitable temp file for bulk transfer */ // 準備從rdb檔案中讀取資料,最多重試5次(共5s) // 臨時檔名: temp-<1560888xxx>.<pid>.rdb while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); goto error; } /* Setup the non blocking download of the bulk file. */ // 使用 epoll 模型進行非同步接收master傳送過來的rdb檔案 // 由 readSyncBulkPayload 函式進行結果處理 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } // 儲存同步狀態 server.repl_state = REPL_STATE_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); close(fd); server.repl_transfer_s = -1; server.repl_state = REPL_STATE_CONNECT; return; write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err); sdsfree(err); goto error; }
整個連線成功之後的處理過程還是比較繁雜的,主要邏輯就在 syncWithMaster,主要是在各個狀態之間的轉換,尤其頭疼,不過幸好都是流水式的一步步下來。
1. REPL_STATE_CONNECTING: 待連線狀態. slave 傳送 PING命令進行主動連線, 然後將狀態置為 REPL_STATE_RECEIVE_PONG;
2. REPL_STATE_RECEIVE_PONG: 待master響應狀態. slave同步等待結果(其實一般會立即獲取到,因為epoll已經準備好,才會呼叫此狀態),判斷是否PING正常後, 將狀態置為 REPL_STATE_SEND_AUTH;
3. REPL_STATE_SEND_AUTH: 等待授權狀態. slave 傳送 auth passwd 給master後, 將狀態置為 REPL_STATE_RECEIVE_AUTH;
4. REPL_STATE_RECEIVE_AUTH: 等待授權響應狀態. slave同步等待結果, 判斷授權通過後, 將狀態置為 REPL_STATE_SEND_PORT;
5. REPL_STATE_SEND_PORT: 待發送埠狀態. slave傳送自身的服務埠給master以便master展示使用, 然後將狀態置為 REPL_STATE_RECEIVE_PORT;
6. REPL_STATE_RECEIVE_PORT: 等待埠傳送結果. 不論結果如何, 直接將狀態置為 REPL_STATE_SEND_CAPA;
7. REPL_STATE_SEND_CAPA: 等待發送capa命令狀態. 傳送 REPLCONF capa eof 後, 將狀態置為 REPL_STATE_RECEIVE_CAPA;
8. REPL_STATE_RECEIVE_CAPA: 等待capa命令傳送結果. 不論結果如何, 將狀態置為 REPL_STATE_SEND_PSYNC;
9. REPL_STATE_SEND_PSYNC: 等待PSYNC同步命令狀態. 嘗試使用PSYNC進行部分複製,結果可能是全量複製或部分複製,也可能使用其他版本命令執行, 將狀態置為 REPL_STATE_RECEIVE_PSYNC;
10. REPL_STATE_RECEIVE_PSYNC: 等待PSYNC結果. 這是真正接收資料的時候, 是終態, 根據上一次命令的請求方式,接收相應結果進一步處理;
11. 重新註冊一個 epoll 事件,用於接收master傳輸過來的資料,處理方法為 readSyncBulkPayload();
接下來,我們先看看嘗試部分時都做了哪些事,因為這決定了是使用全量複製還是部分複製:
// 嘗試進行部分同步 // replication.c int slaveTryPartialResynchronization(int fd, int read_reply) { char *psync_runid; char psync_offset[32]; sds reply; /* Writing half */ // 第一次呼叫時, read_reply=0, 即是寫動作 // 向 master 寫入 PSYNC psync_runid psync_offset // 即是每次都拉取一部分資料吧 if (!read_reply) { /* Initially set repl_master_initial_offset to -1 to mark the current * master run_id and offset as not valid. Later if we'll be able to do * a FULL resync using the PSYNC command we'll set the offset at the * right value, so that this information will be propagated to the * client structure representing the master into server.master. */ server.repl_master_initial_offset = -1; // 如果已經建立了連線,則 psync_runid, psync_offset 都是可預知的 // 否則 psync_runid = "?", psync_offset="-1"; if (server.cached_master) { psync_runid = server.cached_master->replrunid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); } else { serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); psync_runid = "?"; memcpy(psync_offset,"-1",3); } /* Issue the PSYNC command */ // 首次傳送命令 PSYNC ? -1 // 後續使用實際的資訊 PSYNC psync_runid psync_offset reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); aeDeleteFileEvent(server.el,fd,AE_READABLE); return PSYNC_WRITE_ERROR; } return PSYNC_WAIT_REPLY; } /* Reading half */ // 讀取 PSYNC 的結果 reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (sdslen(reply) == 0) { /* The master may send empty newlines after it receives PSYNC * and before to reply, just to keep the connection alive. */ sdsfree(reply); return PSYNC_WAIT_REPLY; } aeDeleteFileEvent(server.el,fd,AE_READABLE); // +FULLRESYNC 代表需要進行全量複製,否則進行部分複製 // +FULLRESYNC runid offset if (!strncmp(reply,"+FULLRESYNC",11)) { char *runid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the run id * and the replication offset. */ runid = strchr(reply,' '); if (runid) { runid++; offset = strchr(runid,' '); if (offset) offset++; } // runid 長度為 40 if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master * runid to make sure next PSYNCs will fail. */ memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); } else { memcpy(server.repl_master_runid, runid, offset-runid-1); server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; server.repl_master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", server.repl_master_runid, server.repl_master_initial_offset); } /* We are going to full resync, discard the cached master structure. */ // 全量同步,重置master快取 replicationDiscardCachedMaster(); sdsfree(reply); return PSYNC_FULLRESYNC; } // 部分複製的情況下,只會返回 +CONTINUE if (!strncmp(reply,"+CONTINUE",9)) { /* Partial resync was accepted, set the replication state accordingly */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); // 立即將結果釋放,那什麼時候處理結果呢? sdsfree(reply); // 實際上通過該方法同步資料的 replicationResurrectCachedMaster(fd); // 繼續使用 部分同步 return PSYNC_CONTINUE; } /* If we reach this point we received either an error since the master does * not understand PSYNC, or an unexpected reply from the master. * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ // PSYNC 不支援,因處理為降級版本 if (strncmp(reply,"-ERR",4)) { /* If it's not an error, log the unexpected event. */ serverLog(LL_WARNING, "Unexpected reply to PSYNC from master: %s", reply); } else { serverLog(LL_NOTICE, "Master does not support PSYNC or is in " "error state (reply: %s)", reply); } sdsfree(reply); replicationDiscardCachedMaster(); return PSYNC_NOT_SUPPORTED; }
通過上面的過程,我們可以看清了整個與master是如何協調進行同步的,主要依賴於 PSYNC 的返回值決定。也可以看到,全量同步功能時,註冊了一個可讀事件的監聽,具體處理使用 readSyncBulkPayload 進行承載。
3.4. 全量同步資料的實現方式
通過前面的分析,我們看到全量同時時,註冊了一個FileEvent事件,依賴於epoll實現非同步操作。具體處理由 readSyncBulkPayload() 進行處理。它負責非同步讀取master 同步過來的資料,寫入aof檔案,載入到slave的資料庫中。具體如下:
// replication.c /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen; off_t left; UNUSED(el); UNUSED(privdata); UNUSED(mask); /* Static vars used to hold the EOF mark, and the last bytes received * form the server: when they match, we reached the end of the transfer. */ static char eofmark[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE]; static int usemark = 0; /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ // 先讀取資料長度 if (server.repl_transfer_size == -1) { if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", strerror(errno)); goto error; } if (buf[0] == '-') { serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf+1); goto error; } else if (buf[0] == '\0') { /* At this stage just a newline works as a PING in order to take * the connection live. So we refresh our last interaction * timestamp. */ server.repl_transfer_lastio = server.unixtime; return; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } /* There are two possible forms for the bulk payload. One is the * usual $<count> bulk format. The other is used for diskless transfers * when the master does not know beforehand the size of the file to * transfer. In the latter case, the following format is used: * * $EOF:<40 bytes delimiter> * * At the end of the file the announced delimiter is transmitted. The * delimiter is long and random enough that the probability of a * collision with the actual file content can be ignored. */ if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { usemark = 1; memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); memset(lastbytes,0,CONFIG_RUN_ID_SIZE); /* Set any repl_transfer_size to avoid entering this code path * at the next call. */ server.repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving streamed RDB from master"); } else { usemark = 0; // 讀取資料長度, 寫入 server.repl_transfer_size, 後續判斷是否取完整資料 server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving %lld bytes from master", (long long) server.repl_transfer_size); } return; } /* Read bulk data */ if (usemark) { readlen = sizeof(buf); } else { left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); } nread = read(fd,buf,readlen); if (nread <= 0) { serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(); return; } server.stat_net_input_bytes += nread; /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */ int eof_reached = 0; if (usemark) { /* Update the last bytes array, and check if it matches our delimiter.*/ // 更新 最後幾個字元 if (nread >= CONFIG_RUN_ID_SIZE) { memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); } else { int rem = CONFIG_RUN_ID_SIZE-nread; memmove(lastbytes,lastbytes+nread,rem); memcpy(lastbytes+rem,buf,nread); } if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; } server.repl_transfer_lastio = server.unixtime; // 將資料寫入到 temp rdb 檔案中 if (write(server.repl_transfer_fd,buf,nread) != nread) { serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); goto error; } server.repl_transfer_read += nread; /* Delete the last 40 bytes from the file if we reached EOF. */ if (usemark && eof_reached) { if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) { serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); goto error; } } /* Sync data on disk from time to time, otherwise at the end of the transfer * we may suffer a big delay as the memory buffers are copied into the * actual disk. */ // 緩衝達到一定值後,直接刷盤 // REPL_MAX_WRITTEN_BEFORE_FSYNC: 8M if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); server.repl_transfer_last_fsync_off += sync_size; } /* Check if the transfer is now complete */ // 傳輸完成 if (!usemark) { if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; } if (eof_reached) { // 直接將臨時 rdb 檔案改名為正式的 rdb 檔案,從而實現資料替換 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); cancelReplicationHandshake(); return; } serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); // 清空原來的資料,刷入新資料 signalFlushedDb(-1); emptyDb( -1, server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); // 重新載入 rdb 檔案,從而完成同步操作 if (rdbLoad(server.rdb_filename) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); return; } /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); // 設定 master 資訊,以便下次直接使用 replicationCreateMasterClient(server.repl_transfer_s); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (server.aof_state != AOF_OFF) { int retry = 10; // 重新關聯 aof 檔案,以便後續寫入aof正常 stopAppendOnly(); while (retry-- && startAppendOnly() == C_ERR) { serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); sleep(1); } if (!retry) { serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); exit(1); } } } return; error: cancelReplicationHandshake(); return; }
以上就是全量複製功能實現了,大體步驟為:
1. 先讀取整體資料長度;(肯定是master發來的資料了)
2. 依次讀取就緒資料,將其定入臨時aof檔案 temp-<unixtime>.<pid>.aof;
3. 達到一定緩衝數量後,強制刷盤;
4. master 傳輸完成後,slave將臨時aof檔案重新命名為正式的aof檔案;
5. slave 清空原來db資料;
6. 禁用aof檔案的監聽,載入新的aof資料,重新開啟監聽;
7. aof 先停止再啟動,重新關聯新檔案;
3.5. 部分複製的實現
前面我們看到有個 slaveTryPartialResynchronization(), 是做部分同步檢測的,但是它只會返回幾個狀態,好像返回後都沒有做什麼後續處理。只有全量同步時,我們看到了如上邏輯。那麼部分同步是如何實現的呢?其中有個 +CONTINUE 的狀態值得我們注意:
... // 部分複製的情況下,只會返回 +CONTINUE if (!strncmp(reply,"+CONTINUE",9)) { /* Partial resync was accepted, set the replication state accordingly */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); // 立即將結果釋放,那什麼時候處理結果呢? sdsfree(reply); // 實際上通過該方法同步資料的 replicationResurrectCachedMaster(fd); // 繼續使用 部分同步 return PSYNC_CONTINUE; } ...
就這上面這個,返回 CONTINUE 後,外部邏輯只是返回,所以肯定是 replicationResurrectCachedMaster() 做了處理。而這個處理,應該是讀取後續的資料沒錯了!
// replication.c, 使用 cacheMaster 做 PSYNC 處理複製資料 /* Turn the cached master into the current master, using the file descriptor * passed as argument as the socket for the new master. * * This function is called when successfully setup a partial resynchronization * so the stream of data that we'll receive will start from were this * master left. */ void replicationResurrectCachedMaster(int newfd) { server.master = server.cached_master; server.cached_master = NULL; server.master->fd = newfd; server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); server.master->authenticated = 1; server.master->lastinteraction = server.unixtime; server.repl_state = REPL_STATE_CONNECTED; /* Re-add to the list of clients. */ listAddNodeTail(server.clients,server.master); // 新增file事件,epoll事件, 由 readQueryFromClient 進行事件處理 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, readQueryFromClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ // 如果有待發送資料,建立一個 寫的 fileEvent 事件 if (clientHasPendingReplies(server.master)) { if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } } } // 接下來,我們檢視下 當master傳送資料過來時,部分複製是如何實現的 // networking.c, 從 master 中讀取資料, privdata = server.master void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); // PROTO_IOBUF_LEN: 1024*16 // PROTO_MBULK_BIG_ARG: 1024*32 readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 讀取請求命令 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->reploff += nread; server.stat_net_input_bytes += nread; // 超出最大限制,不處理 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } // 處理 querybuf 資料, 其實就和普通的客戶端寫請求一樣的處理方式 processInputBuffer(c); }
處理master 部分同步過來的資料,重新在 slave 執行一次即可,基於epoll的事件監聽,可以持續處理同步資料。
所以,部分複製,其實就是重新在slave端執行與master相同的請求就好了。這個processInputBuffer()過程在前面的文章已經介紹過。
3.6. PSYNC 命令實現原理
從上面可以看出,PSYNC是整個主從複製過程的重要操作,那麼 PSYNC 都是怎麼實現的呢?大體上應該是一個範圍查詢響應的過程,但是細節必然很多。我們可以先自己想想,要處理的點大概有哪些呢?
1. 第一次呼叫時,即 PSYNC ? -1 如何處理?
2. 後續呼叫時 即 PSYNC psync_runid psync_offset 如何處理?
3. 響應結構是如何的?比如如何響應+CONTINUE?
我們就通過原始碼來解答這些問題吧!
首先是 PSYNC 的定義: 可以看到,sync 和 psync 居然是一樣的實現?
// 差別是 sync 的引數只有一個,而 psync 的引數是3個 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
具體實現:
// 用法: PSYNC run_id offset // replication.c /* SYNC and PSYNC command implemenation. */ void syncCommand(client *c) { /* ignore SYNC if already slave or in monitor mode */ // SYNC 命令只能呼叫成功一次,後續就直接忽略了 if (c->flags & CLIENT_SLAVE) return; /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { addReplyError(c,"Can't SYNC while not connected with my master"); return; } /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ // 還有輸出未完成時不能再進行處理 if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } serverLog(LL_NOTICE,"Slave %s asks for synchronization", replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however * when this happens masterTryPartialResynchronization() already * replied with: * * +FULLRESYNC <runid> <offset> * * So the slave knows the new runid and offset to try a PSYNC later * if the connection with the master is lost. */ // 事實上,psync 和 sync 的實現還是區別對待的 // psync 將會優先嚐試部分複製 if (!strcasecmp(c->argv[0]->ptr,"psync")) { // 部分複製將不會重置 flags, 即每次 psync 都會成功執行 if (masterTryPartialResynchronization(c) == C_OK) { server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { char *master_runid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the * runid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ if (master_runid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation * of the replication protocol (like redis-cli --slave). Flag the client * so that we don't expect to receive REPLCONF ACK feedbacks. */ c->flags |= CLIENT_PRE_PSYNC; } // 以下為全量複製 /* Full resynchronization. */ server.stat_sync_full++; /* Setup the slave as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the slave differently. */ c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ c->repldbfd = -1; // 新增slave 到master的從節點集合中, 設定 SLAVE 標識,表示已執行過 SYNC 操作 c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); /* CASE 1: BGSAVE is in progress, with disk target. */ // 如果 rdb 儲存已在進行中,即 BGSAVE 已經在執行 // 此種是對於後來進行主從同步的客戶端,只需告知正在執行 BGSAVE 即可 if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a goo