Redis原始碼剖析和註釋(二十七)--- Redis 故障轉移流程和原理剖析
Redis 故障轉移流程和原理
1. 故障轉移介紹
Redis
叢集自身實現了高可用。高可用首先要解決叢集部分失敗的場景:當叢集內少量節點出現故障時通過自動故障轉移保證叢集可以正常對外提供服務。接下來就介紹故障轉移的細節,分析故障檢測和故障轉移。
- 故障檢測
- 故障轉移
2. 故障檢測
2.1 主觀故障的檢測
當一個節點出現問題,需要使用一種健壯的方法保證識別出節點是否發生了故障。在之前的 Redis Cluster 通訊流程深入剖析 一文中,介紹了Redis
的gossip
協議,叢集節點通過PING/PONG
訊息實現節點通訊,訊息不但可以傳播節點槽資訊,還可以傳播主從狀態、節點故障資訊等。因此故障檢測也是就是通過訊息傳播機制實現的。
首先Redis
叢集節點每隔1s
會隨機向一個最有可能發生故障的節點發送PING
訊息。執行該操作的函式是叢集的定時函式clusterCron()
。Redis Cluster檔案詳細註釋
if (!(iteration % 10)) {
int j;
// 隨機抽查5個節點,向pong_received值最小的傳送PING訊息
for (j = 0; j < 5; j++) {
// 隨機抽查一個節點
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
// 跳過無連線或已經發送過PING的節點
if (this->link == NULL || this->ping_sent != 0) continue;
// 跳過myself節點和處於握手狀態的節點
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
// 查找出這個5個隨機抽查的節點,接收到PONG回覆過去最久的節點
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// 向接收到PONG回覆過去最久的節點發送PING訊息,判斷是否可達
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
最有可以能發生故障的節點的判斷方法是:隨機抽取5
個節點,根據pong_received
值的大小來判斷,這個變數代表最後一次接收到PONG
訊息回覆的時間,所以會向隨機選取的5
個節點中,最久沒有接收到PONG
訊息回覆的節點發送PING
訊息,來回復該節點的PONG
訊息。傳送PING
訊息會更新最近一次傳送PING
訊息的時間資訊ping_sent
。
這兩個時間資訊對於判斷節點故障扮演非常重要的作用。
如果這個節點真的發生了故障,當傳送了它PING
訊息後,就不會接收到PONG
訊息作為回覆,因此會觸發超時判斷。
當前以myself
節點為主視角,如果向一個節點發送了PING
訊息,但是在一定時間內沒有收到PONG
回覆,那麼會檢測到該節點可能疑似下線。處理該情況的程式碼在clusterCron()
函式中。
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
// 跳過myself節點,無地址NOADDR節點,和處於握手狀態的節點
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
......
// 如果當前還沒有傳送PING訊息,則跳過,只要傳送了PING訊息之後,才會執行以下操作
if (node->ping_sent == 0) continue;
// 計算已經等待接收PONG回覆的時長
delay = now - node->ping_sent;
// 如果等待的時間超過了限制
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
// 設定該節點為疑似下線的標識
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
// 設定更新狀態的標識
update_state = 1;
}
}
}
這個迴圈會迭代所有的節點,來檢測是否需要將某個節點標記為下線的狀態。還會做一些其他的操作,例如:
- 判斷孤立的主節點的個數,如果存在孤立的主節點並且某些條件滿足,之後會為其遷移一個其他主節點的從節點。
- 釋放回復
PONG
訊息過慢(超過超時時間的一半)的節點連線,等待下個週期重新建立連線。這樣做是為了連線更加健壯。 - 觸發第一次
PING
訊息傳送。當節點第一次加入叢集時,傳送完MEET
訊息,也接受PONG
回覆後,會觸發該條件,來執行第一次PING
訊息通訊。 - 如果一個從節點請求了手動故障轉移,傳送給請求節點一個
PING
訊息。 - 最後,則是對節點的故障檢測。
如果傳送PING
訊息的時間已經超過了cluster_node_timeout
限制,預設是15S
,那麼會將迭代的該節點的flags
開啟CLUSTER_NODE_PFAIL
標識,表示myself
節點主觀判斷該節點下線。但是這不代表最終的故障判定。
2.2 客觀故障的檢測
當myself
節點檢測到一個節點疑似下線後,就會開啟該節點的CLUSTER_NODE_PFAIL
標識,表示判斷該節點主觀下線,但是可能存在誤判的情況,因此為了真正的標記該節點的下線狀態,會進行客觀故障的檢測。
客觀故障的檢測仍然依賴PING/PONG
訊息的傳播,每次傳送PING/PONG
訊息,總會攜帶叢集節點個數的十分之一個節點資訊,傳送PING/PONG
訊息的函式clusterSendPing()
具體程式碼如下:Redis Cluster檔案詳細註釋
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
int totlen; /* Total packet length. */
// freshnodes 的值是除了當前myself節點和傳送訊息的兩個節點之外,叢集中的所有節點
// freshnodes 表示的意思是gossip協議中可以包含的有關節點資訊的最大個數
int freshnodes = dictSize(server.cluster->nodes)-2;
// wanted 的值是叢集節點的十分之一向下取整,並且最小等於3
// wanted 表示的意思是gossip中要包含的其他節點資訊個數
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
// 因此 wanted 最多等於 freshnodes。
if (wanted > freshnodes) wanted = freshnodes;
// 計算分配訊息的最大空間
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*wanted);
// 訊息的總長最少為一個訊息結構的大小
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
// 分配空間
buf = zcalloc(totlen);
hdr = (clusterMsg*) buf;
// 設定傳送PING命令的時間
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
// 構建訊息的頭部
clusterBuildMessageHdr(hdr,type);
int maxiterations = wanted*3;
// 構建訊息內容
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
// 隨機選擇一個叢集節點
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
// 1. 跳過當前節點,不選myself節點
if (this == myself) continue;
// 2. 偏愛選擇處於下線狀態或疑似下線狀態的節點
if (maxiterations > wanted*2 &&
!(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
continue;
// 以下節點不能作為被選中的節點:
/*
1. 處於握手狀態的節點
2. 帶有NOADDR標識的節點
3. 因為不處理任何槽而斷開連線的節點
*/
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* Tecnically not correct, but saves CPU. */
continue;
}
// 如果已經在gossip的訊息中新增過了當前節點,則退出迴圈
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
CLUSTER_NAMELEN) == 0) break;
}
// j 一定 == gossipcount
if (j != gossipcount) continue;
/* Add it */
// 這個節點滿足條件,則將其新增到gossip訊息中
freshnodes--;
// 指向新增該節點的那個空間
gossip = &(hdr->data.ping.gossip[gossipcount]);
// 新增名字
memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
// 記錄傳送PING的時間
gossip->ping_sent = htonl(this->ping_sent);
// 接收到PING回覆的時間
gossip->pong_received = htonl(this->pong_received);
// 設定該節點的IP和port
memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port);
// 記錄標識
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
gossip->notused2 = 0;
// 已經新增到gossip訊息的節點數加1
gossipcount++;
}
// 計算訊息的總長度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// 記錄訊息節點的數量到包頭
hdr->count = htons(gossipcount);
// 記錄訊息節點的總長到包頭
hdr->totlen = htonl(totlen);
// 傳送訊息
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
重點關注這幾個變數:
freshnodes
int freshnodes = dictSize(server.cluster->nodes)-2;
freshnodes
的值是除了當前myself節點和傳送訊息的兩個節點之外,叢集中的所有節點。freshnodes
表示的意思是gossip協議中可以包含的有關節點資訊的最大個數
wanted
wanted = floor(dictSize(server.cluster->nodes)/10);
wanted
的值是叢集節點的十分之一向下取整,並且最小等於3。wanted
表示的意思是gossip
中要包含的其他節點資訊個數。
Gossip
協議包含的節點資訊個數是wanted
個,wanted
的值是叢集節點的十分之一向下取整,並且最小等於3。為什麼選擇十分之一,這是因為Redis Cluster
中計算故障轉移超時時間是server.cluster_node_timeout*2
,因此如果有節點下線,就能夠收到大部分叢集節點發送來的下線報告。
十分之一的由來:如果有
N
個主節點,那麼wanted
就是N/10
,我們認為,在一個node_timeout
的時間內,我們會接收到任意一個節點的4個訊息包,因為,傳送一個訊息包,最慢被接收也不過node_timeout/2
的時間,如果超過這個時間,那麼接收回復的訊息包就會超時,所以一個node_timeout
時間內,當前節點會發送兩個PING
包,同理,接收當前節點的PING
包,也會發送兩個PING
包給當前節點,並且會回覆兩個PONG
包,這樣一來,在一個node_timeout
時間內,當前節點就會接收到4個包。但是
Redis Cluster
中計算故障轉移超時時間是server.cluster_node_timeout*2
,是兩倍的node_timeout
時間,那麼當前節點會接收到8個訊息包。因為
N
個主節點,那麼wanted
就是N/10
,所以收到叢集下線報告的概率就是8*N/10
,也就是80%
,這樣就收到了大部分叢集節點發送來的下線報告。
然後計算訊息的總的大小,也就是totlen
變數,訊息包頭部加上wanted
個節點資訊。
為訊息分配空間,並呼叫clusterBuildMessageHdr()
函式來構建訊息包頭部,將傳送節點的資訊填充進去。
接著使用while
迴圈,選擇wanted
個叢集節點,選擇節點有一下幾個特點:
- 當然不會選擇
myself
節點,因為,在包頭中已經包含了myself
節點也就是傳送節點的資訊。 - 偏愛選擇處於下線狀態或疑似下線狀態的節點,這樣有利於進行故障檢測。
- 不選,處於握手狀態或沒有地址狀態的節點,還有就是因為不負責任何槽而斷開連線的節點。
如果滿足了上述條件,就會將節點的資訊加入到gossip
中,如果節點不夠最少的3個,那麼重複選擇時會提前跳出迴圈。
最後,更新一下訊息的總長度,然後呼叫clusterSendMessage()
函式傳送訊息。
因此,可以得知,在傳送PING/PONG
訊息時,會將處於CLUSTER_NODE_PFAIL
狀態的節點處於訊息的流言部分。
無論是叢集中的哪個主節點接收到了訊息,無論就接收到PING
訊息,還是接收到PONG
回覆,都會呼叫clusterReadHandler()
函式來讀取收到的訊息,並且判斷讀取的訊息合法性和完整性等等。
如果訊息可讀,會呼叫clusterProcessPacket()
函式來處理讀取到的訊息。該函式能夠處理所有型別的訊息,但是我們主要關注處理PING/PONG
訊息包的流言部分的程式碼
if (sender) clusterProcessGossipSection(hdr,link);
在接收節點的視角下的叢集中,sender
是訊息的傳送節點,如果sender
節點處於當前叢集中,那麼會呼叫clusterProcessGossipSection()
函式來處理流言部分的資訊。
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
// 獲取該條訊息包含的節點數資訊
uint16_t count = ntohs(hdr->count);
// clusterMsgDataGossip陣列的地址
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
// 傳送訊息的節點
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
// 遍歷所有節點的資訊
while(count--) {
// 獲取節點的標識資訊
uint16_t flags = ntohs(g->flags);
clusterNode *node;
sds ci;
// 根據獲取的標識資訊,生成用逗號連線的sds字串ci
ci = representClusterNodeFlags(sdsempty(), flags);
// 列印到日誌中
serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ci);
sdsfree(ci);
// 根據指定name從叢集中查詢並返回節點
node = clusterLookupNode(g->nodename);
// 如果node存在
if (node) {
// 如果傳送者是主節點,且不是node本身
if (sender && nodeIsMaster(sender) && node != myself) {
// 如果標識中指定了關於下線的狀態
if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
// 將sender的新增到node的故障報告中
if (clusterNodeAddFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
// 判斷node節點是否處於真正的下線FAIL狀態
markNodeAsFailingIfNeeded(node);
// 如果標識表示節點處於正常狀態
} else {
// 將sender從node的故障報告中刪除
if (clusterNodeDelFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
// 雖然node存在,但是node已經處於下線狀態
// 但是訊息中的標識卻反應該節點不處於下線狀態,並且實際的地址和訊息中的地址發生變化
// 這些表明該節點換了新地址,嘗試進行握手
if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
!(flags & CLUSTER_NODE_NOADDR) &&
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
{
// 釋放原來的叢集連線物件
if (node->link) freeClusterLink(node->link);
// 設定節點的地址為訊息中的地址
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
// 清除無地址的標識
node->flags &= ~CLUSTER_NODE_NOADDR;
}
// node不存在,沒有在當前叢集中找到
} else {
// 如果node不處於NOADDR狀態,並且叢集中沒有該節點,那麼向node傳送一個握手的訊息
// 注意,當前sender節點必須是本叢集的眾所周知的節點(不在叢集的黑名單中),否則有加入另一個叢集的風險
if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
// 開始進行握手
clusterStartHandshake(g->ip,ntohs(g->port));
}
}
/* Next node */
// 下一個節點
g++;
}
}
該函式會根據訊息頭中的count
數,來遍歷count
次流言攜帶的節點資訊,這些節點資訊都是可能處於下線或疑似下線的節點。
那麼首先找到節點資訊所描述的叢集節點,
- 如果訊息中附帶的節點資訊所對應的節點
node
存在
- 如果傳送訊息的節點是主節點,且附帶節點資訊的節點不是
myself
節點 - 那麼如果附帶的節點資訊顯示該
node
節點處於下線或疑似下線狀態,那麼會呼叫clusterNodeAddFailureReport()
函式將sender
節點新增到node
的故障報告的連結串列中。然後呼叫markNodeAsFailingIfNeeded()
函式來判斷該node
節點是否真正的處於客觀下線狀態。 - 否則,節點則是處於正常狀態,則呼叫
clusterNodeDelFailureReport()
函式將sender
節點從node
節點的故障報告連結串列中刪除。 - 如果
node
存在,在myself
叢集中的視角中,該節點處於下線或疑似下線的狀態,但是訊息中的卻反饋不處於下線的狀態,且節點更換了地址 - 釋放原來的節點的連線,設定訊息中新提供的地址,重新嘗試連線新地址。這用來處理節點重新上線的情況。
- 如果傳送訊息的節點是主節點,且附帶節點資訊的節點不是
node
節點不存在當前叢集中
- 確保
sender
節點在當前叢集中,防止加入另一個叢集。且訊息中顯示node
節點有地址,且該節點不在叢集黑名單中。黑名單是在叢集收縮時,將要下線的節點加入黑名單,然後讓叢集所有節點都忘記該節點。 - 如果滿足上面的條件,則開始進行握手操作。
- 確保
我們重點關注node
下線的情況,在標題2.1
時,myself
節點將疑似下線的節點設定為CLUSTER_NODE_PFAIL
標識,因此,接收訊息的節點,呼叫clusterNodeAddFailureReport()
函式,將sender
節點新增到node
的故障報告的連結串列中。程式碼如下:
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
// 獲取故障報告的連結串列
list *l = failing->fail_reports;
listNode *ln;
listIter li;
clusterNodeFailReport *fr;
listRewind(l,&li);
// 遍歷故障報告連結串列
while ((ln = listNext(&li)) != NULL) {
fr = ln->value;
// 如果存在sender之前傳送的故障報告
if (fr->node == sender) {
// 那麼只更新時間戳
fr->time = mstime();
return 0;
}
}
// 否則建立新的故障報告
fr = zmalloc(sizeof(*fr));
// 設定傳送該報告的節點
fr->node = sender;
// 設定時間
fr->time = mstime();
// 新增到故障報告的連結串列中
listAddNodeTail(l,fr);
return 1;
}
函式很簡單,遍歷下線節點的fail_reports
故障報告連結串列,如果sender
節點之前就已經報告該節點下線,那麼更新報告的時間戳,否則建立新的報告,加入到該連結串列中。
然後呼叫markNodeAsFailingIfNeeded()
函式來判斷該函式是否處於客觀下線狀態。程式碼如下:
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;
// 需要大多數的票數,超過一半的節點數量
int needed_quorum = (server.cluster->size / 2) + 1;
// 不處於pfail(需要確認是否故障)狀態,則直接返回
if (!nodeTimedOut(node)) return; /* We can reach it. */
// 處於fail(已確認為故障)狀態,則直接返回
if (nodeFailed(node)) return; /* Already FAILing. */
// 返回認為node節點下線(標記為 PFAIL or FAIL 狀態)的其他節點數量
failures = clusterNodeFailureReportsCount(node);
// 如果當前節點是主節點,也投一票
if (nodeIsMaster(myself)) failures++;
// 如果報告node故障的節點數量不夠總數的一半,無法判定node是否下線,直接返回
if (failures < needed_quorum) return; /* No weak agreement from masters. */
serverLog(LL_NOTICE, "Marking node %.40s as failing (quorum reached).", node->name);
// 取消PFAIL,設定為FAIL
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
// 並設定下線時間
node->fail_time = mstime();
// 廣播下線節點的名字給所有的節點,強制所有的其他可達的節點為該節點設定FAIL標識
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
該函式用來判斷node
節點是否處於客觀下線的狀態,Redis
認為,如果叢集中過半數的主節點都認為該節點處於下線的狀態,那麼該節點就處於客觀下線的狀態,因為needed_quorum
就是計算的票數。
如果該節點不處於FAIL
或PFAIL
狀態,則直接返回。
然後呼叫clusterNodeFailureReportsCount()
函式來計算叢集中有多少個認為該節點下線的主節點數量。該函式就是將計算故障報告的連結串列的長度,因為如果有節點下線,那麼其他正常的主節點每次傳送PING/PONG
訊息時,會將下線節點附加到訊息中,當接收節點處理訊息時,則會將傳送訊息的節點加入到下線節點的故障報告連結串列中,這樣就可以計算叢集中有多少個主節點認為該節點處於下線狀態。
如果myself
節點也是主節點,那麼也有一個投票的權利。
如果所有投票的主節點個數小於需要的票數needed_quorum
,則直接返回,表示無法判斷該節點是否處於客觀下線狀態。
如果達到了needed_quorum
,那麼會取消CLUSTER_NODE_PFAIL
狀態,設定為CLUSTER_NODE_FAIL
狀態。並設定該節點被判斷為客觀下線的時間。
最後一步,就是將客觀下線的節點廣播給叢集中所有的節點。通過傳送FAIL
訊息,呼叫clusterSendFail()
函式,程式碼如下:
void clusterSendFail(char *nodename) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
// 構建FAIL的訊息包包頭
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
// 設定下線節點的名字
memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
// 傳送給所有叢集中的節點
clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}
FAIL
訊息和PING/PONG
訊息一樣,首先呼叫clusterBuildMessageHdr
設定訊息包的頭部,設定CLUSTERMSG_TYPE_FAIL
標識,表示該訊息包是一個FAIL
訊息。
該訊息包的頭部設定完畢後,要設定訊息包的內容,FAIL
訊息包的資料部分很簡單,就是clusterMsgDataFail
型別,該結構體只包含一個成員,就是客觀下線的節點名字。
最後呼叫clusterBroadcastMessage()
函式將訊息傳送給整個叢集的所有節點,該函式就是遍歷叢集中的所有節點,除了處於握手狀態的節點和myself
節點不傳送訊息,還有沒有連線的節點不傳送,其他所有的節點都發送FAIL
訊息。
這樣一來,叢集中所有節點就知道了客觀下線的節點。
3. 故障轉移
當故障節點客觀下線了以後,那麼就要自動選舉出一個可以替代他的從節點,從而保證高可用。
當叢集中的節點接收到發來的FAIL
訊息,會執行如下的程式碼處理:Redis Cluster檔案詳細註釋
if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;
if (sender) {
// 獲取下線節點的地址
failing = clusterLookupNode(hdr->data.fail.about.nodename);
// 如果下線節點不是myself節點也不是處於下線狀態
if (failing &&
!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
{
serverLog(LL_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
// 設定FAIL標識
failing->flags |= CLUSTER_NODE_FAIL;
// 設定下線時間
failing->fail_time = mstime();
// 取消PFAIL標識
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
} else {
serverLog(LL_NOTICE,
"Ignoring FAIL message from unknown node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
}
所有收到FAIL
訊息的節點,都會根據傳來的客觀下線的內容,將當前視角中的叢集中的下線節點設定CLUSTER_NODE_FAIL
標識,並且設定下線時間。
然後下線節點的從節點等待下一個週期執行clusterCron()
函式,來開始故障轉移操作。具體的程式碼如下:
if (nodeIsSlave(myself)) {
// 設定手動故障轉移的狀態
clusterHandleManualFailover();
// 執行從節點的自動或手動故障轉移,從節點獲取其主節點的雜湊槽,並傳播新配置
clusterHandleSlaveFailover();
// 如果存在孤立的主節點,並且叢集中的某一主節點有超過2個正常的從節點,並且該主節點正好是myself節點的主節點
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
// 給孤立的主節點遷移一個從節點
clusterHandleSlaveMigration(max_slaves);
}
如果myself
節點是從節點,才會執行這一部分的程式碼。這一部分主要執行了三個動作:
- 呼叫
clusterHandleManualFailover
來設定手動故障轉移的狀態。用於執行了CLUSTER FAILOVER [FORCE|TAKEOVER]
命令的情況。 - 呼叫
clusterHandleSlaveFailover()
函式來執行故障轉移操作。重點關注該函式。 - 為孤立的節點遷移個從節點。滿足以下條件,則可以呼叫
clusterHandleSlaveMigration()
函式為孤立的主節點遷移一個從節點。
- 存在孤立的節點,孤立的節點判斷條件如下:
- 該節點沒有正常的從節點
- 並且該節點負責一部分槽位。
- 並且該節點處於可以將槽位匯出的狀態。
- 叢集中的一個主節點有超過
2
個正常的從節點。 - 上面一個條件中的主節點正好是
myself
節點的主節點。
我們關注第二點,呼叫clusterHandleSlaveFailover()
函式來執行故障轉移操作。該函式可以分為這幾個部分:
- 選舉資格檢測和準備工作
- 準備選舉的時間
- 發起選舉
- 選舉投票
- 替換主節點
- 主從切換廣播給叢集
我們將clusterHandleSlaveFailover()
函式分割為幾部分,一一剖析。
3.1 選舉資格檢測和準備工作
首先函式需要判斷當前執行clusterHandleSlaveFailover()
函式的節點是否具有選舉的資格。部分程式碼如下:
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
// 計算上次選舉所過去的時間
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
// 計算勝選需要的票數
int needed_quorum = (server.cluster->size / 2) + 1;
// 手動故障轉移的標誌
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
// 計算故障轉移超時時間
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
// 重試的超時時間
auth_retry_time = auth_timeout*2;
// 執行函式的前提條件,在自動或手動故障轉移的情況下都必須滿足:
/*
1. 當前節點是從節點
2. 該從節點的主節點被標記為FAIL狀態,或者是一個手動故障轉移狀態
3. 當前從節點有負責的槽位
*/
// 如果不能滿足以上條件,則直接返回
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
myself->slaveof->numslots == 0)
{
// 設定故障轉移失敗的原因:CLUSTER_CANT_FAILOVER_NONE
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
// 如果當前節點正在和主節點保持連線狀態,計算從節點和主節點斷開的時間
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
// 從data_age刪除一個cluster_node_timeout的時長,因為至少以從節點和主節點斷開連線開始,因為超時的時間不算在內
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
// 檢查這個從節點的資料是否比較新
if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}
函式一開始,先計算了一些之後需要的變數。
auth_age
:表示距離上次選舉所過去的時間。needed_quorum
:表示勝選所需要的票數。manual_failover
:手動故障轉移的標識。1
表示是手動故障轉移,0
表示是自發的故障轉移。auth_timeout
:故障轉移的超時時間,最少為2s
。auth_retry_time
:重試的週期時長。
如果myself
節點是主節點或myself
節點沒有主節點,或者主節點不處於FAIL
狀態,且不是手動故障轉移,或者myself
節點的主節點沒有負責的槽位,則不能執行該函式。直接返回。
根據當前myself
從節點是否處於複製的連線狀態,進行計算資料的時間data_age
,用來表示資料的新舊。如果處於連線狀態,則計算距離最後因此與主節點互動的時間,否則計算距離複製斷開的時間。
如果複製處於斷開連線的狀態,那麼data_age
一定會大於叢集節點超時時間,因此要減去一個超時時間,用來準確的描述距離最後一次複製主節點的資料所經過的時間。
最後如果當前伺服器設定了cluster_slave_validity_factor
值,該變量表示:故障轉移時,從節點最後一次複製主節點資料所經過的時間。如果data_age
超過了規定的時間,那麼表示該從節點的複製的資料太舊,太少,不具備執行故障轉移的資格。
3.2 設定選舉時間
如果當前從節點符合故障轉移的資格,更新選舉開始的時間,只有達到改時間才能執行後續的流程。
// 如果先前的嘗試故障轉移超時並且重試時間已過,我們可以設定一個新的。
if (auth_age > auth_retry_time) {
// 設定新的故障轉移屬性
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank();
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* However if this is a manual failover, no delay is needed. */
// 手動故障轉移的情況
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
}
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
// 傳送一個PONG訊息包給所有的從節點,攜帶有當前的複製偏移量
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
// 如果沒有開始故障轉移,則呼叫clusterGetSlaveRank()獲取當前從節點的最新排名。因為在故障轉移之前可能會收到其他節點發送來的心跳包,因而可以根據心跳包的複製偏移量更新本節點的排名,獲得新排名newrank,如果newrank比之前的排名靠後,則需要增加故障轉移開始時間的延遲,然後將newrank記錄到server.cluster->failover_auth_rank中
if (server.cluster->failover_auth_sent == 0 &&
server.cluster->mf_end == 0)
{
// 獲取新排名
int newrank = clusterGetSlaveRank();
// 新排名比之前的靠後
if (newrank > server.cluster->failover_auth_rank) {
// 計算延遲故障轉移時間
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
// 更新下一次故障轉移的時間和排名
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
serverLog(LL_WARNING,
"Slave rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
// 如果還沒有到故障轉移選舉的時間,直接返回
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
// 如果距離故障轉移的時間過了很久,那麼不在執行故障轉移,直接返回
if (auth_age > auth_timeout) {
// 故障轉移過期
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}
如果上次選舉所過去的時間auth_age
大於auth_retry_time
重試的週期,表示這一次故障轉移超時,那麼就重新設定一個。
更新一些故障操作的時間資訊,然後傳送一個PONG
訊息給所有叢集的從節點,這些從節點和myself
節點有同一個下線的主節點。然後就返回,等待觸發下一次故障轉移。
然後,處理了關於手動故障節點排名的情況。
接下來,如果沒有到達故障轉移的時間,直接返回。
最後,如果上次選舉所過去的時間auth_age
大於故障轉移的超時時間auth_timeout
。那麼不在執行故障轉移,直接返回。
3.3 發起選舉
如果當前達到了故障轉移的時間,那麼就會先發起選舉操作,選出一個執行故障轉移的從節點。
if (server.cluster->failover_auth_sent == 0) {
// 增加當前紀元
server.cluster->currentEpoch++;
// 設定發其故障轉移的紀元
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
// 傳送一個FAILOVE_AUTH_REQUEST訊息給所有的節點,判斷這些節點是否同意該從節點為它的主節點執行故障轉移操作
clusterRequestFailoverAuth();
// 設定為真,表示本節點已經向其他節點發送了投票請求
server.cluster->failover_auth_sent = 1;
// 進入下一個事件迴圈執行的操作,儲存配置檔案,更新節點狀態,同步配置
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return; /* Wait for replies. */
}
如果還沒有向叢集其他節點發起投票請求,那麼將當前紀元currentEpoch
加一,然後該當前紀元設定發起故障轉移的紀元failover_auth_epoch
。呼叫clusterRequestFailoverAuth()
函式,傳送一個FAILOVE_AUTH_REQUEST
訊息給其他所有叢集節點,等待其他節點回復是否同意該從節點為它的主節點執行故障轉移操作。最後設定failover_auth_sent
為真,表示本節點已經向其他叢集節點發送投票請求了。然後就之久返回,等待其他節點的回覆。
我們看一下發送FAILOVE_AUTH_REQUEST
訊息的函式clusterRequestFailoverAuth()
void clusterRequestFailoverAuth(void) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
// 建立REQUEST訊息包包頭
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
// 如果是一個手動的故障轉移,設定CLUSTERMSG_FLAG0_FORCEACK,表示即使主節點線上,也要認證故障轉移
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
// 計算REQUEST訊息包的長度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
// 廣播這個訊息包
clusterBroadcastMessage(buf,totlen);
}
REQUEST
訊息用來從節點請求是否可以進行故障轉移,因此先建立CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST
標識的訊息包,表示這是一個REQUEST
訊息,然後將該訊息廣播給叢集中的所有節點。
3.4 選舉投票
當叢集中所有的節點接收到REQUEST
訊息後,會執行clusterProcessPacket()
函式的這部分程式碼:
if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
// 如果條件允許,向sender投票,支援它進行故障轉移
clusterSendFailoverAuthIfNeeded(sender,hdr);
}
如果傳送訊息包的節點sender
不是當前叢集的節點,直接返回。否則呼叫clusterSendFailoverAuthIfNeeded()
函式向sender
節點發起投票。該函式的程式碼如下:
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
// 獲取該請求從節點的主節點
clusterNode *master = node->slaveof;
// 獲取請求的當前紀元和配置紀元
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
// 獲取該請求從節點的槽點陣圖資訊
unsigned char *claimed_slots = request->myslots;
// 是否指定強制認證故障轉移的標識
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
// 如果myself是從節點,或者myself沒有負責的槽資訊,那麼myself節點沒有投票權,直接返回
if (nodeIsSlave(myself) || myself->numslots == 0) return;
// 如果請求的當前紀元小於叢集的當前紀元,直接返回。該節點有可能是長時間下線後重新上線,導致版本落後於就叢集的版本
// 因為該請求節點的版本小於叢集的版本,每次有選舉或投票都會更新每個節點的版本,使節點狀態和叢集的狀態是一致的。
if (requestCurrentEpoch < server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
node->name,
(unsigned long long) requestCurrentEpoch,
(unsigned long long) server.cluster->currentEpoch);
return;
}
// 如果最近一次投票的紀元和當前紀元相同,表示叢集已經投過票了
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: already voted for epoch %llu",
node->name,
(unsigned long long) server.cluster->currentEpoch);
return;
}
// 指定的node節點必須為從節點且它的主節點處於下線狀態,否則列印日誌後返回
if (nodeIsMaster(node) || master == NULL ||
(!nodeFailed(master) && !force_ack))
{
// 故障轉移的請求必須由從節點發起
if (nodeIsMaster(node)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: it is a master node",
node->name);
// 從節點找不到他的主節點
} else if (master == NULL) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: I don't know its master",
node->name);
// 從節點的主節點沒有處於下線狀態
} else if (!nodeFailed(master)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: its master is up",
node->name);
}
return;
}
// 在cluster_node_timeout * 2時間內只能投1次票
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
{
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"can't vote about this master before %lld milliseconds",
node->name,
(long long) ((server.cluster_node_timeout*2)-
(mstime() - node->slaveof->voted_time)));
return;
}
// 請求投票的從節點必須有一個宣告負責槽位的配置紀元,這些配置紀元必須比負責相同槽位的主節點的配置紀元要大
for (j = 0; j < CLUSTER_SLOTS; j++) {
// 跳過沒有指定的槽位
if (bitmapTestBit(claimed_slots, j) == 0) continue;
// 如果請求從節點的配置紀元大於槽的配置紀元,則跳過
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
{
continue;
}
// 如果請求從節點的配置紀元小於槽的配置紀元,那麼表示該從節點的配置紀元已經過期,不能給該從節點投票,直接返回
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"slot %d epoch (%llu) > reqEpoch (%llu)",
node->name, j,
(unsigned long long) server.cluster->slots[j]->configEpoch,
(unsigned long long) requestConfigEpoch);
return;
}
// 傳送一個FAILOVER_AUTH_ACK訊息給指定的節點,表示支援該從節點進行故障轉移
clusterSendFailoverAuth(node);