1. 程式人生 > 其它 >簡單共識選主實現自定義分散式鎖

簡單共識選主實現自定義分散式鎖

一些任務只需要一個例項執行,由於高可用要求,需要多臺例項。那麼多例項通訊就成問題,而一些情況下環境比較苛刻,沒有元件可以借用,簡直為難老實人。

共識演算法

有多個例項想要達成共識,那麼可分為兩個陣營:拜占庭將軍問題和非拜占庭將軍問題。由於咱們環境比較苛刻,換句話就是都可信,那麼就是非拜占庭節點,降低思考難度。接著常見的分散式一致性協議有:

  • Paxos
  • Bully
  • Raft
  • Zab
  • Gossip

這裡 Paxos(難實現),Zab(不如Raft通用),這樣又可以縮減下分析目標。接著考慮Gossip,實際上很快,不過理論上可以暫時不一致,所以也移除。分析下bully的選舉:長者為尊,leader掛了,節點通知長者進行選舉,沒回應輪到自己。聽起來就簡單霸道易實現,缺點也能感覺到,長者加入退出都要觸發選舉,速度也有點慢。再看看歷史,Mongo以及ES早期都是用Bully,後期都轉成了類Raft,基本篩選完畢。

Raft簡介

Raft演算法選主中叢集各個節點的角色,一共有3中角色:

  • Leader: 為主節點,同一時刻只有一個Leader節點,負責整個叢集的節點間的協調和管理。
  • Candidate: 候選節點,只有角色為候選者的節點才可以被選為新的Leader,每個節點 都有可以成為候選者。
  • Follower: Leader的跟隨者,這個角色的時候不可以發起選主。

選舉流程:

  1. 初始化時,各個節點均為Follower狀態。
  2. 開始選主時,所有節點的Follower狀態轉為Candidate狀態,並向其他節點發送選主請求。
  3. 其他節點根據收到的選主請求的先後順序,進行回覆是否同意其成為主節點;每個節點只能投一張票。
  4. 發起選主的節點如果得到一半以上的投票,則會成為主節點,狀態變為Leader,其他幾點則會由Candidate轉為Follower狀態,此時Leader和Follower將保持心跳檢測。
  5. 如果Leader節點的任期到了,Leader則會降為Follower,進行新一輪選主。

總結與剪裁

咱們所有節點每個都是平等的,不存在狀態問題,因為任何時候任何節點都時可以當選主;那麼Raft中需要多數人的投票就可以參考bully演算法,通過任期與自身ID判定,且任期永遠有效。就這麼直接,那麼步驟3的回覆也可以省略,訊息結構體也只需要一種,結構如下:

  • Term 節點毛遂自薦時候的時間戳,當做任期
  • Current 節點發送心跳時候的時間
  • Id 節點ID

選舉流程:

  1. 初始化時,各個節點均為Follower狀態。
  2. 開始選主時,所有節點的Follower狀態轉為Candidate狀態,並向其他節點發送自身心跳。
  3. 其他節點收到心跳,對比自己心跳,Term比自身小,或者相同是ID比自身大,則節點降為Follower,不在傳送心跳。
  4. 當Candidate持續心跳有效期時間N內未收到其他節點的心跳,則晉升為Leader,週期傳送心跳。

新節點加入:

  1. 節點加入一個檢測週期後,收到有效心跳,則沉默為Follower,否則進入Candidate。
  2. 參考初始選舉步驟3,新加入任期一定更大,所以不會有波動。

Leader掛了:

  1. Follower節點經過多個檢測週期,直到有效期N失效後,各自進入Candidate,開始互發心跳。
  2. 參考初始選舉步驟3。

還有其他場景,咱們也理理:非Leader節點掛了不影響;Leader假死則迴歸那就是上任歸來,現任讓位;如果腦裂迴歸,那也是直接PK,誰老誰連任。這樣簡單的選擇,大部分場景都滿足了。

程式碼實現

RPC選擇

心跳帶有有效期,因此過期的沒意義,且Leader會一直髮,則丟了一兩個也行。總結下咱們採用UDP,邏輯更簡單,效率更高。第二個問題就是Socket Recv阻塞,有兩個簡單方法:丟個回撥函式,起個執行緒recv然後callback;第二個來個Queue,起個執行緒recv然後壓入。資源都是一個執行緒,不過Queue的話,框架只是資料讀取不處理,邏輯更清晰,所以咱們選擇後者。

import marshal
import socket
import threading


class PeerSocket(object):

    def __init__(self, endpoint, queue):
        self.endpoint = endpoint
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.queue = queue
        self.start()

    def listen(self):
        self.sock.bind(self.endpoint)
        while True:
            content = self.sock.recv(256)
            data = marshal.loads(content)
            self.queue.put(data)

    def send(self, data, addr):
        content = marshal.dumps(data)
        self.sock.sendto(content, addr)

    def start(self):
        task = threading.Thread(target=self.listen)
        task.setDaemon(True)
        task.start()

接著咱們設計需要共享的心跳以及狀態:

    def reset(self):
        self.heartbeat = {
            "term": float('inf'),
            "current_ts": float('-inf'),
            'id': -255
        }
        self.status = Role.follower.value

基於流程,咱們合併所有操作,得出主邏輯如下:

    def watch(self):
        while True:
            with self.lock:
                if self.status == Role.leader.value:
                    self.send_followers()
                elif self.status == Role.candidate.value:
                    self.recv_leader()
                    self.send_followers()
                else:
                    self.recv_leader()
                self.clean()
            time.sleep(0.5)

咱們也可以換個寫法,概括一下就是(通常在不影響效能的情況下,咱們更喜歡直白的寫法,後期回看好看懂):

  1. 當角色不是Follower,則需要傳送心跳
  2. 當角色不是Leader,則需要接收心跳
  3. 每個檢測週期,都是檢測心跳是否過期,更新自身狀態

這樣花費2個執行緒,少量程式碼,老實人也能在苛刻環境下實現節點高可用了。原始碼不多,參考地址: github