【Redis】Redis Cluster初始化及PING訊息的傳送
Cluster訊息型別定義
#define CLUSTERMSG_TYPE_PING 0 /* Ping訊息型別,節點間進行通訊交換資訊的訊息 */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong訊息型別 (Ping命令的回覆) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet訊息型別,表示節點加入叢集 */
#define CLUSTERMSG_TYPE_FAIL 3 /* FAIL訊息型別,表示節點下線*/
在Redis初始化服務initServer函式中,呼叫aeCreateTimeEvent註冊了時間事件,週期性的執行serverCron函式,在serverCron中可以看到每隔100ms呼叫一次clusterCron函式,執行Redis Cluster定時任務:
void initServer(void) { // 省略... if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } // 省略... } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { // 省略... /* Redis Cluster 定時任務,每隔100ms呼叫一次 */ run_with_period(100) { if (server.cluster_enabled) clusterCron(); } // 省略... }
clusterCron
clusterCron是叢集相關的定時執行函式,每100ms執行一次:
- 遍歷叢集中的所有節點,校驗是否有連線中斷的節點並進行重新連線
- 如果節點是自身或者是沒有地址的節點,跳過
- 如果節點處於握手狀態並且已經超時,跳過
- 如果連線為空,呼叫connConnect進行連線,回撥函式為clusterLinkConnectHandler
- 每執行10次clusterCron函式時隨機選取五個節點,然後從這五個節點選出最早收到PONG回覆的那個節點,也就是找出最久沒有進行通訊的那個節點,向其傳送PING訊息,clusterCron每100ms執行一次,執行10次是1000ms,也就是說每1秒選取一個節點呼叫clusterSendPing函式傳送一次PING訊息
void clusterCron(void) {
// ...
/* 校驗是否有連線中斷的節點並進行重新連線 */
di = dictGetSafeIterator(server.cluster->nodes);
server.cluster->stats_pfail_nodes = 0;
// 遍歷叢集中的所有節點
while((de = dictNext(di)) != NULL) {
// 獲取叢集節點
clusterNode *node = dictGetVal(de);
/* 如果節點是自身或者是沒有地址的節點,跳過 */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
if (node->flags & CLUSTER_NODE_PFAIL)
server.cluster->stats_pfail_nodes++;
/* 如果節點處於握手狀態並且已經超時 */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
if (node->link == NULL) {
// 建立clusterLink
clusterLink *link = createClusterLink(node);
link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
connSetPrivateData(link->conn, link);
// 建立連線,監聽函式為clusterLinkConnectHandler
if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
clusterLinkConnectHandler) == -1) {
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
freeClusterLink(link);
continue;
}
// 設定link
node->link = link;
}
}
dictReleaseIterator(di);
/* 每執行10次clusterCron函式,傳送一次PING訊息 */
if (!(iteration % 10)) {
int j;
/* 隨機選取節點並找到最早收到pong訊息的節點 */
for (j = 0; j < 5; j++) {
// 隨機選取節點
de = dictGetRandomKey(server.cluster->nodes);
// 獲取節點
clusterNode *this = dictGetVal(de);
/* 如果節點的連線已中斷或者本次PING命令處於活躍狀態 */
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
// 查詢最早收到PONG訊息的那個節點
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// 如果是最早收到PONG訊息的節點
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
// 傳送PING訊息
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
// ...
}
clusterNode
clusterNode是叢集中節點對應的結構體,包含了以下內容:
typedef struct clusterNode {
mstime_t ctime; /* 節點物件建立時間 */
char name[CLUSTER_NAMELEN]; /* 節點名稱 */
int flags; /* 節點標識 */
uint64_t configEpoch; /* configEpoch */
unsigned char slots[CLUSTER_SLOTS/8]; /* 節點負責的slots */
sds slots_info; /* Slots資訊 */
int numslots; /* 節點負責的slots數量 */
int numslaves; /* 從節點的數量 */
struct clusterNode **slaves; /* 指向從節點的指標 */
struct clusterNode *slaveof; /* 指向主節點的指標 */
mstime_t ping_sent; /* 最近一次傳送PING訊息的時間 */
mstime_t pong_received; /* 收到pong訊息的時間 */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* 標記FAIL狀態的時間 */
mstime_t voted_time; /* 最近一次投票的時間 */
mstime_t repl_offset_time; /* 收到主從複製offset的時間*/
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* 節點主從複製offset */
char ip[NET_IP_STR_LEN]; /* IP */
int port; /* 客戶端通訊埠 */
int pport; /* 使用TLS協議的埠 */
int cport; /* 叢集通訊埠 */
clusterLink *link; /* TCP/IP連線相關資訊 */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
clusterLinkConnectHandler
clusterLinkConnectHandler是建立連線的監聽函式,當連線建立時會呼叫clusterLinkConnectHandler進行處理,在clusterLinkConnectHandler函式中可以看到,又呼叫了connSetReadHandler註冊了可讀事件的監聽,對應的回撥函式為clusterReadHandler,當收到其他節點發送的通訊訊息時會呼叫clusterReadHandler函式處理:
void clusterLinkConnectHandler(connection *conn) {
clusterLink *link = connGetPrivateData(conn);
clusterNode *node = link->node;
/* 校驗連線是否成功 */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
node->name, node->ip, node->cport,
connGetLastError(conn));
freeClusterLink(link);
return;
}
/* 註冊readHandler,監聽函式為clusterReadHandler */
connSetReadHandler(conn, clusterReadHandler);
// 省略..
}
叢集間通訊
通訊訊息結構體定義
clusterMsg
叢集間通訊的訊息對應的結構體為clusterMsg,裡面包含了訊息型別、傳送訊息節點的slots資訊以及節點間通訊的訊息體clusterMsgData等資訊:
typedef struct {
char sig[4]; /* "RCmb"簽名 */
uint32_t totlen; /* 訊息總長度 */
uint16_t ver; /* 協議版本, 當前設定為1 */
uint16_t port; /* 埠 */
uint16_t type; /* 訊息型別 */
// 省略...
char sender[CLUSTER_NAMELEN]; /* 傳送訊息節點的名稱 */
unsigned char myslots[CLUSTER_SLOTS/8]; /* 傳送訊息節點的slots資訊 */
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* 傳送訊息節點的ip */
char notused1[32]; /* 32位元組的保留資料 */
uint16_t pport; /* 使用TLS協議時的埠 */
uint16_t cport; /* 傳送訊息節點的叢集匯流排埠,也就是用於叢集間通訊的埠 */
uint16_t flags; /* 傳送訊息節點的flags標識 */
unsigned char state; /* 傳送訊息節點的叢集狀態 */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data; // 叢集通訊的實際訊息
} clusterMsg;
clusterMsgData
clusterMsgData裡面儲存了節點間進行通訊的實際訊息,不同訊息型別對應不同的資料結構:
- clusterMsgDataGossip:PING, MEET 和 PONG訊息對應的資料結構
- clusterMsgDataFail:FAIL訊息對應的資料結構
- clusterMsgDataPublish:PUBLISH訊息對應的資料結構
- clusterMsgDataUpdate:UPDATE訊息對應的資料結構
- clusterMsgModule:MODULE訊息對應的資料結構
union clusterMsgData {
/* PING, MEET and PONG訊息對應的資料結構 */
struct {
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL訊息對應的資料結構 */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH訊息對應的資料結構 */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE訊息對應的資料結構 */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE訊息對應的資料結構 */
struct {
clusterMsgModule msg;
} module;
};
clusterMsgDataGossip
clusterMsgDataGossip是叢集間傳送PING、MEET 和 PONG訊息對應的資料結構,裡面包含以下資訊:
typedef struct {
char nodename[CLUSTER_NAMELEN]; /* 節點名稱 */
uint32_t ping_sent; /* 傳送PING命令的時間 */
uint32_t pong_received; /* 收到PONG命令的時間 */
char ip[NET_IP_STR_LEN]; /* 節點的IP */
uint16_t port; /* 用於客戶端通訊的埠 */
uint16_t cport; /* 叢集間通訊的埠 */
uint16_t flags; /* 節點的flags標識 */
uint16_t pport; /* 使用TLS協議時的埠 */
uint16_t notused1;
} clusterMsgDataGossip;
PING訊息的傳送
clusterSendPing
clusterSendPing函式用於向指定節點發送PING訊息,Ping訊息中不僅包含當前節點的資訊,也會隨機選取一些其他的節點,將其他節點的資訊封裝在訊息體中進行傳送,隨機選取節點的個數計算規則如下:
-
wanted
:隨機選取的節點個數,預設是叢集中節點的數量除以10 -
freshnodes
:隨機選取的節點個數的最大值,預設叢集中節點的數量減2
如果wanted
小於3,那麼將wanted
置為3,也就是最少選取3個節點;
如果wanted
大於freshnodes
,將wanted
置為freshnodes
的值,也就是最大可以選取freshnodes
個節點;
選取的節點個數wanted
確定之後,處理邏輯如下:
-
呼叫clusterBuildMessageHdr函式構建訊息頭
-
根據
wanted
的數量隨機選取節點,處於以下幾種情況的節點將被跳過-
FAIL下線狀態的節點
-
處於握手狀態的節點
-
沒有地址資訊的節點
-
失去連線的節點並且沒有配置slots.
-
-
呼叫clusterSetGossipEntry函式將選取的節點資訊加入到訊息體中
-
呼叫clusterSendMessage函式傳送訊息
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf; /* 傳送的訊息資料*/
clusterMsg *hdr; /* 節點間通訊訊息 */
int gossipcount = 0;
int wanted; /* 選取的節點個數 */
int totlen; /* 總長度 */
// 叢集中節點的數量 - 2
int freshnodes = dictSize(server.cluster->nodes)-2;
// 叢集中節點的數量除以10
wanted = floor(dictSize(server.cluster->nodes)/10);
// 如果wanted小於3,則設定為3
if (wanted < 3) wanted = 3;
// 如果大於最大節點數,設定為freshnodes
if (wanted > freshnodes) wanted = freshnodes;
//...
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
buf = zcalloc(totlen); // 分配空間
hdr = (clusterMsg*) buf;
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
// 構建訊息頭
clusterBuildMessageHdr(hdr,type);
/* 計算 gossip */
int maxiterations = wanted*3;
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
// 隨機選取節點
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* 如果是自身 */
if (this == myself) continue;
/* 如果是FAIL狀態,跳過 */
if (this->flags & CLUSTER_NODE_PFAIL) continue;
/* 以下節點跳過:
* 1) 處於握手狀態的節點.
* 3) 沒有地址資訊的節點.
* 4) 失去連線的節點並且沒有配置slots.
*/
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* Technically not correct, but saves CPU. */
continue;
}
/* 如果節點已經新增 */
if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
/* 新增到訊息體中 */
clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--;
gossipcount++;
}
// ...
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);
// 傳送訊息
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
構建訊息頭
clusterBuildMessageHdr
clusterBuildMessageHdr函式用於構建訊息頭,設定了訊息傳送者的節點相關資訊:
- 設定了簽名、訊息型別、節點IP、埠等資訊
- 設定傳送訊息節點的slots資訊,如果傳送訊息的節點是從節點,需要使用它對應的主節點的slots資訊
- 計算叢集訊息的總長度totlen,並設定到訊息頭中
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen = 0;
uint64_t offset;
clusterNode *master;
/* 如果是從節點, 使用它對應的主節點的資訊 */
master = (nodeIsSlave(myself) && myself->slaveof) ?
myself->slaveof : myself;
memset(hdr,0,sizeof(*hdr));
hdr->ver = htons(CLUSTER_PROTO_VER);
// 設定簽名
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
hdr->sig[2] = 'm';
hdr->sig[3] = 'b';
// 設定訊息型別
hdr->type = htons(type);
memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
memset(hdr->myip,0,NET_IP_STR_LEN);
if (server.cluster_announce_ip) {
// 設定ip
strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
hdr->myip[NET_IP_STR_LEN-1] = '\0';
}
/* 處理埠 */
int announced_port, announced_pport, announced_cport;
deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);
// 設定當前節點的slots資訊
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
if (myself->slaveof != NULL)
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
// 設定埠
hdr->port = htons(announced_port);
hdr->pport = htons(announced_pport);
hdr->cport = htons(announced_cport);
// 設定標識
hdr->flags = htons(myself->flags);
// 設定叢集狀態
hdr->state = server.cluster->state;
/* 設定currentEpoch和configEpoch */
hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
hdr->configEpoch = htonu64(master->configEpoch);
/* 設定主從複製的offset. */
if (nodeIsSlave(myself))
offset = replicationGetSlaveOffset();
else
offset = server.master_repl_offset;
hdr->offset = htonu64(offset);
if (nodeIsMaster(myself) && server.cluster->mf_end)
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
/* 計算訊息總長度 */
if (type == CLUSTERMSG_TYPE_FAIL) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataUpdate);
}
// 設定訊息總長度
hdr->totlen = htonl(totlen);
}
構建訊息體
clusterSetGossipEntry
clusterSetGossipEntry函式用於構建訊息體,將隨機選取的其他節點資訊加入到ping訊息對應的陣列hdr->data.ping.gossip[i]中,並設定節點的相關資訊:
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
clusterMsgDataGossip *gossip;
gossip = &(hdr->data.ping.gossip[i]);
memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
// 設定PING訊息傳送時間
gossip->ping_sent = htonl(n->ping_sent/1000);
// 設定收到PONG訊息時間
gossip->pong_received = htonl(n->pong_received/1000);
// 設定IP
memcpy(gossip->ip,n->ip,sizeof(n->ip));
// 設定埠
gossip->port = htons(n->port);
// 設定叢集埠
gossip->cport = htons(n->cport);
// 設定標識
gossip->flags = htons(n->flags);
gossip->pport = htons(n->pport);
gossip->notused1 = 0;
}
PING訊息的處理
clusterReadHandler
由上面的clusterLinkConnectHandler函式可知,收到其他節點發送的通訊訊息時會呼叫clusterReadHandler函式處理,在clusterReadHandler函式中會開啟while迴圈,不斷讀取資料,直到獲取完整的資料(收到的資料長度rcvbuflen等於訊息中設定資料總長度時),呼叫clusterProcessPacket函式處理收到的訊息:
void clusterReadHandler(connection *conn) {
clusterMsg buf[1];
ssize_t nread;
clusterMsg *hdr;
clusterLink *link = connGetPrivateData(conn);
unsigned int readlen, rcvbuflen;
while(1) {
rcvbuflen = link->rcvbuf_len;
// 省略...
// 讀取資料
nread = connRead(conn,buf,readlen);
// 如果資料讀取完畢
if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return;
// 省略...
/* 如果已經獲取完整資料(rcvbuflen等於訊息中設定資料總長度),處理資料包 */
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
// 處理訊息
if (clusterProcessPacket(link)) {
if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
zfree(link->rcvbuf);
link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
}
link->rcvbuf_len = 0;
} else {
return;
}
}
}
}
clusterProcessPacket
clusterProcessPacket函式用於處理收到的通訊訊息,可以看到有許多if else分支,根據訊息型別的不同,進行了不同的處理,這裡先只關注PING訊息的處理:
- 如果訊息型別是PING或者MEET,呼叫clusterSendPing函式傳送PONG訊息,傳入的訊息型別為CLUSTERMSG_TYPE_PONG,說明PING和PONG訊息都是通過clusterSendPing函式實現的,PING和PONG訊息的資料結構一致,那麼回覆的PONG訊息中也會帶上回復者的節點資訊以及回覆者隨機選取的其他節點資訊,以此達到節點間交換資訊的目的
- 如果是PING, PONG或者MEET訊息,並且sender不為空,不為空表示傳送訊息的節點是當前節點已知的,呼叫clusterProcessGossipSection函式處理訊息體中的Gossip資料
int clusterProcessPacket(clusterLink *link) {
// 獲取傳送的訊息
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
// 訊息長度
uint32_t totlen = ntohl(hdr->totlen);
// 訊息型別
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();
uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
clusterNode *sender;
// 省略...
/* 校驗傳送者是否是已知的節點 */
sender = clusterLookupNode(hdr->sender);
/* 更是傳送者收到資料的時間*/
if (sender) sender->data_received = now;
// 省略...
/* 如果是PING訊息或者MEET訊息 */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
if (!sender && type == CLUSTERMSG_TYPE_MEET)
clusterProcessGossipSection(hdr,link);
/* 傳送PONG訊息,這裡傳入的型別是CLUSTERMSG_TYPE_PONG */
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
/* PING, PONG, MEET 訊息 */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
// 省略...
/* 處理訊息體中的Gossip節點資料 */
if (sender) clusterProcessGossipSection(hdr,link);
} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
return 1;
}
clusterProcessGossipSection
clusterProcessGossipSection函式用於處理clusterMsg中的Gossip節點資訊g,它從叢集訊息中獲取Gossip節點資料,根據節點數量進行遍歷:
-
呼叫clusterLookupNode函式根據nodename從當前收到訊息的節點的叢集中查詢Gossip節點,查詢結果記為
node
- 如果
node
如果不為空,說明可以從當前節點的叢集中找到,Gossip節點針對當前節點是已知的,需要注意node指向的是當前收到訊息節點中維護的相同nodename的節點,g指向當前正在遍歷的gossip節點(sender傳送的訊息中攜帶gossip陣列),注意兩者的區別 - 如果
node
如果為空,說明Gossip節點針對當前節點是未知的,之前不在當前節點維護的叢集節點中
- 如果
-
如果
node
不為空,也就是當前收到訊息這個節點的叢集中已經存在node
節點,進行如下處理:(1)傳送訊息的節點
sender
是主節點時有以下兩種情況:- 如果
node
是FAIL或者PFAIL狀態,需要將node
新增到傳送訊息節點sender
的下線節點連結串列fail_reports中(clusterNodeAddFailureReport函式),並將node
標記為下線狀態(markNodeAsFailingIfNeeded函式) - 如果
node
不是FAIL或者PFAIL狀態,需要校驗node
是否已經在sender
的下線節點連結串列fail_reports中,如果在需要從中移除
(2)如果
node
節點不是FAIL、PFAIL、NOADDR狀態,並且node
的ip或者埠與g
指向的gossip節點中的ip或者埠不一致,需要更新node中的ip和埠 - 如果
-
如果
node
為空,說明之前不在當前節點維護的叢集節點中,如果gossip節點不處於NOADDR狀態並且不在nodes_black_list中,新建節點,加入到當前收到訊息的節點維護的叢集資料server.cluster中
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
// 獲取clusterMsgDataGossip資料
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
// 傳送訊息的節點
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
while(count--) {
// 獲取節點標識
uint16_t flags = ntohs(g->flags);
clusterNode *node;
sds ci;
if (server.verbosity == LL_DEBUG) {
ci = representClusterNodeFlags(sdsempty(), flags);
serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ntohs(g->cport),
ci);
sdsfree(ci);
}
/* 根據nodename查詢節點,node指向當前收到訊息節點中維護的節點*/
node = clusterLookupNode(g->nodename);
// 如果節點已知
if (node) {
/* 如果傳送者是主節點 */
if (sender && nodeIsMaster(sender) && node != myself) {
// 如果gossip節點是FAIL或者PFAIL狀態
if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
// 將gossip節點加入到sender下線節點連結串列fail_reports中
if (clusterNodeAddFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
// 標記節點下線
markNodeAsFailingIfNeeded(node);
} else {
// 校驗節點是否在下線節點連結串列fail_reports中,如果在需要移除恢復線上狀態
if (clusterNodeDelFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
/* 如果節點不是FAIL或者PFAIL狀態,並且node中記錄的ping傳送時間為0,並且node不在fail_reports中*/
if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
node->ping_sent == 0 &&
clusterNodeFailureReportsCount(node) == 0)
{
mstime_t pongtime = ntohl(g->pong_received);
pongtime *= 1000; /* 轉為毫秒 */
if (pongtime <= (server.mstime+500) &&
pongtime > node->pong_received)
{
node->pong_received = pongtime; // 更新收到pong訊息時間
}
}
/* 如果node節點不是FAIL、PFAIL、NOADDR狀態,並且node的ip或者埠與g節點中的ip或者埠不一致,需要更新node中的ip和埠 */
/* 需要注意node節點和g節點的區別,node節點是從當前收到訊息節點中根據節點id查詢到的節點,也就是接收者自己記錄的節點資訊 */
/* g指向當前在遍歷的那個gossip節點,也就是傳送者帶過來的節點資訊 */
if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
!(flags & CLUSTER_NODE_NOADDR) &&
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
(strcasecmp(node->ip,g->ip) ||
node->port != ntohs(g->port) ||
node->cport != ntohs(g->cport)))
{
if (node->link) freeClusterLink(node->link);
// 更新node節點中的埠、ip資訊
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->pport = ntohs(g->pport);
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
}
} else { // 如果節點未知
/* 如果節點不處於NOADDR狀態並且不在nodes_black_list中 */
if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterNode *node;
// 建立節點
node = createClusterNode(g->nodename, flags);
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->pport = ntohs(g->pport);
node->cport = ntohs(g->cport);
// 加入到當前節點維護的叢集server.cluster中
clusterAddNode(node);
}
}
/* 遍歷下一個節點 */
g++;
}
}
總結
參考
極客時間 - Redis原始碼剖析與實戰(蔣德鈞)
zhaiguanjie-Redis原始碼剖析
Redis版本:redis-6.2.5