1. 程式人生 > >Redis復制實現原理

Redis復制實現原理

www long 封裝 amp col 核心 做什麽 數據 slave

摘要

我的前一篇文章《Redis 復制原理及特性》已經介紹了Redis復制相關特性,這篇文章主要在理解Redis復制相關源碼的基礎之上介紹Redis復制的實現原理。

Redis復制實現原理

應用場景化

為了更好地表達與理解,我們先舉個實際應用場景例子來看看Redis復制是怎麽工作的,我們先啟動一臺master:

$ ./redis-server --port 8000

然後啟動一個redis客戶端和上面那臺監聽8000端口的Redis實例連接:

$ ./redis-cli -p 8000

我們向redis寫一個數據:

127.0.0.1:8000> set msg doni
OK
127.0.0.1:8000> get msg
"doni"

於是我們可以假設以下場景:

我們有一臺master實例master,master已經處於正常工作的狀態,接受讀寫請求,這個時候由於單臺機器的壓力過大,我們想再啟動一個Redis實例來分擔master的讀壓力,假設我們新啟動的這個實例叫slave。

已知M1的IP為127.0.0.1,端口為:8000

首先我們先啟動redis實例,同時啟動一個客戶端連接這個實例:

$ ./redis-server --port 8001 

$ ./redis-cli -p 8001

這個時候slave是沒有數據的:

127.0.0.1:8001> get msg
(nil)

我們可以用下面命令來讓slave和master進行復制:

127.0.0.1:8001> slaveof 127.0.0.1 8000

於是,slave就獲得了master上寫的數據了:

127.0.0.1:8001> get msg
"doni"

上面的例子和很直觀也很簡單,下面我們就在腦海中緩存這個應用場景,來看看redis是如何實現復制的。

處理slaveof

我們首先需要看看slave接收到客戶端的slaveof命令是如何處理的,下面是slave接收到客戶端的slaveof命令的處理流程圖:

技術分享圖片

slaveof命令處理流程圖

解釋下上圖,redis實例接收到客戶端的slaveof命令後的處理流程大致如下:

  1. 判斷當前模式是否為cluster,如果是則不支持復制。
  2. 判斷命令是否為slave‘of no one,如果是,這表明客戶端把當前實例設置為master。
  3. 如果客戶端指定了host和port,則將host和port設置為當前的master信息。
  4. 將當前實例的復制狀態設置為REPL_STATE_CONNECT。

除了上面的幾個大步驟之外,在第二步和第三步之間還做了下面一些事情:

  1. 釋放之前被阻塞的客戶端,這些通常是使用Redis阻塞列表而被阻塞的客戶端。
  2. 斷開當前實例的所有slave。
  3. 清除緩存的master信息。
  4. 釋放backlog,backlog是堆積環形緩沖區。
  5. 取消正在進行的握手過程。

上面就是Redis處理slaveof命令的大致流程,誒,好像並沒有做關於復制的事情誒。別急,如果看過我的另一篇《Redis網絡架構及單線程模型》文章的同學都應該知道redis的單線線程模型,這裏slaveof命令處理關鍵的一步已經將當前redis實例的復制狀態設置為了REPL_STATE_CONNECT狀態,在redis的eventloop裏面自然會對處於這個狀態的redis實例進行處理。

連接master

復制異步處理的觸發邏輯一方面是I/O事件驅動的一部分,另一方面就是eventloop對時間事件處理的一部分,其實也是定時任務,redis定時任務最外面一層是serverCron方法,serverCron方法囊括了其他幾乎所有定時處理邏輯的入口,可以列個不完全列表如下:

  1. 過期key處理。
  2. 軟件watchdog。
  3. 更新統計信息。
  4. rehash。
  5. 觸發備份RDB文件或者AOF重寫邏輯。
  6. 客戶端超時處理。
  7. 復制邏輯。
  8. ……

我們這裏只關心復制邏輯,調用代碼如下:

run_with_period(1000) replicationCron();

run_with_period方法是redis封裝的一個幫助方法,最然serverCron的調用頻率很高,是1毫秒一次:

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can‘t create event loop timers.");
        exit(1);
}

但是redis通過run_with_period實現了可以並不是每隔1毫秒必須要執行所有邏輯,run_with_period方法指定了具體的執行時間間隔。上面可以看出,redis主進程大概是1000毫秒也就是1秒鐘執行一次replicationCron邏輯,replicationCron做什麽事情呢,它做的事情很多,我們只關心本文的主線邏輯:

if (server.repl_state == REPL_STATE_CONNECT) {
   if (connectWithMaster() == C_OK) {
       serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
   }
}

如果當前實例的復制狀態為REPL_STATE_CONNECT,我們就會嘗試著連接剛才slaveof指定的master,連接master的主要實現在connectWithMaster裏面,connectWithMaster的邏輯相對簡單一些,大致做了下面三件事情:

  1. 和指定的master建立連接,獲取master的socket句柄,即fd。
  2. 註冊fd的讀寫事件,事件處理器為syncWithMaster。
  3. 設置當前實例的復制狀態為REPL_STATE_CONNECTING。

握手機制

上面已經註冊了當前實例和master的讀寫I/O事件即事件處理器,由於I/O事件分離相關邏輯都由系統框架完成,也就是eventloop,因此我們可以直接看當前實例針對master連接的I/O處理實現部分,也就是syncWithMaster處理器。

syncWithMaster主要實現了當前實例和master之間的握手協議,核心是賦值狀態遷移,我們可以用下面一張圖表示:

技術分享圖片

slave和msater的握手機制

上圖為slave在syncWithMaster階段做的事情,主要是和master進行握手,握手成功之後最後確定復制方案,中間涉及到遷移的狀態集合如下:

#define REPL_STATE_CONNECTING 2 /* 等待和master連接 */
/* --- 握手狀態開始 --- */
#define REPL_STATE_RECEIVE_PONG 3 /* 等待PING返回 */
#define REPL_STATE_SEND_AUTH 4 /* 發送認證消息 */
#define REPL_STATE_RECEIVE_AUTH 5 /* 等待認證回復 */
#define REPL_STATE_SEND_PORT 6 /* 發送REPLCONF信息,主要是當前實例監聽端口 */
#define REPL_STATE_RECEIVE_PORT 7 /* 等待REPLCONF返回 */
#define REPL_STATE_SEND_CAPA 8 /* 發送REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 9 /* 等待REPLCONF返回 */
#define REPL_STATE_SEND_PSYNC 10 /* 發送PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 11 /* 等待PSYNC返回 */
/* --- 握手狀態結束 --- */
#define REPL_STATE_TRANSFER 12 /* 正在從master接收RDB文件 */

當slave向master發送PSYNC命令之後,一般會得到三種回復,他們分別是:

  • +FULLRESYNC:不好意思,需要全量復制哦。
  • +CONTINUE:嘿嘿,可以進行增量同步。
  • -ERR:不好意思,目前master還不支持PSYNC。

當slave和master確定好復制方案之後,slave註冊一個讀取RDB文件的I/O事件處理器,事件處理器為readSyncBulkPayload,然後將狀態設置為REPL_STATE_TRANSFER,這基本就是syncWithMaster的實現。

處理PSYNC

全量還是增量

我們已經知道slave是怎麽同master建立連接,怎麽和master進行握手的了,那麽master那邊是什麽情況呢,master在與slave握手之後,對於psync命令處理的秘密都在syncCommand方法裏面,syncCommand方法實際包括兩個命令處理的實現,一個是sync,一個是psync。我們繼續看看,master對slave的psync的請求處理,如果當前請求不滿足psync的條件,則需要進行全量復制,滿足psync的條件有兩個,一個是slave帶來的runid是否為當前master的runid:

if (strcasecmp(master_runid, server.runid)) {
        //如果slave帶來的runid“?”,說明slave想要強制走全量復制
        if (master_runid[0] != ‘?‘) {
            serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for runid ‘%s‘, my runid is ‘%s‘)",
                master_runid, server.runid);
        } else {
            serverLog(LL_NOTICE,"Full resync requested by slave %s",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
}

如果不是,則需要全量同步。第二個條件即當前slave帶來的復制offset,master在backlog中是否還能找到:

if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       C_OK) goto need_full_resync;
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        if (psync_offset > server.master_repl_offset) {
         //警告:slave帶過來的offset不滿足增量復制的條件
        }
        goto need_full_resync;
 }

如果找不到,不好意思,還是需要全量復制的,如果兩個條件都滿足,master會告訴slave可以增量復制,回復+CONTINUE消息。

復制是否正在進行

如果在當前slave執行復制請求之前,恰好已經有其他的slave已經請求過了,且master這個時候正在進行子進程傳輸(包括RDB文件備份和socket傳輸),那麽分下面兩種情況處理:

  1. 如果復制方式是RDB disk方式,則找到當前master狀態為SLAVE_STATE_WAIT_BGSAVE_END的slave,復制這個slave的offset到當前slave的offset,這是為了當子進程完成RDB文件備份之後, 當前請求復制的slave可以和之前的slave一起進行master的復制操作。
  2. 如果復制方式是Diskless方式,則當前進來的slave並不會向上面那個slave這麽幸運了,因為基於socket的復制已經正在進行了,當前slave只能參與下一輪的子進程復制,且狀態為SLAVE_STATE_WAIT_BGSAVE_START。

如果沒有子進程正在復制,這裏針對RDB disk方式和diskless方式,又要分兩種情況討論:

  1. 如果是RDB disk方式,則啟動子進程進行RDB文件備份。
  2. 如果是diskless方式,則等待一段時間,也是為了盡可能讓後面的具有復制請求的slave一起進來,參與這一輪復制,復制開始由定時任務異步啟動復制。

子進程結束後處理

RDB disk方式,當子進程備份RDB文件完畢,什麽時候開始發送給slave的呢?diskless方式當子進程傳輸完畢,接下來又做什麽呢?對於RDB disk的方式,這裏涉及到一個I/O事件註冊的過程,也是由serverCron驅動的,當子進程結束之後,主進程會得知,然後通過backgroundSaveDoneHandler處理器來進行處理,針對RDB disk類型和diskless類型的復制,處理邏輯是不一樣的,我們分別來看看。

RDB disk方式後處理

對於RDB disk復制方式,後處理主要是註冊向slave發送RDB文件的處理器sendBulkToSlave:

if (aeCreateFileEvent(server.el, slave->fd, 
                    AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                    freeClient(slave);
                    continue;
 }

然後RDB的文件發送由sendBulkToSlave處理器來完成,master對於RDB文件發送完畢之後會把slave的狀態設置為:online。這裏需要註意的是,在把slave設置為online狀態之後會註冊寫處理器,將堆積在reply的數據發送給slave:

if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave) == AE_ERR) {
        freeClient(slave);
        return;
}

這部分的內容即為RDB文件開始備份到發送給slave結束這段時間的增量數據,因此需要註冊I/O事件處理器,將這段時間累積的內容發送給slave,最終保持數據一致。

diskless方式後處理

diskless方式的後處理不同的是當子進程結束的時候,其實RDB文件已經傳輸完成了,而且其中做了些事情:

  • 當slave通過接受完RDB文件之後發送一個REPLCONF ACK給master。
  • master接收到slave的REPLCONF ACK之後,開始將緩存的增量數據發送給slave。

因此這裏不會註冊sendBulkToSlave處理器,只需要將slave設置為online即可。我們還可以發現不同的一點,對於累積部分的數據處理,RDB disk方式是由master主動發送給slave的,而對於diskless方式,master收到slave的REPLCONF ACK之後才會將累積的數據發送出去,這點有些不同。

當子進程結束,後處理的過程中還要考慮到一種情況:

無論是RDB disk方式還是diskless方式,如果復制已經開始了,後來的slave需要同master復制,這部分的slave怎麽辦呢

怎麽辦呢,對於這類slave,slave的復制狀態為SLAVE_STATE_WAIT_BGSAVE_START,語義上表示當前slave等待復制的開始,對於這種情況,Redis會直接啟動子進程開始預備下一輪復制。

RDB文件傳輸協議

上面握手機制部分提到,當slave和master握手完畢之後註冊了個readSyncBulkPayload處理器,用於讀取master發送過來的RDB文件,RDB文件通過TCP連接傳輸,本質上是一個數據流,slave端是如何區分當前傳輸方式是RDB disk方式還是diskless方式的呢?實際上對於不同的復制方式,數據傳輸協議也是不同的,假設我們把這個長長的RDB文件流稱為RDB文件報文,我們來看看兩種方式的不同協議格式:

技術分享圖片

RDB文件傳輸協議

上面有兩種報文協議,第一種為RDB disk方式的RDB文件報文傳輸協議,TCP流以"$"開始,然後緊跟著報文的長度,以換行符結束,這樣slave客戶端讀取長度之後就知道要從TCP後續的流中讀取多少內容就算結束了。第二種為diskless復制方式的RDB文件報文傳輸協議,以"$EOF:"開頭,緊跟著40字節長度的隨機16進制字符串,RDB文件結尾也緊跟著同樣的40字節長度的隨機16進制字符串。slave客戶端分別由TCP數據流的頭部來判斷復制類型,然後根據不同的協議去解析RDB文件,當RDB文件傳輸完成之後,slave會將RDB文件保存在本地,然後載入,這樣slave就基本和master保持同步了。

總結

本文主要在了解Redis復制源碼的基礎之上介紹Redis復制的實現原理及一些細節,希望對大家有幫助。

註:本文由作者原創,如有疑問請聯系作者。

redis復制源碼註釋地址:

https://github.com/hongmoshui/redis

Redis復制實現原理