go-etcd客戶端操作
阿新 • • 發佈:2022-03-15
golang-etcd客戶端操作
關於golang-etcd的所有api介紹和使用demo,可以參見 https://pkg.go.dev/go.etcd.io/etcd/client/v3#pkg-overview
1. 獲取客戶端連線
func main() { config := clientv3.Config{ Endpoints: []string{"123.57.33.149:2379"}, DialTimeout: 5 * time.Second, } // 獲取客戶端連線 _, err := clientv3.New(config) if err != nil { fmt.Println(err) return } }
clientv3.Config包含的常用配置資訊及其釋義如下:
Endpoints | []string | etcd叢集ip+port |
DialTimeout | time.Duration | 連線超時時間 |
DialKeepAliveTimeout | time.Duration | 客戶端keepalive超時時間 |
DialKeepAliveTime | time.Duration | 檢視是否正常服務 |
MaxCallSendMsgSize | int | 客戶端請求的最大位元組限制,預設2MiB |
MaxCallRecvMsgSize | int | 客戶端響應的最大位元組限制,預設math.MaxInt32 |
Username | string | 使用者名稱 |
Password | string | 密碼 |
2. PUT操作
// 用於寫etcd的鍵值對 kv := clientv3.NewKV(client) // PUT請求,clientv3.WithPrevKV()表示獲取上一個版本的kv putResp, err := kv.Put(context.TODO(), "/cron/jobs/job1", "hello",clientv3.WithPrevKV())if err != nil { fmt.Println(err) return } // 獲取版本號 fmt.Println("Revision:", putResp.Header.Revision) // 如果有上一個kv 返回kv的值 if putResp.PrevKv != nil { fmt.Println("PrevValue:", string(putResp.PrevKv.Value)) }
kv.Put()引數列表解釋:
- 第一個引數為context,可以自己設定可取消和自動過期的context
- 第二個引數為key
- 第三個引數為value
- 後面的引數可選,with開頭,支援多種功能,具體可以參考所有with的文件
3. GET操作
// 用於讀寫etcd的鍵值對 kv := clientv3.NewKV(client) // 簡單的get操作 getResp, err := kv.Get(context.TODO(), "cron/jobs/job1", clientv3.WithCountOnly()) if err != nil { fmt.Println(err) return } fmt.Println(getResp.Count)
4. DELETE操作
// 用於寫etcd的鍵值對 kv := clientv3.NewKV(client) // 讀取cron/jobs下的所有key getResp, err := kv.Get(context.TODO(), "/cron/jobs", clientv3.WithPrefix()) if err != nil { fmt.Println(err) return } // 獲取目錄下所有key-value fmt.Println(getResp.Kvs)
// 用於讀寫etcd的鍵值對 kv := clientv3.NewKV(client) // 刪除指定kv delResp, err := kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()) if err != nil { fmt.Println(err) return } // 被刪除之前的value是什麼 if len(delResp.PrevKvs) != 0 { for _, kvpair := range delResp.PrevKvs { fmt.Println("delete:", string(kvpair.Key), string(kvpair.Value)) } } // 刪除目錄下的所有key delResp, err = kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()) if err != nil { fmt.Println(err) return } // 刪除從這個key開始的後面的兩個key delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1",clientv3.WithFromKey(), clientv3.WithLimit(2)) if err != nil { fmt.Println(err) return }
5. watch操作
// 建立一個用於讀寫的kv kv := clientv3.NewKV(client) // 模擬etcd中kv的變化,每隔1s執行一次put-del操作 go func() { for { kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7") kv.Delete(context.TODO(), "/cron/jobs/job7") time.Sleep(time.Second * 1) } }() // 先get到當前的值,並監聽後續變化 getResp, err := kv.Get(context.TODO(), "/cron/jobs/job7") if err != nil { fmt.Println(err) return } // 現在key是存在的 if len(getResp.Kvs) != 0 { fmt.Println("當前值:", string(getResp.Kvs[0].Value)) } // 監聽的revision起點 watchStartRevision := getResp.Header.Revision + 1 // 建立一個watcher watcher := clientv3.NewWatcher(client) // 啟動監聽 fmt.Println("從這個版本開始監聽:", watchStartRevision) // 設定5s的watch時間 ctx, cancelFunc := context.WithCancel(context.TODO()) time.AfterFunc(5*time.Second, func() { cancelFunc() }) watchRespChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision)) // 得到kv的變化事件,從chan中取值 for watchResp := range watchRespChan { for _, event := range watchResp.Events { //.Events是一個切片 switch event.Type { case mvccpb.PUT: fmt.Println("修改為:", string(event.Kv.Value), "revision:", event.Kv.CreateRevision, event.Kv.ModRevision) case mvccpb.DELETE: fmt.Println("刪除了:", "revision:", event.Kv.ModRevision) } } }
watch操作的邏輯也很簡單,但是要注意確定從哪個版本開始監聽,全部監聽是沒有意義的
6. lease
// 用於申請租約 lease := clientv3.NewLease(client) // 申請一個10s的租約 leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s if err != nil { fmt.Println(err) return } // 拿到租約的id leaseID := leaseGrantResp.ID // 自動續租 keepRespChan, err := lease.KeepAlive(context.TODO(), leaseID) if err != nil { fmt.Println(err) return } // 處理續租應答的協程 go func() { select { case keepResp := <-keepRespChan: if keepRespChan == nil { fmt.Println("lease has expired") goto END } else { // 每秒會續租一次 fmt.Println("收到自動續租應答", keepResp.ID) } } END: }() // 用於讀寫etcd的鍵值對 kv := clientv3.NewKV(client) // put一個key-value,關聯租約,實現10s後過期 // 防止程式宕機 putResp, err := kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseID)) if err != nil { fmt.Println(err) return } fmt.Println("put success", putResp.Header.Revision) for { getResp, err := kv.Get(context.TODO(), "/cron/lock/job1") if err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("key-value is expired") return } else { fmt.Println(getResp.Kvs) time.Sleep(2 * time.Second) } }
7. operator
operator和普通的put、get等方法類似,有點像mysql裡面的prepare,又封裝了一層,在實現etcd分散式鎖的時候有一些幫助
kv := clientv3.NewKV(client) // 建立putop putOp := clientv3.OpPut("/cron/jobs/job7", "") // 執行op opResp, err := kv.Do(context.TODO(), putOp) if err != nil { fmt.Println(err) return } fmt.Println("寫入的revision:", opResp.Put().Header.Revision) // 建立getOp getOp := clientv3.OpGet("/cron/jobs/job7") // 執行op getResp, err := kv.Do(context.TODO(), getOp) if err != nil { fmt.Println(err) return } fmt.Println("revision:", getResp.Get().Kvs[0].ModRevision) fmt.Println("取到的值為:", getResp.Get().Kvs[0].Value)