Redis原始碼剖析和註釋(二十五)--- Redis Cluster 的通訊流程深入剖析(載入配置檔案、節點握手、分配槽)
Redis Cluster 通訊流程深入剖析
1. Redis Cluster 介紹和搭建
這篇部落格會介紹Redis Cluster
的資料分割槽理論和一個三主三從叢集的搭建。
2. Redis Cluster 和 Redis Sentinel
Redis 2.8
之後正式提供了Redis Sentinel(哨兵)
架構,而Redis Cluster(叢集)
是在Redis 3.0
正式加入的功能。
Redis Cluster
和 Redis Sentinel
都可以搭建Redis
多節點服務,而目的都是解決Redis
主從複製的問題,但是他們還是有一些不同。
Redis
主從複製可將主節點資料同步給從節點,從節點此時有兩個作用:
- 一旦主節點宕機,從節點作為主節點的備份可以隨時頂上來。
- 擴充套件主節點的讀能力,分擔主節點讀壓力。
但是,會出現以下問題:
- 一旦主節點宕機,從節點晉升成主節點,同時需要修改應用方的主節點地址,還需要命令所有從節點去複製新的主節點,整個過程需要人工干預。
- 主節點的寫能力或儲存能力受到單機的限制。
Redis的解決方案:
Redis Sentinel
旨在解決第一個問題,即使主節點宕機下線,Redis Sentinel
可以自動完成故障檢測和故障轉移,並通知應用方,真正實現高可用性(HA)。Redis Cluster
則是Redis
分散式的解決方案,解決後兩個問題。當單機記憶體、併發、流量等瓶頸時,可以採用Cluster
3. 搭建 Redis Cluster的通訊流程深入剖析
- 準備節點
- 節點握手
- 分配槽位
我們就根據這個流程分析Redis Cluster
的執行過程。
3.1 準備節點
我們首先要準備6
個節點,並且準備號對應埠號的配置檔案,在配置檔案中,要開啟cluster-enabled yes
選項,表示該節點以叢集模式開啟。因為叢集節點伺服器可以看做一個普通的Redis
伺服器,因此,叢集節點開啟伺服器的流程和普通的相似,只不過打開了一些關於叢集的標識。
當我們執行這條命令時,就會執行主函式
sudo redis-server conf/redis-6379. conf
在main()
函式中,我們需要關注這幾個函式:
loadServerConfig(configfile,options)
載入配置檔案。
- 底層最終呼叫
loadServerConfigFromString()
函式,會解析到cluster-
開頭的叢集的相關配置,並且儲存到伺服器的狀態中。
- 底層最終呼叫
initServer()
初始化伺服器。
- 會為伺服器設定時間事件的處理函式
serverCron()
,該函式會每間隔100ms
執行一次叢集的週期性函式clusterCron()
。 - 之後會執行
clusterInit()
,來初始化server.cluster
,這是一個clusterState
型別的結構,儲存的是叢集的狀態資訊。 - 接著在
clusterInit()
函式中,如果是第一次建立叢集節點,會建立一個隨機名字的節點並且會生成一個叢集專有的配置檔案。如果是重啟之前的叢集節點,會讀取第一次建立的叢集專有配置檔案,建立與之前相同名字的叢集節點。
- 會為伺服器設定時間事件的處理函式
verifyClusterConfigWithData()
該函式在載入AOF檔案或RDB檔案後被呼叫,用來檢查載入的資料是否正確和校驗配置是否正確。aeSetBeforeSleepProc()
在進入事件迴圈之前,為伺服器設定每次事件迴圈之前都要執行的一個函式beforeSleep()
,該函式一開始就會執行叢集的clusterBeforeSleep()
函式。aeMain()
進入事件迴圈,一開始就會執行之前設定的beforeSleep()
函式,之後就等待事件發生,處理就緒的事件。
以上就是主函式在開啟叢集節點時會執行到的主要程式碼。
在第二步初始化時,會建立一個clusterState
型別的結構來儲存當前節點視角下的叢集狀態。我們列出該結構體的程式碼:
typedef struct clusterState {
clusterNode *myself; /* This node */
// 當前紀元
uint64_t currentEpoch;
// 叢集的狀態
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
// 叢集中至少負責一個槽的主節點個數
int size; /* Num of master nodes with at least one slot */
// 儲存叢集節點的字典,鍵是節點名字,值是clusterNode結構的指標
dict *nodes; /* Hash table of name -> clusterNode structures */
// 防止重複新增節點的黑名單
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
// 匯入槽資料到目標節點,該陣列記錄這些節點
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
// 匯出槽資料到目標節點,該陣列記錄這些節點
clusterNode *importing_slots_from[CLUSTER_SLOTS];
// 槽和負責槽節點的對映
clusterNode *slots[CLUSTER_SLOTS];
// 槽對映到鍵的有序集合
zskiplist *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
// 之前或下一次選舉的時間
mstime_t failover_auth_time; /* Time of previous or next election. */
// 節點獲得支援的票數
int failover_auth_count; /* Number of votes received so far. */
// 如果為真,表示本節點已經向其他節點發送了投票請求
int failover_auth_sent; /* True if we already asked for votes. */
// 該從節點在當前請求中的排名
int failover_auth_rank; /* This slave rank for current auth request. */
// 當前選舉的紀元
uint64_t failover_auth_epoch; /* Epoch of the current election. */
// 從節點不能執行故障轉移的原因
int cant_failover_reason;
/* Manual failover state in common. */
// 如果為0,表示沒有正在進行手動的故障轉移。否則表示手動故障轉移的時間限制
mstime_t mf_end;
/* Manual failover state of master. */
// 執行手動孤戰轉移的從節點
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
// 從節點記錄手動故障轉移時的主節點偏移量
long long mf_master_offset;
// 非零值表示手動故障轉移能開始
int mf_can_start;
/* The followign fields are used by masters to take state on elections. */
// 叢集最近一次投票的紀元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
// 呼叫clusterBeforeSleep()所做的一些事
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
// 傳送的位元組數
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
// 通過Cluster接收到的訊息數量
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
} clusterState;
初始化完當前叢集狀態後,會建立叢集節點,執行的程式碼是這樣的:
myself = server.cluster->myself = createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
首先myself
是一個全域性變數,定義在cluster.h
中,它指向當前叢集節點,server.cluster->myself
是叢集狀態結構中指向當前叢集節點的變數,createClusterNode()
函式用來建立一個叢集節點,並設定了兩個標識,表明身份狀態資訊。
該函式會建立一個如下結構來描述叢集節點。
typedef struct clusterNode {
// 節點建立的時間
mstime_t ctime; /* Node object creation time. */
// 名字
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
// 標識
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
// 節點的槽點陣圖
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
// 當前節點複製槽的數量
int numslots; /* Number of slots handled by this node */
// 從節點的數量
int numslaves; /* Number of slave nodes, if this is a master */
// 從節點指標陣列
struct clusterNode **slaves; /* pointers to slave nodes */
// 指向主節點,即使是從節點也可以為NULL
struct clusterNode *slaveof;
// 最近一次傳送PING的時間
mstime_t ping_sent; /* Unix time we sent latest ping */
// 接收到PONG的時間
mstime_t pong_received; /* Unix time we received the pong */
// 被設定為FAIL的下線時間
mstime_t fail_time; /* Unix time when FAIL flag was set */
// 最近一次為從節點投票的時間
mstime_t voted_time; /* Last time we voted for a slave of this master */
// 更新複製偏移量的時間
mstime_t repl_offset_time; /* Unix time we received offset for this node */
// 孤立的主節點遷移的時間
mstime_t orphaned_time; /* Starting time of orphaned master condition */
// 該節點已知的複製偏移量
long long repl_offset; /* Last known repl offset for this node. */
// ip地址
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
// 節點埠號
int port; /* Latest known port of this node */
// 與該節點關聯的連線物件
clusterLink *link; /* TCP/IP link with this node */
// 儲存下線報告的連結串列
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
初始化該結構時,會建立一個link
為空的節點,該變數是clusterLink
的指標,用來描述該節點與一個節點建立的連線。該結構定義如下:
typedef struct clusterLink {
// 連線建立的時間
mstime_t ctime; /* Link creation time */
// TCP連線的檔案描述符
int fd; /* TCP socket file descriptor */
// 輸出(傳送)緩衝區
sds sndbuf; /* Packet send buffer */
// 輸入(接收)緩衝區
sds rcvbuf; /* Packet reception buffer */
// 關聯該連線的節點
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
該結構用於叢集兩個節點之間相互發送訊息。如果節點A傳送MEET
訊息給節點B,那麼節點A會建立一個clusterLink
結構的連線,fd
設定為連線後的套節字,node
設定為節點B,最後將該clusterLink
結構儲存到節點B的link
中。
3.2 節點握手
當我們建立好了6個節點時,需要通過節點握手來感知到到指定的程序。節點握手是指一批執行在叢集模式的節點通過Gossip
協議彼此通訊。節點握手是叢集彼此通訊的第一步,可以詳細分為這幾個過程:
myself
節點發送MEET
訊息給目標節點。- 目標節點處理
MEET
訊息,並回復一個PONG
訊息給myself
節點。 myself
節點處理PONG
訊息,回覆一個PING
訊息給目標節點。
這裡只列出了握手階段的通訊過程,之後無論什麼節點,都會每隔1s
傳送一個PING
命令給隨機篩選出的5
個節點,以進行故障檢測。
接下來會分別以myself
節點和目標節點的視角分別剖析這個握手的過程。
3.2.1 myself
節點發送 MEET 訊息
由客戶端發起命令:cluster meet <ip> <port>
當節點接收到客戶端的cluster meet
命令後會呼叫對應的函式來處理命令,該命令的執行函式是clusterCommand()
函式,該函式能夠處理所有的cluster
命令,因此我們列出處理meet
選項的程式碼:
// CLUSTER MEET <ip> <port>命令
// 與給定地址的節點建立連線
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
long long port;
// 獲取埠
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
// 如果沒有正在進行握手,那麼根據執行的地址開始進行握手操作
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
// 連線成功回覆ok
} else {
addReply(c,shared.ok);
}
}
該函式先根據cluster meet <ip> <port>
命令傳入的引數,獲取要與目標節點建立連線的節點地址,然後根據節點地址執行clusterStartHandshake()
函式來開始執行握手操作。該函式程式碼如下:
int clusterStartHandshake(char *ip, int port) {
clusterNode *n;
char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa;
// 檢查地址是否非法
if (inet_pton(AF_INET,ip,
&(((struct sockaddr_in *)&sa)->sin_addr)))
{
sa.ss_family = AF_INET;
} else if (inet_pton(AF_INET6,ip,
&(((struct sockaddr_in6 *)&sa)->sin6_addr)))
{
sa.ss_family = AF_INET6;
} else {
errno = EINVAL;
return 0;
}
// 檢查埠號是否合法
if (port <= 0 || port > (65535-CLUSTER_PORT_INCR)) {
errno = EINVAL;
return 0;
}
// 設定 norm_ip 作為節點地址的標準字串表示形式
memset(norm_ip,0,NET_IP_STR_LEN);
if (sa.ss_family == AF_INET)
inet_ntop(AF_INET,
(void*)&(((struct sockaddr_in *)&sa)->sin_addr),
norm_ip,NET_IP_STR_LEN);
else
inet_ntop(AF_INET6,
(void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
norm_ip,NET_IP_STR_LEN);
// 判斷當前地址是否處於握手狀態,如果是,則設定errno並返回,該函式被用來避免重複和相同地址的節點進行握手
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
// 為node設定一個隨機的地址,當握手完成時會為其設定真正的名字
// 建立一個隨機名字的節點
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
// 設定地址
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
// 新增到叢集中
clusterAddNode(n);
return 1;
}
該函式先判斷傳入的地址是否非法,如果非法會設定errno
,然後會呼叫clusterHandshakeInProgress()
函式來判斷是否要進行握手的節點也處於握手狀態,以避免重複和相同地址的目標節點進行握手。然後建立一個隨機名字的目標節點,並設定該目標節點的狀態,如下:
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
然後呼叫clusterAddNode()
函式將該目標節點新增到叢集中,也就是server.cluster->nodes
字典,該字典的鍵是節點的名字,值是指向clusterNode()
結構的指標。
此時myself
節點並沒有將meet
訊息傳送給指定地址的目標節點,而是設定叢集中目標節點的狀態。而傳送meet
訊息則是在clusterCron()
函式中執行。我們列出週期性函式中傳送MEET
訊息的程式碼:
// 獲取握手狀態超時的時間,最低為1s
// 如果一個處於握手狀態的節點如果沒有在該超時時限內變成一個普通的節點,那麼該節點從節點字典中被刪除
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
// 檢查是否當前叢集中有斷開連線的節點和重新建立連線的節點
di = dictGetSafeIterator(server.cluster->nodes);
// 遍歷所有叢集中的節點,如果有未建立連線的節點,那麼傳送PING或PONG訊息,建立連線
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
// 跳過myself節點和處於NOADDR狀態的節點
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
// 如果仍然node節點處於握手狀態,但是從建立連線開始到現在已經超時
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
// 從叢集中刪除該節點,遍歷下一個節點
clusterDelNode(node);
continue;
}
// 如果節點的連線物件為空
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
// myself節點連線這個node節點
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
// 連接出錯,跳過該節點
if (fd == -1) {
// 如果ping_sent為0,察覺故障無法執行,因此要設定傳送PING的時間,當建立連線後會真正的的傳送PING命令
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+CLUSTER_PORT_INCR,
server.neterr);
continue;
}
// 為node節點建立一個連線物件
link = createClusterLink(node);
// 設定連線物件的屬性
link->fd = fd;
// 為node設定連線物件
node->link = link;
// 監聽該連線的可讀事件,設定可讀時間的讀處理函式
aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
// 備份舊的傳送PING的時間
old_ping_sent = node->ping_sent;
// 如果node節點指定了MEET標識,那麼傳送MEET命令,否則傳送PING命令
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
// 如果不是第一次傳送PING命令,要將傳送PING的時間還原,等待被clusterSendPing()更新
if (old_ping_sent) {
node->ping_sent = old_ping_sent;
}
// 傳送MEET訊息後,清除MEET標識
// 如果沒有接收到PONG回覆,那麼不會在向該節點發送訊息
// 如果接收到了PONG回覆,取消MEET/HANDSHAKE狀態,傳送一個正常的PING訊息。
node->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);
clusterNode()
函式一開始就會處理叢集中斷開連線的節點和重新建立連線的節點。
以myself
節點的視角,遍歷叢集中所有的節點,跳過操作當前myself
節點和沒有指定地址的節點,然後判斷處於握手狀態的節點是否在建立連線的過程中超時,如果超時則會刪除該節點。如果還沒有建立連線,那麼myself
節點會與當前這個目標節點建立TCP
連線,並獲取套接字fd
,根據這個套接字,就可以建立clusterLink
結構的連線物件,並將這個連線物件儲存到當前這個目標節點。
myself
節點建立完連線後,首先會監聽與目標節點建立的fd
的可讀事件,並設定對應的處理程式clusterReadHandler()
,因為當傳送MEET
訊息給目標節點後,要接收目標節點回復的PING
。
接下來,myself
節點就呼叫clusterSendPing()
函式傳送MEET
訊息給目標節點。MEET
訊息是特殊的PING
訊息,只用於通知新節點的加入,而PING
訊息還需要更改一些時間資訊,以便進行故障檢測。
最後無論如何都要取消CLUSTER_NODE_MEET
標識,但是沒有取消CLUSTER_NODE_HANDSHAKE
該標識,表示仍處於握手狀態,但是已經發送了MEET
訊息了。
3.2.2 目標節點處理 MEET 訊息回覆 PONG 訊息
當myself
節點將MEET
訊息傳送給目標節點之前,就設定了clusterReadHandler()
函式為處理接收的PONG
訊息。當時目標節點如何接收到MEET
訊息,並且回覆PONG
訊息給myself
節點呢?
在叢集模式下,每個節點初始化時呼叫的clusterInit
時,會監聽節點的埠等待客戶端的連線,並且會將該監聽的套接字fd
儲存到server.cfd
陣列中,然後建立檔案事件,監聽該套接字fd
的可讀事件,並設定可讀事件處理函式clusterAcceptHandler()
,等待客戶端傳送資料。
那麼,在myself
節點在傳送MEET
訊息首先會連線目標節點所監聽的埠,觸發目標節點執行clusterAcceptHandler()
函式,該函式實際上就是accept()
函式,接收myself
節點的連線,然後監聽該連線上的可讀事件,設定可讀事件的處理函式為clusterReadHandler()
,等待myself
節點發送資料,當myself
節點發送MEET
訊息給目標節點時,觸發目標節點執行clusterReadHandler()
函式來處理訊息。
接下來,我們以目標節點的視角,來分析處理MEET
訊息的過程。
clusterReadHandler()
函式底層就是一個read()
函式,程式碼如下:
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[sizeof(clusterMsg)];
ssize_t nread;
clusterMsg *hdr;
clusterLink *link = (clusterLink*) privdata;
unsigned int readlen, rcvbuflen;
UNUSED(el);
UNUSED(mask);
// 迴圈從fd讀取資料
while(1) { /* Read as long as there is data to read. */
// 獲取連線物件的接收緩衝區的長度,表示一次最多能多大的資料量
rcvbuflen = sdslen(link->rcvbuf);
// 如果接收緩衝區的長度小於八位元組,就無法讀入訊息的總長
if (rcvbuflen < 8) {
readlen = 8 - rcvbuflen;
// 能夠讀入完整資料資訊
} else {
hdr = (clusterMsg*) link->rcvbuf;
// 如果是8個位元組
if (rcvbuflen == 8) {
// 如果前四個位元組不是"RCmb"簽名,釋放連線
if (memcmp(hdr->sig,"RCmb",4) != 0 ||
ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
{
serverLog(LL_WARNING,
"Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link);
return;
}
}
// 記錄已經讀入的內容長度
readlen = ntohl(hdr->totlen) - rcvbuflen;
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}
// 從fd中讀資料
nread = read(fd,buf,readlen);
// 沒有資料可讀
if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
// 讀錯誤,釋放連線
if (nread <= 0) {
serverLog(LL_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link);
return;
} else {
// 將讀到的資料追加到連線物件的接收緩衝區中
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
hdr = (clusterMsg*) link->rcvbuf;
rcvbuflen += nread;
}
// 檢查接收的資料是否完整
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
// 如果讀到的資料有效,處理讀到接收緩衝區的資料
if (clusterProcessPacket(link)) {
// 處理成功,則設定新的空的接收緩衝區
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return; /* Link no longer valid. */
}
}
}
}
之前在介紹clusterLink
物件時,每個連線物件都有一個link->rcvbuf
接收緩衝區和link->sndbuf
傳送緩衝區,因此這個函式就是從fd
將資料讀到link
的接收緩衝區,然後進行是否讀完整的判斷,如果完整的讀完資料,就呼叫clusterProcessPacket()
函式來處理讀到的資料,這裡會處理MEET
訊息。該函式是一個通用的處理函式,因此能夠處理各種型別的訊息,所列只列出處理MEET
訊息的重要部分:
// 從叢集中查詢sender節點
sender = clusterLookupNode(hdr->sender);
// 初始處理PING和MEET請求,用PONG作為回覆
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
// 我們使用傳入的MEET訊息來設定當前myself節點的地址,因為只有其他叢集中的節點在握手的時會發送MEET訊息,當有節點加入叢集時,或者如果我們改變地址,這些節點將使用我們公開的地址來連線我們,所以在叢集中,通過套接字來獲取地址是一個簡單的方法去發現或更新我們自己的地址,而不是在配置中的硬設定
// 但是,如果我們根本沒有地址,即使使用正常的PING資料包,我們也會更新該地址。 如果是錯誤的,那麼會被MEET修改
// 如果是MEET訊息
// 或者是其他訊息但是當前叢集節點的IP為空
if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') {
char ip[NET_IP_STR_LEN];
// 可以根據fd來獲取ip,並設定myself節點的IP
if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,NET_IP_STR_LEN);
serverLog(LL_WARNING,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
// 如果當前sender節點是一個新的節點,並且訊息是MEET訊息型別,那麼將這個節點新增到叢集中
// 當前該節點的flags、slaveof等等都沒有設定,當從其他節點接收到PONG時可以從中獲取到資訊
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
// 建立一個處於握手狀態的節點
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
// 設定ip和port
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);
// 新增到叢集中
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
// 如果是從一個未知的節點發送過來MEET包,處理流言資訊
if (!sender && type == CLUSTERMSG_TYPE_MEET)
// 處理流言中的 PING or PONG 資料包
clusterProcessGossipSection(hdr,link);
/* Anyway reply with a PONG */
// 回覆一個PONG訊息
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
在該函式中,首先先會對訊息中的簽名、版本、訊息總大小,訊息中包含的節點資訊數量等等都進行判斷,確保該訊息是一個合法的訊息,然後就計算訊息的總長度,來判斷接收到的訊息和讀到的訊息是否一致完整。
現在,再次強調一遍,當前是以目標節點的視角處理MEET
訊息。
目標節點呼叫clusterLookupNode()
函式在目標節點視角中的叢集查詢MEET
訊息的傳送節點hdr->sender
,該節點就是myself
節點,由於這是第一次兩個節點之間的握手,那麼myself
節點一定在目標節點視角中的叢集是找不到的,所以sender
變數為NULL
。
然後就進入if
條件判斷,首先目標節點會根據MEET
訊息來獲取自己的地址並更新自己的地址,因為如果通過從配置檔案來設定地址,當節點重新上線,地址就有可能改變,但是配置檔案中卻沒有修改,所用通過套接字獲取地址來更新節點地址是一種非常好的辦法。
然後繼續執行第二個if
中的程式碼,第一次MEET
訊息,而且sender
傳送該訊息的節點並不存在目標節點視角中的叢集,所以會為傳送訊息的myself
節點建立一個處於握手狀態的節點,並且,將該節點加入到目標節點視角中的叢集。這樣一來,目標節點就知道了myself
節點的存在。
最後就是呼叫clusterSendPing()
函式,指定回覆一個PONG
訊息給myself
節點。
3.2.3 myself
節點處理 PONG 訊息回覆 PING 訊息
myself
在傳送訊息MEET
訊息之前,就已經為監聽fd
的可讀訊息,當目標節點處理完MEET
訊息並回復PONG
訊息之後,觸發myself
節點的可讀事件,呼叫clusterReadHandler()
函式來處理目標節點發送來的PONG
訊息。
這次是以myself
節點的視角來分析處理PONG
訊息。
clusterReadHandler()
函式就是目標節點第一次接收myself
節點發送MEET
訊息的函式,底層是read()
函式來將套接字中的資料讀取到link->rcvbuf
接收緩衝區中,程式碼在標題3.2.2
。它最後還是呼叫clusterProcessPacket()
函式來處理PONG
訊息。
但是這次處理程式碼的部分不同,因為myself
節點視角中的叢集可以找到目標節點,也就是說,myself
節點已經“認識”了目標節點。
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
serverLog(LL_DEBUG,"%s packet received: %p",
type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
(void*)link->node);
// 如果關聯該連線的節點存在
if (link->node) {
// 如果關聯該連線的節點處於握手狀態
if (nodeInHandshake(link->node)) {
// sender節點存在,用該新的連線地址更新sender節點的地址
if (sender) {
serverLog(LL_VERBOSE,
"Handshake: we already know node %.40s, "
"updating the address if needed.", sender->name);
if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
// 釋放關聯該連線的節點
clusterDelNode(link->node);
return 0;
}
// 將關聯該連線的節點的名字用sender的名字替代
clusterRenameNode(link->node, hdr->sender);
serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
// 取消握手狀態,設定節點的角色
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
// 如果sender的地址和關聯該連線的節點的地址不相同
} else if (memcmp(link->node->name,hdr->sender,
CLUSTER_NAMELEN) != 0)
{
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
link->node->name,
(int)(mstime()-(link->node->ctime)),
link->node->flags);
// 設定NOADDR標識,情況關聯連線節點的地址
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
link->node->port = 0;
// 釋放連線物件
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0;
}
}
// 關聯該連線的節點存在,且訊息型別為PONG
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
// 更新接收到PONG的時間
link->node->pong_received = mstime();
// 清零最近一次傳送PING的時間戳
link->node->ping_sent = 0;
// 接收到PONG回覆,可以刪除PFAIL(疑似下線)標識
// FAIL標識能否刪除,需要clearNodeFailureIfNeeded()來決定
// 如果關聯該連線的節點疑似下線
if (nodeTimedOut(link->node)) {
// 取消PFAIL標識
link->node->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
// 如果關聯該連線的節點已經被判斷為下線
} else if (nodeFailed(link->node)) {
// 如果一個節點被標識為FAIL,需要檢查是否取消該節點的FAIL標識,因為該節點在一定時間內重新上線了
clearNodeFailureIfNeeded(link->node);
}
}
}
和之前處理MEET
訊息一樣,首先先會對訊息中的簽名、版本、訊息總大小,訊息中包含的節點資訊數量等等都進行判斷,確保該訊息是一個合法的訊息,然後就計算訊息的總長度,來判斷接收到的訊息和讀到的訊息是否一致完整。然後處理上述部分的程式碼。
由於myself
節點已經“認識”目標節點,因此myself
節點在傳送MEET
訊息時已經為叢集(myself
節點視角)中的目標節點設定了連線物件,因此會執行判斷連線物件是否存在的程式碼if (nodeInHandshake(link->node))
,並且在myself
節點發送完MEET
訊息後,只取消了目標節點的CLUSTER_NODE_MEET
標識,保留了CLUSTER_NODE_HANDSHAKE
標識,因此會執行if (sender)
判斷。
目標節點發送過來的PONG
訊息,在訊息包的頭部會包含sender
傳送節點的資訊,但是名字對不上號,這是因為myself
節點建立目標節點加入叢集的時候,隨機給他起的名字,因為myself
節點當時也不知道目標節點的名字,所以在叢集中找不到sender
的名字,因此這個判斷會失敗,呼叫clusterRenameNode()
函式把它的名字改過來,這樣myself
節點就真正的認識了目標節點,重新認識。之後會將目標節點的CLUSTER_NODE_HANDSHAKE
狀態取消,並且設定它的角色狀態。
然後就是執行if (link->node && type == CLUSTERMSG_TYPE_PONG)
判斷,更新接收PONG
的時間戳,清零傳送PING
的時間戳,根據接收PONG
的時間等資訊判斷目標節點是否下線,如果下線要進行故障轉移等操作。
之後myself
節點並不會立即向目標節點發送PING
訊息,而是要等待下一次時間事件的發生,在clusterCron()
函式中,每次執行都需要對叢集中所有節點進行故障檢測和主從切換等等操作,因此在遍歷節點時,會處理以下一種情況:
while((de = dictNext(di)) != NULL) {
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
if (node->link && node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
// 給node節點發送一個PING訊息
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
}
首先跳過操作myself
節點和處於握手狀態的節點,在myself
節點重新認識目標節點後,就將目標節點的握手狀態取消了,因此會對目標節點做下面的判斷操作。
當myself
節點接收到PONG
就會將目標節點node->ping_sent
設定為0
,表示目標節點還沒有傳送過PING
訊息,因此會發送PING
訊息給目標節點。
當傳送了這個PING
訊息之後,節點之間的握手操作就完成了。之後每隔1s
都會發送PING
包,來進行故障檢測等工作。
3.2.4 Gossip協議
搭建Redis Cluster
時,首先通過CLUSTER MEET
命令將所有的節點加入到一個叢集中,但是並沒有在所有節點兩兩之間都執行CLUSTER MEET
命令,那麼因為節點之間使用Gossip
協議進行工作。
Gossip
翻譯過來就是流言,類似與病毒傳播一樣,只要一個人感染,如果時間足夠,那麼和被感染的人在一起的所有人都會被感染,因此隨著時間推移,叢集內的所有節點都會互相知道對方的存在。
關於Gossip介紹可以參考:Gossip 演算法
在Redis
中,節點資訊是如何傳播的呢?答案是通過傳送PING
或PONG
訊息時,會包含節點資訊,然後進行傳播的。
我們先介紹一下Redis Cluster
中,訊息是如何抽象的。一個訊息物件可以是PING
、PONG
、MEET
,也可以是UPDATE
、PUBLISH
、FAIL
等等訊息。他們都是clusterMsg
型別的結構,該型別主要由訊息包頭部和訊息資料組成。
- 訊息包頭部包含簽名、訊息總大小、版本和傳送訊息節點的資訊。
- 訊息資料則是一個聯合體
union clusterMsgData
,聯合體中又有不同的結構體來構建不同的訊息。
PING
、PONG
、MEET
屬於一類,是clusterMsgDataGossip
型別的陣列,可以存放多個節點的資訊,該結構如下:
typedef struct {
// 節點名字
char nodename[CLUSTER_NAMELEN];
// 最近一次傳送PING的時間戳
uint32_t ping_sent;
// 最近一次接收PONG的時間戳
uint32_t pong_received;
// 節點的IP地址
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
// 節點的埠號
uint16_t port; /* port last time it was seen */
// 節點的標識
uint16_t flags; /* node->flags copy */
// 未使用
uint16_t notused1; /* Some room for future improvements. */
uint32_t notused2;
} clusterMsgDataGossip;
在clusterSendPing()
函式中,首先就是會將隨機選擇的節點的資訊加入到訊息中。程式碼如下:
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
int totlen; /* Total packet length. */
// freshnodes 的值是除了當前myself節點和傳送訊息的兩個節點之外,叢集中的所有節點
// freshnodes 表示的意思是gossip協議中可以包含的有關節點資訊的最大個數
int freshnodes = dictSize(server.cluster->nodes)-2;
// wanted 的值是叢集節點的十分之一向下取整,並且最小等於3
// wanted 表示的意思是gossip中要包含的其他節點資訊個數
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
// 因此 wanted 最多等於 freshnodes。
if (wanted > freshnodes) wanted = freshnodes;
// 計算分配訊息的最大空間
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*wanted);
// 訊息的總長最少為一個訊息結構的大小
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
// 分配空間
buf = zcalloc(totlen);
hdr = (clusterMsg*) buf;
// 設定傳送PING命令的時間
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
// 構建訊息的頭部
clusterBuildMessageHdr(hdr,type);
int maxiterations = wanted*3;
// 構建訊息內容
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
// 隨機選擇一個叢集節點
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
// 1. 跳過當前節點,不選myself節點
if (this == myself) continue;
// 2. 偏愛選擇處於下線狀態或疑似下線狀態的節點
if (maxiterations > wanted*2 &&
!(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
continue;
// 以下節點不能作為被選中的節點:
/*
1. 處於握手狀態的節點
2. 帶有NOADDR標識的節點
3. 因為不處理任何槽而斷開連線的節點
*/
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* Tecnically not correct, but saves CPU. */
continue;
}
// 如果已經在gossip的訊息中新增過了當前節點,則退出迴圈
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
CLUSTER_NAMELEN) == 0) break;
}
// j 一定 == gossipcount
if (j != gossipcount) continue;
/* Add it */
// 這個節點滿足條件,則將其新增到gossip訊息中
freshnodes--;
// 指向新增該節點的那個空間
gossip = &(hdr->data.ping.gossip[gossipcount]);
// 新增名字
memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
// 記錄傳送PING的時間
gossip->ping_sent = htonl(this->ping_sent);
// 接收到PING回覆的時間
gossip->pong_received = htonl(this->pong_received);
// 設定該節點的IP和port
memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port);
// 記錄標識
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
gossip->notused2 = 0;
// 已經新增到gossip訊息的節點數加1
gossipcount++;
}
// 計算訊息的總長度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// 記錄訊息節點的數量到包頭
hdr->count = htons(gossipcount);
// 記錄訊息節點的總長到包頭
hdr->totlen = htonl(totlen);
// 傳送訊息
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
重點關注這幾個變數:
freshnodes
int freshnodes = dictSize(server.cluster->nodes)-2;
freshnodes
的值是除了當前myself節點和傳送訊息的兩個節點之外,叢集中的所有節點。freshnodes
表示的意思是gossip協議中可以包含的有關節點資訊的最大個數