gRPC負載均衡(客戶端負載均衡)
阿新 • • 發佈:2020-05-18
### 前言
[上篇](https://bingjian-zhu.github.io/2020/05/14/etcd%E5%AE%9E%E7%8E%B0%E6%9C%8D%E5%8A%A1%E5%8F%91%E7%8E%B0/)介紹瞭如何使用`etcd`實現服務發現,本篇將基於etcd的服務發現前提下,介紹如何實現gRPC客戶端負載均衡。
### gRPC負載均衡
gRPC官方文件提供了關於gRPC負載均衡方案[Load Balancing in gRPC](https://github.com/grpc/grpc/blob/master/doc/load-balancing.md),此方案是為gRPC設計的,下面我們對此進行分析。
#### 1、對每次呼叫進行負載均衡
gRPC中的負載平衡是以每次呼叫為基礎,而不是以每個連線為基礎。換句話說,即使所有的請求都來自一個客戶端,我們仍希望它們在所有的伺服器上實現負載平衡。
#### 2、負載均衡的方法
* `集中式`(Proxy Model)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518153536494-684598725.png)
在服務消費者和服務提供者之間有一個獨立的負載均衡(LB),通常是專門的硬體裝置如 F5,或者基於軟體如 LVS,HAproxy等實現。LB上有所有服務的地址對映表,通常由運維配置註冊,當服務消費方呼叫某個目標服務時,它向LB發起請求,由LB以某種策略,比如輪詢(Round-Robin)做負載均衡後將請求轉發到目標服務。LB一般具備健康檢查能力,能自動摘除不健康的服務例項。
該方案主要問題:服務消費方、提供方之間增加了一級,有一定效能開銷,請求量大時,效率較低。
> 可能有讀者會認為集中式負載均衡存在這樣的問題,一旦負載均衡服務掛掉,那整個系統將不能使用。
> 解決方案:可以對負載均衡服務進行DNS負載均衡,通過對一個域名設定多個IP地址,每次DNS解析時輪詢返回負載均衡服務地址,從而實現簡單的DNS負載均衡。
* `客戶端負載`(Balancing-aware Client)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518155900462-1370526164.png)
針對第一個方案的不足,此方案將LB的功能整合到服務消費方程序裡,也被稱為軟負載或者客戶端負載方案。服務提供方啟動時,首先將服務地址註冊到服務登錄檔,同時定期報心跳到服務登錄檔以表明服務的存活狀態,相當於健康檢查,服務消費方要訪問某個服務時,它通過內建的LB元件向服務登錄檔查詢,同時快取並定期重新整理目標服務地址列表,然後以某種負載均衡策略選擇一個目標服務地址,最後向目標服務發起請求。LB和服務發現能力被分散到每一個服務消費者的程序內部,同時服務消費方和服務提供方之間是直接呼叫,沒有額外開銷,效能比較好。
該方案主要問題:要用多種語言、多個版本的客戶端編寫和維護負載均衡策略,使客戶端的程式碼大大複雜化。
* `獨立LB服務`(External Load Balancing Service)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518170636421-1833253282.png)
該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。
不同之處是將LB和服務發現功能從程序內移出來,變成主機上的一個獨立程序。主機上的一個或者多個服務要訪問目標服務時,他們都通過同一主機上的獨立LB程序做服務發現和負載均衡。該方案也是一種分散式方案沒有單點問題,服務呼叫方和LB之間是程序內呼叫效能好,同時該方案還簡化了服務呼叫方,不需要為不同語言開發客戶庫。
本篇將介紹第二種負載均衡方法,客戶端負載均衡。
### 實現gRPC客戶端負載均衡
gRPC已提供了簡單的負載均衡策略(如:Round Robin),我們只需實現它提供的`Builder`和`Resolver`介面,就能完成gRPC客戶端負載均衡。
```go
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
Scheme() string
}
```
`Builder`介面:建立一個`resolver`(本文稱之服務發現),用於監視名稱解析更新。
`Build`方法:為給定目標建立一個新的`resolver`,當呼叫`grpc.Dial()`時執行。
`Scheme`方法:返回此`resolver`支援的方案,`Scheme`定義可參考:https://github.com/grpc/grpc/blob/master/doc/naming.md
```go
type Resolver interface {
ResolveNow(ResolveNowOption)
Close()
}
```
`Resolver`介面:監視指定目標的更新,包括地址更新和服務配置更新。
`ResolveNow`方法:被 gRPC 呼叫,以嘗試再次解析目標名稱。只用於提示,可忽略該方法。
`Close`方法:關閉`resolver`
根據以上兩個介面,我們把服務發現的功能寫在`Build`方法中,把獲取到的負載均衡服務地址返回到客戶端,並監視服務更新情況,以修改客戶端連線。
修改服務發現程式碼,`discovery.go`
```go
package etcdv3
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/resolver"
)
const schema = "grpclb"
//ServiceDiscovery 服務發現
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
cc resolver.ClientConn
serverList map[string]resolver.Address //服務列表
lock sync.Mutex
}
//NewServiceDiscovery 新建發現服務
func NewServiceDiscovery(endpoints []string) resolver.Builder {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
}
}
//Build 為給定目標建立一個新的`resolver`,當呼叫`grpc.Dial()`時執行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
log.Println("Build")
s.cc = cc
s.serverList = make(map[string]resolver.Address)
prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
//根據字首獲取現有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
s.cc.NewAddress(s.getServices())
//監視字首,修改變更的server
go s.watcher(prefix)
return s, nil
}
// ResolveNow 監視目標更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
log.Println("ResolveNow")
}
//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
return schema
}
//Close 關閉
func (s *ServiceDiscovery) Close() {
log.Println("Close")
s.cli.Close()
}
//watcher 監聽字首
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //新增或修改
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //刪除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服務地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = resolver.Address{Addr: val}
s.cc.NewAddress(s.getServices())
log.Println("put key :", key, "val:", val)
}
//DelServiceList 刪除服務地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
s.cc.NewAddress(s.getServices())
log.Println("del key:", key)
}
//GetServices 獲取服務地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
addrs := make([]resolver.Address, 0, len(s.serverList))
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
```
程式碼主要修改以下地方:
1. 把獲取的服務地址轉成`resolver.Address`,供gRPC客戶端連線。
2. 根據`schema`的定義規則,修改`key`格式。
服務註冊主要修改`key`儲存格式,`register.go`
```go
package etcdv3
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
//ServiceRegister 建立租約註冊服務
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //租約ID
//租約keepalieve相應chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建註冊服務
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: "/" + schema + "/" + serName + "/" + addr,
val: addr,
}
//申請租約設定時間keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//設定租約
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//設定租約時間
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//註冊服務並繫結租約
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//設定續租 定期傳送需求請求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 監聽 續租情況
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("續約成功", leaseKeepResp)
}
log.Println("關閉續租")
}
// Close 登出服務
func (s *ServiceRegister) Close() error {
//撤銷租約
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤銷租約")
return s.cli.Close()
}
```
客戶端修改gRPC連線服務的部分程式碼即可:
```go
func main() {
r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
resolver.Register(r)
// 連線伺服器
conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
// 建立gRPC連線
grpcClient = pb.NewSimpleClient(conn)
```
gRPC內建了簡單的負載均衡策略`round_robin`,根據負載均衡地址,以輪詢的方式進行呼叫服務。
服務端啟動時,把服務地址註冊到`etcd`中即可:
```go
func main() {
// 監聽本地埠
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")
// 新建gRPC伺服器例項
grpcServer := grpc.NewServer()
// 在gRPC伺服器註冊我們的服務
pb.RegisterSimpleServer(grpcServer, &SimpleService{})
//把服務註冊到etcd
ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
if err != nil {
log.Fatalf("register service err: %v", err)
}
defer ser.Close()
//用伺服器 Serve() 方法以及我們的埠資訊區實現阻塞等待,直到程序被殺死或者 Stop() 被呼叫
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
```
### 執行效果
我們先啟動並註冊三個服務
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201520301-2141314089.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201526062-1105611810.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201529806-1864982377.png)
然後客戶端進行呼叫
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201645385-8940133.png)
看服務端接收到的請求
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201919646-136721041.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201925163-1429636105.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518201929077-822092499.png)
關閉`localhost:8000`服務,剩餘`localhost:8001`和`localhost:8002`服務接收請求
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518202359155-2143850614.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518202405272-1990664274.png)
重新開啟`localhost:8000`服務
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518202655967-135791051.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518202700598-298101288.png)
![](https://img2020.cnblogs.com/blog/1508611/202005/1508611-20200518202703530-602882933.png)
可以看到,gRPC客戶端負載均衡執行良好。
### 總結
本文介紹了gRPC客戶端負載均衡的實現,它簡單實現了gRPC負載均衡的功能。但在對接其他語言時候比較麻煩,需要每種語言都實現一套服務發現和負載策略,且如果要較為複雜的負載策略,需要修改客戶端程式碼才能完成。
下篇將介紹如何實現官方推薦的負載均衡策略(`External Load Balancing Service`)。
原始碼地址:https://github.com/Bingjian-Zhu/etcd-example
參考:
* https://segmentfault.com/a/1190000008672912
* https://github.com/wothing/