1. 程式人生 > >redis cluster叢集的原始碼分析(1)

redis cluster叢集的原始碼分析(1)

對於cluster.c的原始碼分析,我將會分兩部分介紹。本文主要分析叢集通訊和通訊故障。

先大致歸納下cluster的主要函式

void clusterCron(void);//叢集的定時任務
int clusterProcessPacket(clusterLink *link);//訊息處理中心
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link);//根據訊息中的gossip資訊更新nodes裡的節點
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);//接收叢集中節點的連線
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);//讀處理函式
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask);//寫處理函式
void clusterSendPing(clusterLink *link, int type);//傳送ping,pong,meet訊息給指定節點,同時將gossip資訊(叢集其他節點的存活狀況)更新給指定的節點
void clusterSendFail(char *nodename);//傳送fail訊息給所有連線的節點
void clusterSendFailoverAuth(clusterNode *node);//傳送FailoverAuth訊息給指定節點,給指定節點投票
void clusterSendMFStart(clusterNode *node);//傳送MFStart訊息給指定節點
void clusterSendUpdate(clusterLink *link, clusterNode *node);//傳送關於node的update資訊給節點,去更新指定節點的槽資訊
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);//判斷是否需要給傳送投票,如果可以就給傳送者投票
void clusterHandleSlaveFailover(void);//處理failover
void clusterHandleSlaveMigration(int max_slaves);//處理slave遷移
void resetManualFailover(void);//在啟動或終止failover時,進行初始化
void clusterDoBeforeSleep(int flags);
clusterNode *clusterLookupNode(char *name);//根據節點名字在nodes中獲取節點
void clusterSetMaster(clusterNode *n);//將節點設定為master,如果自身是master,則轉變成slave
void clusterSetNodeAsMaster(clusterNode *n);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterDelNode(clusterNode *delnode);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
void clusterCloseAllSlots(void);

叢集通訊

       叢集中的各個節點通過傳送和接收資訊來進行通訊,我們把傳送訊息的節點叫做傳送者,接收訊息資訊節點

叫做接收者。傳送的資訊主要有以下九種:

1、MEET訊息:當傳送者接收到客戶端傳送的cluster meet命令時,傳送者會向接收者傳送meet訊息,請求接收加入到傳送者所在的叢集裡。

2、PING訊息:叢集裡用來檢測相應節點是否線上的訊息,每個節點預設每隔一秒從節點列表隨機選5個節點,然後對最長時間沒有收到PONG回覆的節點發送PING訊息。此外,如果上次收到某個節點的PONG訊息回覆的時間,距離當前時間超過了cluster-node-time選項設定的一半,那麼會對此節點發送PING訊息,這樣可以防止節點長期沒被隨機到,而導致長期沒有傳送PING檢測是否存活。

3、PONG訊息:當接收者收到傳送者發來的meet訊息或者ping訊息時,為了向傳送者確認這條meet訊息或ping訊息已到達,接收者向傳送者返回一條pong訊息。另外,一個節點也可以通過向叢集廣播自己的pong訊息來讓叢集中的其他節點立即重新整理關於這個節點的認識、。

4、FAIL訊息:當一個主節點a判斷另外一個主節點b已經進入fail狀態時,節點a向叢集廣播一條關於節點b的fail訊息,所有收到這條訊息的節點都會立即將節點b標記為已下線。

5、PUBLISH訊息:當節點接收到一個PUBLISH命令時,節點會執行這個命令,並向叢集廣播一條PUBLISH訊息,所有接收到PUBLISH訊息的節點都會執行相同的PUBLISH命令。

6、FAILOVER_AUTH_REQUEST訊息:當slave的master進入fail狀態,slave向叢集中的所有的節點發送訊息,但是隻有master才能給自己投票failover自己的maser。

7、FAILOVER_AUTH_ACK訊息:當master接收到FAILOVER_AUTH_REQUEST訊息,如果傳送者滿足投票條件且自己在當前紀元未投票就給它投票,返回FAILOVER_AUTH_ACK訊息.。

8、UPDATE訊息:當接收到ping、pong或meet訊息時,檢測到自己與傳送者slots不一致,且傳送的slots的紀元過時, 就傳送slots中紀元大於傳送者的節點資訊作為update訊息的內容給傳送者。

9、MFSTART訊息:當傳送者接收到客戶端傳送的cluster failover命令時,傳送者會向自己的master傳送MFSTART訊息,進行手動failover。

     clusterProcessPacket是cluster的訊息處理中心,來負責處理叢集中的MEET,PING,FAIL,PUBLISH,FAILOVER_AUTH_REQUEST,FAILOVER_AUTH_ACK,UPDATE,MFSTART這幾種訊息。(吐槽一下,clusterProcessPacket函式長達幾百行看著眼花繚亂)

int clusterProcessPacket(clusterLink *link) {
	……
    //檢查傳送者是否已知節點且是否處於handshake狀態
    sender = clusterLookupNode(hdr->sender);
    if (sender && !nodeInHandshake(sender)) {
        //根據訊息更新發送者在此節點的上的配置紀元資訊和此節點的當前紀元
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;
        if (senderConfigEpoch > sender->configEpoch)
            sender->configEpoch = senderConfigEpoch;
        //主從複製的offset
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = mstime();
        //更新全域性的主從複製offset
        if (server.cluster->mf_end &&nodeIsSlave(myself) &&myself->slaveof == sender 
       &&hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
        }
    }
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        //通過ping訊息和meet訊息中來設定myself的地址
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&server.cluster_announce_ip == NULL)
        {
            ……
            memcpy(myself->ip,ip,NET_IP_STR_LEN);
            ……
        }
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            //如果是meet訊息,為傳送者建立clusterNode結構,然後加入clusterState.nodes
            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
		    ……
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
        if (!sender && type == CLUSTERMSG_TYPE_MEET)//傳送者是個未知節點並且是meet訊息
            clusterProcessGossipSection(hdr,link);//將訊息gossip資訊中的節點更新到自己nodes字典中
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||type == CLUSTERMSG_TYPE_MEET)
    {
        if (link->node) {
        	//傳送者還處於握手狀態
            if (nodeInHandshake(link->node)) {
                //如果我們已經有這個節點,根據訊息嘗試更新這個節點的地址
                if (sender) {                  	
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr))
                    ……
                }                
                ……
            } else if (memcmp(link->node->name,hdr->sender,CLUSTER_NAMELEN) != 0)
            {//如果訊息中節點的名字和link中的名字不匹配,刪除link 
                ……            
                freeClusterLink(link);
                ……
            }
        }
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            //收到PONG資訊,更新節點資訊,刪除CLUSTER_NODE_PFAIL或CLUSTER_NODE_FAIL狀態
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
            } else if (nodeFailed(link->node)) {
                clearNodeFailureIfNeeded(link->node);
            }
        }
        //檢查槽資訊是否一致
	    if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {           	
                dirty_slots = memcmp(sender_master->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }
        //傳送者是master,且槽資訊不一致,更新本地的槽資訊
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
        //更新發送者過時的槽資訊
        if (sender && dirty_slots) {
            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                  if (server.cluster->slots[j] == sender||server.cluster->slots[j] == NULL) continue;
                  if (server.cluster->slots[j]->configEpoch >senderConfigEpoch)
                  {   clusterSendUpdate(sender->link,server.cluster->slots[j]);
                        break;
                  }
                }
            }
        }
       	//處於槽資訊衝突
        if (sender &&nodeIsMaster(myself) && nodeIsMaster(sender) 
        &&senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }
        //根據msg的gossip資訊更新nodes字典中的節點資訊(新增節點或者更新節點的線上狀態)
        if (sender) clusterProcessGossipSection(hdr,link);
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;
        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            if (failing &&!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
            {      //將傳送者節點更新FAIL
                failing->flags |= CLUSTER_NODE_FAIL;
                ……
            }
        } 
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        if (dictSize(server.pubsub_channels) ||listLength(server.pubsub_patterns))
        {    ……//向訂閱的使用者群發訊息
            pubsubPublishMessage(channel,message);
            ……    
        }
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        //處理FAILOVER_AUTH_REQUEST訊息,check是否可以給其投票    
        clusterSendFailoverAuthIfNeeded(sender,hdr);
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        //處理FAILOVER_AUTH_ACK訊息
        //傳送者是master,其節點有槽,並且當前紀元大於當前的票選紀元,投票成功
        ……
        server.cluster->failover_auth_count++;
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        //當我是master,傳送者是我的slave,才可以啟動手動failover
        resetManualFailover();
        ……
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        //根據訊息中的nodename更新對應節點的configEpoch,slots
        ……
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,hdr->data.update.nodecfg.slots);
    } 
    ……
}

        clusterProcessGossipSection主要是在接收到meet,pong訊息的時候,將訊息中的gossip資訊更新到自己的

nodes字典中,判斷是否有節點已經處於FAIL狀態。

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
    while(count--) {
        ……      
        node = clusterLookupNode(g->nodename);
        if (node) {
            //根據訊息中的clusterMsgDataGossip更新節點狀態
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {//在這個節點的fail_reports里加入sender
                    }
                    markNodeAsFailingIfNeeded(node);//判斷在這個節點的fail_reports,將節點標記為fail
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {//在這個節點的fail_reports裡刪除sender
                    }
                }
            }
            ……
        } else {//節點不存在,開始handshake(主要是接收到meet訊息,與gossip中的節點進行handshake)
            if (sender &&!(flags & CLUSTER_NODE_NOADDR) &&!clusterBlacklistExists(g->nodename))
            {   
                clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
            }
        }
        g++;
    }
}


叢集通訊故障

     叢集故障檢測主要是在定時任務clusterCron定期向隨機節點和長期沒被隨機到的節點發送ping,然後根據返回

pong的時間判斷是疑似下線(PFAIL)。叢集通過ping和pong訊息將節點知道nodes的線上狀態傳播到其他節點。當

檢測到某個節點的fail_reports大於等於(server.cluster->size / 2) + 1時,標記這個節點為FAIL,然後廣播FAIL訊息到整

個叢集。slave中定時任務clusterCron檢測自己的master為FAIL,就啟動Failover。首先開始選舉,根據與master

的offset偏差進行排序決定誰優先請求被投票。投票完成後,票數大於等於(server.cluster->size / 2) + 1的節點成為

master,如果沒有大於(server.cluster->size / 2) + 1,等待下次投票。

       clusterCron是叢集的定時任務,根據server .cluster->nodes的節點狀態資訊作出相應的處理,起到監控作用。

1、對handshake的節點建立連線,同時刪除handshake超時的節點;

2、向隨機節點和now-pong_received>cluster_node_timeout/2的傳送ping訊息;

3、遍歷nodes檢查有沒有超時還沒返回pong的節點,然後標記為pfail的節點(之後通過gossip訊息將pfail的節點

     傳播給別的節點,clusterProcessGossipSection函式中更新gossip訊息,如果當某個節點的failreport超過

     (server.cluster->size / 2) + 1);

4、統計孤立master,判斷是否需要slave遷移,以避免孤立master fail沒有slave failover;

5、節點是slave,判斷是否進行手動failover或者failover;

void clusterCron(void) {
    ……
    di = dictGetSafeIterator(server.cluster->nodes);
    //遍歷nodes字典中的節點,檢查與節點的連線情況並處理
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
        //節點handshake超時,刪除節點
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }//新加入字典的節點,建立連線,併發送ping或者meet
        if (node->link == NULL) {
            ……
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,node->cport, NET_FIRST_BIND_ADDR);
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?CLUSTERMSG_TYPE_MEET 
                          : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                node->ping_sent = old_ping_sent;
            }
            node->flags &= ~CLUSTER_NODE_MEET;
        }
    }
    //每隔一秒從節點列表隨機選5個節點,然後對最老回覆PONG的時間的節點發送PING訊息
    if (!(iteration % 10)) {
        int j;
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        ……
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO) 
            {   //記錄孤立master的數量
                orphaned_masters++;
            }//記錄最大可用slaves的數量
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;//如果我是slave記錄我的master的可用slave數量
        }
        if (node->link && now - node->link->ctime >server.cluster_node_timeout &&node->ping_sent && 
            node->pong_received < node->ping_sent && now - node->ping_sent > server.cluster_node_timeout/2)
        {//當節點超過cluster_node_timeout/2的時間還沒回復pong,斷開連線
            freeClusterLink(node->link);
        }
        if (node->link &&node->ping_sent == 0 &&(now - node->pong_received) > server.cluster_node_timeout/2)
        {//如果距離上次收到PONG訊息回覆的時間超過cluster_node_timeout/2,傳送ping訊息
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }
        if (server.cluster->mf_end &&nodeIsMaster(myself) &&server.cluster->mf_slave == node &&node->link)
        {//給請求手動failover的slave傳送ping訊息
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }
        if (node->ping_sent == 0) continue;
        delay = now - node->ping_sent;
        if (delay > server.cluster_node_timeout) {
            //給節點發送ping訊息,超過cluster_node_timeout沒收到該節點的pong回覆,將節點標記為PFAIL
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);
    if (nodeIsSlave(myself) &&server.masterhost == NULL &&myself->slaveof &&nodeHasAddr(myself->slaveof))
    {//如果我是slave,而且我沒開啟主從複製,則開啟主從複製
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }
    manualFailoverCheckTimeout();//檢查手動failover是否超時
    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();//處理手動failover
        clusterHandleSlaveFailover();//處理failover
        //存在孤立master,且我所在的主從的OKslave最大同時大於,啟動從節點遷移
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }
    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();//更新叢集狀態
}

       clusterHandleSlaveFailover檢查master是否下線,然後從節點開始Failover,請求被投票獲取授權,獲取授權後提升為master。

void clusterHandleSlaveFailover(void) {
    ……
    if (nodeIsMaster(myself) ||myself->slaveof == NULL ||(!nodeFailed(myself->slaveof) && !manual_failover) ||myself->slaveof->numslots == 0)
    {//myself->slaveof的flag為FAIL,已經下線
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }
    //距離上次嘗試failover的時間>cluster_node_timeout*4,啟動failover
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +500 + random() % 500; 
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        server.cluster->failover_auth_time +=server.cluster->failover_auth_rank * 1000;
	//根據offset的偏差排序,偏差越小可以優先請求被投票
        if (server.cluster->mf_end) {//手動failover,可以優先請求被投票
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
        }
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }//還沒請求被投票,向整個叢集請求被投票
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }
    //檢查自己得到票數是否達到needed_quorum
    if (server.cluster->failover_auth_count >= needed_quorum) {
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
        }
        clusterFailoverReplaceYourMaster();//升級為主節點
    } 
}

	//根據offset的偏差排序,偏差越小可以優先請求被投票
        if (server.cluster->mf_end) {//手動failover,可以優先請求被投票
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
        }
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }//還沒請求被投票,向整個叢集請求被投票
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }
    //檢查自己得到票數是否達到needed_quorum
    if (server.cluster->failover_auth_count >= needed_quorum) {
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
        }
        clusterFailoverReplaceYourMaster();//升級為主節點
    } 
}