1. 程式人生 > >etcd實現服務發現

etcd實現服務發現

### 前言 [etcd環境安裝與使用](https://bingjian-zhu.github.io/2020/05/09/etcd%E7%8E%AF%E5%A2%83%E5%AE%89%E8%A3%85%E4%B8%8E%E4%BD%BF%E7%94%A8/)文章中介紹了etcd的安裝及`v3 API`使用,本篇將介紹如何使用etcd實現服務發現功能。 ### 服務發現介紹 服務發現要解決的也是分散式系統中最常見的問題之一,即在同一個分散式叢集中的程序或服務,要如何才能找到對方並建立連線。本質上來說,服務發現就是想要了解叢集中是否有程序在監聽 udp 或 tcp 埠,並且通過名字就可以查詢和連線。 ![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200514171049345-955603950.png) 服務發現需要實現一下基本功能: * `服務註冊`:同一service的所有節點註冊到相同目錄下,節點啟動後將自己的資訊註冊到所屬服務的目錄中。 * `健康檢查`:服務節點定時進行健康檢查。註冊到服務目錄中的資訊設定一個較短的TTL,執行正常的服務節點每隔一段時間會去更新資訊的TTL ,從而達到健康檢查效果。 * `服務發現`:通過服務節點能查詢到服務提供外部訪問的 IP 和埠號。比如閘道器代理服務時能夠及時的發現服務中新增節點、丟棄不可用的服務節點。 接下來介紹如何使用etcd實現服務發現。 ### 服務註冊及健康檢查 根據etcd的`v3 API`,當啟動一個服務時候,我們把服務的地址寫進etcd,註冊服務。同時繫結租約(lease),並以續租約(keep leases alive)的方式檢測服務是否正常執行,從而實現健康檢查。 go程式碼實現: ```go package main import ( "context" "log" "time" "go.etcd.io/etcd/clientv3" ) //ServiceRegister 建立租約註冊服務 type ServiceRegister struct { cli *clientv3.Client //etcd client leaseID clientv3.LeaseID //租約ID //租約keepalieve相應chan keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse key string //key val string //value } //NewServiceRegister 新建註冊服務 func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } ser := &ServiceRegister{ cli: cli, key: key, val: val, } //申請租約設定時間keepalive if err := ser.putKeyWithLease(lease); err != nil { return nil, err } return ser, nil } //設定租約 func (s *ServiceRegister) putKeyWithLease(lease int64) error { //設定租約時間 resp, err := s.cli.Grant(context.Background(), lease) if err != nil { return err } //註冊服務並繫結租約 _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) if err != nil { return err } //設定續租 定期傳送需求請求 leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) if err != nil { return err } s.leaseID = resp.ID log.Println(s.leaseID) s.keepAliveChan = leaseRespChan log.Printf("Put key:%s val:%s success!", s.key, s.val) return nil } //ListenLeaseRespChan 監聽 續租情況 func (s *ServiceRegister) ListenLeaseRespChan() { for leaseKeepResp := range s.keepAliveChan { log.Println("續約成功", leaseKeepResp) } log.Println("關閉續租") } // Close 登出服務 func (s *ServiceRegister) Close() error { //撤銷租約 if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { return err } log.Println("撤銷租約") return s.cli.Close() } func main() { var endpoints = []string{"localhost:2379"} ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5) if err != nil { log.Fatalln(err) } //監聽續租相應chan go ser.ListenLeaseRespChan() select { // case <-time.After(20 * time.Second): // ser.Close() } } ``` 主動退出服務時,可以呼叫Close()方法,撤銷租約,從而登出服務。 ### 服務發現 根據etcd的`v3 API`,很容易想到使用`Watch`監視某類服務,通過`Watch`感知服務的`新增`,`修改`或`刪除`操作,修改服務列表。 ```go package main import ( "context" "log" "sync" "time" "github.com/coreos/etcd/mvcc/mvccpb" "go.etcd.io/etcd/clientv3" ) //ServiceDiscovery 服務發現 type ServiceDiscovery struct { cli *clientv3.Client //etcd client serverList map[string]string //服務列表 lock sync.Mutex } //NewServiceDiscovery 新建發現服務 func NewServiceDiscovery(endpoints []string) *ServiceDiscovery { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } return &ServiceDiscovery{ cli: cli, serverList: make(map[string]string), } } //WatchService 初始化服務列表和監視 func (s *ServiceDiscovery) WatchService(prefix string) error { //根據字首獲取現有的key resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return err } for _, ev := range resp.Kvs { s.SetServiceList(string(ev.Key), string(ev.Value)) } //監視字首,修改變更的server go s.watcher(prefix) return nil } //watcher 監聽字首 func (s *ServiceDiscovery) watcher(prefix string) { rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix()) log.Printf("watching prefix:%s now...", prefix) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: //修改或者新增 s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value)) case mvccpb.DELETE: //刪除 s.DelServiceList(string(ev.Kv.Key)) } } } } //SetServiceList 新增服務地址 func (s *ServiceDiscovery) SetServiceList(key, val string) { s.lock.Lock() defer s.lock.Unlock() s.serverList[key] = string(val) log.Println("put key :", key, "val:", val) } //DelServiceList 刪除服務地址 func (s *ServiceDiscovery) DelServiceList(key string) { s.lock.Lock() defer s.lock.Unlock() delete(s.serverList, key) log.Println("del key:", key) } //GetServices 獲取服務地址 func (s *ServiceDiscovery) GetServices() []string { s.lock.Lock() defer s.lock.Unlock() addrs := make([]string, 0) for _, v := range s.serverList { addrs = append(addrs, v) } return addrs } //Close 關閉服務 func (s *ServiceDiscovery) Close() error { return s.cli.Close() } func main() { var endpoints = []string{"localhost:2379"} ser := NewServiceDiscovery(endpoints) defer ser.Close() ser.WatchService("/web/") ser.WatchService("/gRPC/") for { select { case <-time.Tick(10 * time.Second): log.Println(ser.GetServices()) } } } ``` 執行: ``` #執行服務發現 $go run discovery.go watching prefix:/web/ now... put key : /web/node1 val:localhost:8000 [localhost:8000] #另一個終端執行服務註冊 $go run register.go Put key:/web/node1 val:localhost:8000 success! 續約成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7 續約成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7 ... ``` ### 總結 基於 Raft 演算法的 etcd 天生是一個強一致性高可用的服務儲存目錄,使用者可以在 etcd 中註冊服務,並且對註冊的服務設定key TTL,定時保持服務的心跳以達到監控健康狀態的效果。通過在 etcd 指定的主題下注冊的服務也能在對應的主題下查詢到。 為了確保連線,我們可以在每個服務機器上都部署一個 Proxy 模式的 etcd,這樣就可以確保能訪問 etcd 叢集的服務都能互相連線。 參考: * https://segmentfault.com/a/1190000020944777 * https://blog.csdn.net/blogsun/article/details/102861648 * https://www.infoq.cn/article/etcd-interpretation-application-scenario-implement-pri