1. 程式人生 > 實用技巧 >JAVA 各種工具方法大全

JAVA 各種工具方法大全

技術標籤:分散式

推薦閱讀:

通用的CP系統有etcd和consul, 通用的對立面就是專用系統. 所以在某些場合是有這種需求的.

然而etcd embed的可用性極差, Windows上面跑會出現各種問題, 而且不能定製協議, 你必須得用etcd定義好的協議和客戶端來和etcd叢集通訊. 所以這時候的選擇:

1. 忍著

2. 自己實現一個raft演算法庫, 在這上面做應用

有一定的可能性, 起碼MIT 6.824可以做出來, 但是和工業應用還是有很大的差距

3. 找一個工業級raft庫, 然後在這上面做應用

這時候到Raft Consensus Algorithm上面看看就能找到幾個可選的Raft演算法庫, 例如braft,hashicorp/raft,lni/dragonboat.

但是呢, C++程式碼比較難寫的, 所以就pass掉了braft. 就剩下consul raft和dragonboat.

本文就用consul raft做一個簡單的KeyValue服務.

首先前端用的gin, 提供put/get/inc/delete幾個介面, 三個介面都走raft狀態機, 因為要支援多節點, 所以內部非leader節點就需要把請求轉發給leader節點.

前端的程式碼類似於這樣:

func (this *ApiService) Start() error {
        //轉發請求給leader節點
    this.router.Use(this.proxyHandler())
 
    this.router.POST("/get", this.Get)
    this.router.POST("/put", this.Put)
    this.router.POST("/delete", this.Delete)
    this.router.POST("/inc", this.Inc)
 
    address := fmt.Sprintf(":%d", this.port)
    return this.router.Run(address)
}

請求都很簡單, 就是直接把命令, 或者叫服務提供的原語塞到Raft狀態機裡面等候Raft狀態Apply, 然後才能拿到結果(future/promise模式), 例如put命令:

func (this *ApiService) Put(ctx *gin.Context) {
    req := &Request{}
    if err := ctx.ShouldBindJSON(req); err != nil {
        ctx.JSON(http.StatusBadRequest, Response{
            Error: err.Error(),
        })
        return
    }
    result, err := this.raft.ApplyCommand(raft.CommandPut, req.Key, req.Value)
    if err != nil {
        ctx.JSON(http.StatusInternalServerError, Response{
            Error: err.Error(),
        })
        return
    }
    ctx.JSON(http.StatusOK, Response{
        Value: result.Value,
    })
}

前端還有一個轉發請求到leader節點的攔截器(? 應該叫這個名字, 實際上是pipeline模式的一種)

func (this *ApiService) proxyHandler() gin.HandlerFunc {
    return func(context *gin.Context) {
        if this.raft.IsLeader() {
            context.Next()
        } else {
            leaderServiceAddress := this.raft.GetLeaderServiceAddress()
            if this.leaderServiceAddress != leaderServiceAddress {
                Director := func(req *http.Request) {
                    req.URL.Scheme = "http"
                    req.URL.Host = leaderServiceAddress
                }
                this.leaderProxy = &httputil.ReverseProxy{
                    Director: Director,
                }
                this.leaderServiceAddress = leaderServiceAddress
            }
            this.leaderProxy.ServeHTTP(context.Writer, context.Request)
            context.Abort()
        }
    }
}

下面是對協議的處理:

func (this *FSM) Apply(log *raft.Log) interface{} {
    result := &FSMApplyResult{
        Success: false,
    }
    t, cmd, err := raftLogToCommand(log)
    if err != nil {
        result.Error = err
        return result
    }
    binary.LittleEndian.PutUint64(keyCache, uint64(cmd.Key))
    binary.LittleEndian.PutUint64(valueCache, uint64(cmd.Value))
    switch t {
    case CommandPut:
        result.Success, result.Error = this.add(keyCache, valueCache)
    case CommandDelete:
        result.Success, result.Error = this.delete(keyCache)
    case CommandGet:
        result.Value, result.Error = this.get(keyCache)
    case CommandInc:
        result.Value, result.Error = this.inc(keyCache, cmd.Value)
    }
    return result
}

輸入給Raft狀態的命令實際上都是序列化好的, Raft狀態機會自己把命令儲存到Storage裡面(可以是記憶體, 也可以是磁碟/DB等). 所以Apply命令的時候, 先對raft log進行解碼, 然後switch去處理.

這邊再看看例如inc的處理:

func (this *FSM) inc(key []byte, add int64) (int64, error) {
    var value int64 = 0
    err := this.db.Update(func(tx *bbolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists(BBoltBucket)
        if err != nil {
            return err
        }
        valueBytes := b.Get(key)
        if len(valueBytes) != 8 {
            logging.Errorf("FSM.inc, key:%d, value length:%d, Reset",
                int64(binary.LittleEndian.Uint64(key)), len(valueBytes))
            valueBytes = make([]byte, 8)
        }
        value = int64(binary.LittleEndian.Uint64(valueBytes))
        value += add
        binary.LittleEndian.PutUint64(valueBytes, uint64(value))
        err = b.Put(key, valueBytes)
        return err
    })
    if err != nil {
        return -1, err
    }
    return value, err
}

這個指令稍微複雜一點, 需要先到db裡面去找, 找到的話, 再加一個N, 然後儲存, 然後返回新的值. 因為raft狀態機apply log的時候, 是順序的, 所以不需要加鎖啥的, inc本身就是原子的.

至此一個簡單的分散式KeyValue服務就實現, 而且還是一個CP系統.

當然這只是一個demo, 實際的應用遠遠比這個複雜, 本文只是提供一種思路.

不必非要把自己綁死在Etcd上, 條條大路通羅馬. 如果你的系統只需要提供有限的操作原理, 那麼是可以考慮Consul Raft或者DragonBoat來製作自定義協議的CP服務. 螞蟻的SOFARaft也可以幹這種事.