用 Golang 快速實現 Paxos 分散式共識演算法
前文《理解 Paxos》只包含虛擬碼,幫助了理解但又不夠爽,既然現在都講究 Talk is cheap. Show me the code.這次就把文章中的虛擬碼用 Go 語言實現出來,希望能幫助各位朋友更直觀的感受 Paxos 論文中的細節。
但我們需要對演算法做一些簡化,有多簡單呢?我們不持久化儲存任何變數,並且用 chan直接代替 RPC 呼叫。
記得切換到 naive 分支。
定義相關結構體
我們定義 Proposer 如下:
type proposer struct {
// server id
id int
// the largest round number the server has seen
round int
// proposal number=(round number, serverID)
number int
// proposal value
value string
acceptors map[int]bool
net network
}
這些結構體成員都很容易理解,其中 acceptors我們主要用來儲存 Acceptors 的地址,以及記錄我們收到 Acceptor 的成功/失敗響應。
Acceptor 的結構體:
type acceptor struct {
// server id
id int
// the number of the proposal this server will accept, or 0 if it has never received a Prepare request
promiseNumber int
// the number of the last proposal the server has accepted, or 0 if it never accepted any.
acceptedNumber int
// the value from the most recent proposal the server has accepted, or if it has never accepted a proposal
acceptedValue string
learners int
net network
}
主要成員解釋都有註釋,簡單來說我們需要記錄三個資訊:
promiseNumber:承諾的提案編號
acceptedNumber:接受的提案編號
acceptedValue:接受的提案值
定義訊息結構體
訊息結構體定義了 Proposer 和 Acceptor 之間、Acceptor 和 Leaner 之間的通訊協議。最主要的還是 Paxos 的兩階段的四個訊息。
Phase 1 請求:提案編號
Phase 1 響應:如果有被 Accepted 的提案,返回提案編號和提案值
Phase 2 請求:提案編號和提案值
Phase 2 響應:Accepted 的提案編號和提案值
這樣看,我們的訊息結構體只需要提案編號和提案值,加上一個訊息型別,用來區分是哪個階段的訊息。訊息結構體定義在 message.go 檔案,具體如下:
// MsgType represents the type of a paxos phase.
type MsgType uint8
const (
Prepare MsgType=iota
Promise
Propose
Accept
)
type message struct {
tp MsgType
from int
to int
number int // proposal number
value string // proposal value
}
實現網路
網路上可以做的選擇和優化很多,但這裡為了保持簡單的原則,我們將網路定義成 interface。後面完全可以改成 RPC 或 API 等其它通訊方式來實現(沒錯,我已經實現了一個 Go RPC 的版本了)。
type network interface {
send(m message)
recv(timeout time.Duration) (message, bool)
}
接下里我們去實現 network 介面:
type Network struct {
queue map[int]chan message
}
func newNetwork(nodes ...int) *Network {
pn :=&Network{
queue: make(map[int]chan message, 0),
}
for _, a :=range nodes {
pn.queue[a]=make(chan message, 1024)
}
return pn
}
func (net *Network) send(m message) {
log.Printf("net: send %+v", m)
net.queue[m.to]
}
func (net *Network) recvFrom(from int, timeout time.Duration) (message, bool) {
select {
case m :=<-net.queue[from]:
log.Printf("net: recv %+v", m)
return m, true
case <-time.After(timeout):
return message{}, false
}
}
就是用 queue來記錄每個節點的chan,key 則是節點的 server id。
傳送訊息則將 Message傳送到目標節點的chan中,接受訊息直接從chan中讀取資料,並等待對應的超時時間。
不需要做其它網路地址、包相關的東西,所以非常簡單。具體在 network.go檔案。
實現單元測試
這個專案主要使用 go 單元測試來檢驗正確性,我們主要測試兩種場景:
TestSingleProposer(單個 Proposer)
TestTwoProposers(多個 Proposer)
測試程式碼通過執行 Paxos 後檢查 Chosen 返回的提案值是否符合預期。
實現演算法流程
按照角色將檔案分為 proposer.go, acceptor.go 和 learner.go,每個檔案都有一個 run函式來執行程式,run函式執行條件判斷,並在對應的階段執行對應的函式。
按照虛擬碼描述,我們很容易實現 Phase 1 和 Phase 2,把每個階段的請求響應都作為一個函式,我們一步步來看。
第一輪 Prepare RPCs 請求階段:
// Phase 1. (a) A proposer selects a proposal number n
// and sends a prepare request with number n to
// a majority of acceptors.
func (p *proposer) prepare message {
p.round++
p.number=pposalNumber
msg :=make(message, p.majority)
i :=0
for to :=range p.acceptors {
msg[i]=message{
tp: Prepare,
from: p.id,
to: to,
number: p.number,
}
i++
if i==p.majority {
break
}
}
return msg
}
// proposal number=(round number, serverID)
func (p *proposer) proposalNumber int {
return p.round<< 16 | p.id
}
Prepare 請求階段我們將 round+1 然後傳送給多數派 Acceptors。
注:這裡很多部落格和教程都會將 Prepare RPC 發給所有的Acceptors,6.824 的 paxos 實驗就將 RPC 傳送給所有 Acceptors。這裡保持和論文一致,只發送給 a majority of acceptors。
第一輪 Prepare RPCs 響應階段:
接下來在 acceptor.go檔案中處理請求:
func (a *acceptor) handlePrepare(args message) (message, bool) {
if amiseNumber >=args.number {
return message{}, false
}
amiseNumber=args.number
msg :=message{
tp: Promise,
from: a.id,
to: args.from,
number: a.acceptedNumber,
value: a.acceptedValue,
}
return msg, true
}
如果 args.number大於acceptormiseNumber,則承諾將不會接收編號小於args.number的提案(即amiseNumber=args.number)。如果之前有提案被 Accepted 的話,響應還應包含 a.acceptedNumber 和 a.acceptedValue。
否則忽略,返回 false。
第二輪 Accept RPCs 請求階段:
func (p *proposer) accept message {
msg :=make(message, p.majority)
i :=0
for to, ok :=range p.acceptors {
if ok {
msg[i]=message{
tp: Propose,
from: p.id,
to: to,
number: p.number,
value: p.value,
}
i++
}
if i==p.majority {
break
}
}
return msg
}
當 Proposer 收到超過半數 Acceptor 的響應後,Proposer 向多數派的 Acceptor 發起請求並帶上提案編號和提案值。
第二輪 Accept RPCs 響應階段:
func (a *acceptor) handleAccept(args message) bool {
number :=args.number
if number >=amiseNumber {
a.acceptedNumber=number
a.acceptedValue=args.value
amiseNumber=number
return true
}
return false
}
Acceptor 收到 Accept請求,在這期間如果 Acceptor 沒有對比 amiseNumber 更大的編號另行 Promise,則接受該提案。
別忘了:Learning a Chosen Value
在 Paxos 中有一個十分容易混淆的概念:Chosen Value 和 Accepted Value,但如果你看過論文,其實已經說得非常直接了。論文的 2.3 節 Learning a Chosen Value 開頭就說:
To learn that a value has been chosen, a learner must find out that a proposal has been accepted by a majority of acceptors.
所以 Acceptor 接受提案後,會將接受的提案廣播 Leaners,一旦 Leaners 收到超過半數的 Acceptors 的 Accepted 提案,我們就知道這個提案被 Chosen 了。
func (l *learner) chosen (message, bool) {
acceptCounts :=make(map[int]int)
acceptMsg :=make(map[int]message)
for _, accepted :=range l.acceptors {
if accepted.number !=0 {
acceptCounts[accepted.number]++
acceptMsg[accepted.number]=accepted
}
}
for n, count :=range acceptCounts {
if count >=l.majority {
return acceptMsg[n], true
}
}
return message{}, false
}
執行和測試
程式碼拉下來後,直接執行:
go test
寫在後面為什麼不用 mit 6.824 的課程程式碼?
之前我曾把 mit 6.824 的 Raft 答案推到自己的 Github,直到 2021 開課的時候 mit 的助教發郵件讓我將我的程式碼轉為 private,因為這樣會導致學習課程的人直接搜到程式碼,而無法保證作業獨立完成。
確實,實驗是計算機最不可或缺的環節,用 mit 6.824 2021 的 paxos 程式碼會導致很多學習者不去自己解決困難,直接上網搜程式碼,從而導致學習效果不好,違背了 mit 的初衷。
當然,你也可以說現在網上以及很容易搜到 6.824 的各種程式碼了,但出於之前 mit 助教的郵件,我不會將作業程式碼直接發出來。
未來計劃
實現一個完整的(包含網路和儲存的) Paxos
基於 Paxos 實現一個 Paxos KV 儲存
實現其它 Paxos 變種
歡迎各位朋友催更……
結語
本文程式碼在 Github 上,如本文有什麼遺漏或者不對之處,或者各位朋友有什麼新的想法,歡迎提 issue 討論。
技術原創及架構實踐文章,歡迎通過公眾號選單「聯絡我們」進行投稿。
高可用架構
改變網際網路的構建方式