簡單共識選主實現自定義分散式鎖
一些任務只需要一個例項執行,由於高可用要求,需要多臺例項。那麼多例項通訊就成問題,而一些情況下環境比較苛刻,沒有元件可以借用,簡直為難老實人。
共識演算法
有多個例項想要達成共識,那麼可分為兩個陣營:拜占庭將軍問題和非拜占庭將軍問題。由於咱們環境比較苛刻,換句話就是都可信,那麼就是非拜占庭節點,降低思考難度。接著常見的分散式一致性協議有:
- Paxos
- Bully
- Raft
- Zab
- Gossip
這裡 Paxos(難實現),Zab(不如Raft通用),這樣又可以縮減下分析目標。接著考慮Gossip,實際上很快,不過理論上可以暫時不一致,所以也移除。分析下bully的選舉:長者為尊,leader掛了,節點通知長者進行選舉,沒回應輪到自己。聽起來就簡單霸道易實現,缺點也能感覺到,長者加入退出都要觸發選舉,速度也有點慢。再看看歷史,Mongo以及ES早期都是用Bully,後期都轉成了類Raft,基本篩選完畢。
Raft簡介
Raft演算法選主中叢集各個節點的角色,一共有3中角色:
- Leader: 為主節點,同一時刻只有一個Leader節點,負責整個叢集的節點間的協調和管理。
- Candidate: 候選節點,只有角色為候選者的節點才可以被選為新的Leader,每個節點 都有可以成為候選者。
- Follower: Leader的跟隨者,這個角色的時候不可以發起選主。
選舉流程:
- 初始化時,各個節點均為Follower狀態。
- 開始選主時,所有節點的Follower狀態轉為Candidate狀態,並向其他節點發送選主請求。
- 其他節點根據收到的選主請求的先後順序,進行回覆是否同意其成為主節點;每個節點只能投一張票。
- 發起選主的節點如果得到一半以上的投票,則會成為主節點,狀態變為Leader,其他幾點則會由Candidate轉為Follower狀態,此時Leader和Follower將保持心跳檢測。
- 如果Leader節點的任期到了,Leader則會降為Follower,進行新一輪選主。
總結與剪裁
咱們所有節點每個都是平等的,不存在狀態問題,因為任何時候任何節點都時可以當選主;那麼Raft中需要多數人的投票就可以參考bully演算法,通過任期與自身ID判定,且任期永遠有效。就這麼直接,那麼步驟3的回覆也可以省略,訊息結構體也只需要一種,結構如下:
- Term 節點毛遂自薦時候的時間戳,當做任期
- Current 節點發送心跳時候的時間
- Id 節點ID
選舉流程:
- 初始化時,各個節點均為Follower狀態。
- 開始選主時,所有節點的Follower狀態轉為Candidate狀態,並向其他節點發送自身心跳。
- 其他節點收到心跳,對比自己心跳,Term比自身小,或者相同是ID比自身大,則節點降為Follower,不在傳送心跳。
- 當Candidate持續心跳有效期時間N內未收到其他節點的心跳,則晉升為Leader,週期傳送心跳。
新節點加入:
- 節點加入一個檢測週期後,收到有效心跳,則沉默為Follower,否則進入Candidate。
- 參考初始選舉步驟3,新加入任期一定更大,所以不會有波動。
Leader掛了:
- Follower節點經過多個檢測週期,直到有效期N失效後,各自進入Candidate,開始互發心跳。
- 參考初始選舉步驟3。
還有其他場景,咱們也理理:非Leader節點掛了不影響;Leader假死則迴歸那就是上任歸來,現任讓位;如果腦裂迴歸,那也是直接PK,誰老誰連任。這樣簡單的選擇,大部分場景都滿足了。
程式碼實現
RPC選擇
心跳帶有有效期,因此過期的沒意義,且Leader會一直髮,則丟了一兩個也行。總結下咱們採用UDP,邏輯更簡單,效率更高。第二個問題就是Socket Recv阻塞,有兩個簡單方法:丟個回撥函式,起個執行緒recv然後callback;第二個來個Queue,起個執行緒recv然後壓入。資源都是一個執行緒,不過Queue的話,框架只是資料讀取不處理,邏輯更清晰,所以咱們選擇後者。
import marshal
import socket
import threading
class PeerSocket(object):
def __init__(self, endpoint, queue):
self.endpoint = endpoint
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.queue = queue
self.start()
def listen(self):
self.sock.bind(self.endpoint)
while True:
content = self.sock.recv(256)
data = marshal.loads(content)
self.queue.put(data)
def send(self, data, addr):
content = marshal.dumps(data)
self.sock.sendto(content, addr)
def start(self):
task = threading.Thread(target=self.listen)
task.setDaemon(True)
task.start()
接著咱們設計需要共享的心跳以及狀態:
def reset(self):
self.heartbeat = {
"term": float('inf'),
"current_ts": float('-inf'),
'id': -255
}
self.status = Role.follower.value
基於流程,咱們合併所有操作,得出主邏輯如下:
def watch(self):
while True:
with self.lock:
if self.status == Role.leader.value:
self.send_followers()
elif self.status == Role.candidate.value:
self.recv_leader()
self.send_followers()
else:
self.recv_leader()
self.clean()
time.sleep(0.5)
咱們也可以換個寫法,概括一下就是(通常在不影響效能的情況下,咱們更喜歡直白的寫法,後期回看好看懂):
- 當角色不是Follower,則需要傳送心跳
- 當角色不是Leader,則需要接收心跳
- 每個檢測週期,都是檢測心跳是否過期,更新自身狀態
這樣花費2個執行緒,少量程式碼,老實人也能在苛刻環境下實現節點高可用了。原始碼不多,參考地址: github