JAVA 各種工具方法大全
技術標籤:分散式
推薦閱讀:
通用的CP系統有etcd和consul, 通用的對立面就是專用系統. 所以在某些場合是有這種需求的.
然而etcd embed的可用性極差, Windows上面跑會出現各種問題, 而且不能定製協議, 你必須得用etcd定義好的協議和客戶端來和etcd叢集通訊. 所以這時候的選擇:
1. 忍著
2. 自己實現一個raft演算法庫, 在這上面做應用
有一定的可能性, 起碼MIT 6.824可以做出來, 但是和工業應用還是有很大的差距
3. 找一個工業級raft庫, 然後在這上面做應用
這時候到Raft Consensus Algorithm上面看看就能找到幾個可選的Raft演算法庫, 例如braft,hashicorp/raft,lni/dragonboat.
但是呢, C++程式碼比較難寫的, 所以就pass掉了braft. 就剩下consul raft和dragonboat.
本文就用consul raft做一個簡單的KeyValue服務.
首先前端用的gin, 提供put/get/inc/delete幾個介面, 三個介面都走raft狀態機, 因為要支援多節點, 所以內部非leader節點就需要把請求轉發給leader節點.
前端的程式碼類似於這樣:
func (this *ApiService) Start() error { //轉發請求給leader節點 this.router.Use(this.proxyHandler()) this.router.POST("/get", this.Get) this.router.POST("/put", this.Put) this.router.POST("/delete", this.Delete) this.router.POST("/inc", this.Inc) address := fmt.Sprintf(":%d", this.port) return this.router.Run(address) }
請求都很簡單, 就是直接把命令, 或者叫服務提供的原語塞到Raft狀態機裡面等候Raft狀態Apply, 然後才能拿到結果(future/promise模式), 例如put命令:
func (this *ApiService) Put(ctx *gin.Context) {
req := &Request{}
if err := ctx.ShouldBindJSON(req); err != nil {
ctx.JSON(http.StatusBadRequest, Response{
Error: err.Error(),
})
return
}
result, err := this.raft.ApplyCommand(raft.CommandPut, req.Key, req.Value)
if err != nil {
ctx.JSON(http.StatusInternalServerError, Response{
Error: err.Error(),
})
return
}
ctx.JSON(http.StatusOK, Response{
Value: result.Value,
})
}
前端還有一個轉發請求到leader節點的攔截器(? 應該叫這個名字, 實際上是pipeline模式的一種)
func (this *ApiService) proxyHandler() gin.HandlerFunc {
return func(context *gin.Context) {
if this.raft.IsLeader() {
context.Next()
} else {
leaderServiceAddress := this.raft.GetLeaderServiceAddress()
if this.leaderServiceAddress != leaderServiceAddress {
Director := func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = leaderServiceAddress
}
this.leaderProxy = &httputil.ReverseProxy{
Director: Director,
}
this.leaderServiceAddress = leaderServiceAddress
}
this.leaderProxy.ServeHTTP(context.Writer, context.Request)
context.Abort()
}
}
}
下面是對協議的處理:
func (this *FSM) Apply(log *raft.Log) interface{} {
result := &FSMApplyResult{
Success: false,
}
t, cmd, err := raftLogToCommand(log)
if err != nil {
result.Error = err
return result
}
binary.LittleEndian.PutUint64(keyCache, uint64(cmd.Key))
binary.LittleEndian.PutUint64(valueCache, uint64(cmd.Value))
switch t {
case CommandPut:
result.Success, result.Error = this.add(keyCache, valueCache)
case CommandDelete:
result.Success, result.Error = this.delete(keyCache)
case CommandGet:
result.Value, result.Error = this.get(keyCache)
case CommandInc:
result.Value, result.Error = this.inc(keyCache, cmd.Value)
}
return result
}
輸入給Raft狀態的命令實際上都是序列化好的, Raft狀態機會自己把命令儲存到Storage裡面(可以是記憶體, 也可以是磁碟/DB等). 所以Apply命令的時候, 先對raft log進行解碼, 然後switch去處理.
這邊再看看例如inc的處理:
func (this *FSM) inc(key []byte, add int64) (int64, error) {
var value int64 = 0
err := this.db.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(BBoltBucket)
if err != nil {
return err
}
valueBytes := b.Get(key)
if len(valueBytes) != 8 {
logging.Errorf("FSM.inc, key:%d, value length:%d, Reset",
int64(binary.LittleEndian.Uint64(key)), len(valueBytes))
valueBytes = make([]byte, 8)
}
value = int64(binary.LittleEndian.Uint64(valueBytes))
value += add
binary.LittleEndian.PutUint64(valueBytes, uint64(value))
err = b.Put(key, valueBytes)
return err
})
if err != nil {
return -1, err
}
return value, err
}
這個指令稍微複雜一點, 需要先到db裡面去找, 找到的話, 再加一個N, 然後儲存, 然後返回新的值. 因為raft狀態機apply log的時候, 是順序的, 所以不需要加鎖啥的, inc本身就是原子的.
至此一個簡單的分散式KeyValue服務就實現, 而且還是一個CP系統.
當然這只是一個demo, 實際的應用遠遠比這個複雜, 本文只是提供一種思路.
不必非要把自己綁死在Etcd上, 條條大路通羅馬. 如果你的系統只需要提供有限的操作原理, 那麼是可以考慮Consul Raft或者DragonBoat來製作自定義協議的CP服務. 螞蟻的SOFARaft也可以幹這種事.