1. 程式人生 > 其它 >go-etcd客戶端操作

go-etcd客戶端操作

golang-etcd客戶端操作

關於golang-etcd的所有api介紹和使用demo,可以參見 https://pkg.go.dev/go.etcd.io/etcd/client/v3#pkg-overview

 

1. 獲取客戶端連線

func main() {
    config := clientv3.Config{
        Endpoints:   []string{"123.57.33.149:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 獲取客戶端連線
    _, err := clientv3.New(config)
    if err !
= nil { fmt.Println(err) return } }

 

clientv3.Config包含的常用配置資訊及其釋義如下:

Endpoints []string etcd叢集ip+port
DialTimeout time.Duration 連線超時時間
DialKeepAliveTimeout time.Duration 客戶端keepalive超時時間
DialKeepAliveTime time.Duration 檢視是否正常服務
MaxCallSendMsgSize int 客戶端請求的最大位元組限制,預設2MiB
MaxCallRecvMsgSize int 客戶端響應的最大位元組限制,預設math.MaxInt32
Username string 使用者名稱
Password string 密碼

 

2. PUT操作

// 用於寫etcd的鍵值對
kv := clientv3.NewKV(client)

// PUT請求,clientv3.WithPrevKV()表示獲取上一個版本的kv
putResp, err := kv.Put(context.TODO(), "/cron/jobs/job1", "hello",clientv3.WithPrevKV())
if err != nil { fmt.Println(err) return } // 獲取版本號 fmt.Println("Revision:", putResp.Header.Revision) // 如果有上一個kv 返回kv的值 if putResp.PrevKv != nil { fmt.Println("PrevValue:", string(putResp.PrevKv.Value)) }

kv.Put()引數列表解釋:

  • 第一個引數為context,可以自己設定可取消和自動過期的context
  • 第二個引數為key
  • 第三個引數為value
  • 後面的引數可選,with開頭,支援多種功能,具體可以參考所有with的文件

 

3. GET操作

// 用於讀寫etcd的鍵值對
kv := clientv3.NewKV(client)

// 簡單的get操作
getResp, err := kv.Get(context.TODO(), "cron/jobs/job1", clientv3.WithCountOnly())
if err != nil {
    fmt.Println(err)
    return
}
fmt.Println(getResp.Count)

 

4. DELETE操作

// 用於寫etcd的鍵值對
kv := clientv3.NewKV(client)

// 讀取cron/jobs下的所有key
getResp, err := kv.Get(context.TODO(), "/cron/jobs", clientv3.WithPrefix())
if err != nil {
    fmt.Println(err)
    return
}

// 獲取目錄下所有key-value
fmt.Println(getResp.Kvs)

 

// 用於讀寫etcd的鍵值對
kv := clientv3.NewKV(client)

// 刪除指定kv
delResp, err := kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV())
if err != nil {
    fmt.Println(err)
    return
}

// 被刪除之前的value是什麼
if len(delResp.PrevKvs) != 0 {
    for _, kvpair := range delResp.PrevKvs {
        fmt.Println("delete:", string(kvpair.Key), string(kvpair.Value))
    }
}

// 刪除目錄下的所有key
delResp, err = kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
if err != nil {
    fmt.Println(err)
    return
}

// 刪除從這個key開始的後面的兩個key
delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1",clientv3.WithFromKey(), clientv3.WithLimit(2))
if err != nil {
    fmt.Println(err)
    return
}

 

5. watch操作

// 建立一個用於讀寫的kv
kv := clientv3.NewKV(client)

// 模擬etcd中kv的變化,每隔1s執行一次put-del操作
go func() {
    for {
        kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
        kv.Delete(context.TODO(), "/cron/jobs/job7")
        time.Sleep(time.Second * 1)
    }
}()

// 先get到當前的值,並監聽後續變化
getResp, err := kv.Get(context.TODO(), "/cron/jobs/job7")
if err != nil {
    fmt.Println(err)
    return
}

// 現在key是存在的
if len(getResp.Kvs) != 0 {
    fmt.Println("當前值:", string(getResp.Kvs[0].Value))
}

// 監聽的revision起點
watchStartRevision := getResp.Header.Revision + 1

// 建立一個watcher
watcher := clientv3.NewWatcher(client)

// 啟動監聽
fmt.Println("從這個版本開始監聽:", watchStartRevision)

// 設定5s的watch時間
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func() {
        cancelFunc()
})
watchRespChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

// 得到kv的變化事件,從chan中取值
for watchResp := range watchRespChan {
    for _, event := range watchResp.Events { //.Events是一個切片
        switch event.Type {
        case mvccpb.PUT:
            fmt.Println("修改為:", string(event.Kv.Value),
                    "revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
        case mvccpb.DELETE:
            fmt.Println("刪除了:", "revision:", event.Kv.ModRevision)
        }
    }
}

watch操作的邏輯也很簡單,但是要注意確定從哪個版本開始監聽,全部監聽是沒有意義的

 

6. lease

// 用於申請租約
lease := clientv3.NewLease(client)

// 申請一個10s的租約
leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
if err != nil {
    fmt.Println(err)
    return
}

// 拿到租約的id
leaseID := leaseGrantResp.ID

// 自動續租
keepRespChan, err := lease.KeepAlive(context.TODO(), leaseID)
if err != nil {
    fmt.Println(err)
    return
}

// 處理續租應答的協程
go func() {
    select {
    case keepResp := <-keepRespChan:
        if keepRespChan == nil {
            fmt.Println("lease has expired")
            goto END
        } else {
            // 每秒會續租一次
            fmt.Println("收到自動續租應答", keepResp.ID)
        }
    }
END:
}()

// 用於讀寫etcd的鍵值對
kv := clientv3.NewKV(client)

// put一個key-value,關聯租約,實現10s後過期
// 防止程式宕機
putResp, err := kv.Put(context.TODO(), "/cron/lock/job1", "",
        clientv3.WithLease(leaseID))
if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("put success", putResp.Header.Revision)

for {
    getResp, err := kv.Get(context.TODO(), "/cron/lock/job1")
    if err != nil {
        fmt.Println(err)
        return
    }
    if getResp.Count == 0 {
        fmt.Println("key-value is expired")
        return
    } else {
        fmt.Println(getResp.Kvs)
        time.Sleep(2 * time.Second)
    }
}

 

7. operator

operator和普通的put、get等方法類似,有點像mysql裡面的prepare,又封裝了一層,在實現etcd分散式鎖的時候有一些幫助

kv := clientv3.NewKV(client)

// 建立putop
putOp := clientv3.OpPut("/cron/jobs/job7", "")

// 執行op
opResp, err := kv.Do(context.TODO(), putOp)
if err != nil {
    fmt.Println(err)
    return
}

fmt.Println("寫入的revision:", opResp.Put().Header.Revision)

// 建立getOp
getOp := clientv3.OpGet("/cron/jobs/job7")

// 執行op
getResp, err := kv.Do(context.TODO(), getOp)
if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("revision:", getResp.Get().Kvs[0].ModRevision)
fmt.Println("取到的值為:", getResp.Get().Kvs[0].Value)