gRPC服務發現&負載均衡 本身是單機的非分散式,需要簡單開發
構建高可用、高效能的通訊服務,通常採用服務註冊與發現、負載均衡和容錯處理等機制實現。根據負載均衡實現所在的位置不同,通常可分為以下三種解決方案:
1、集中式LB(Proxy Model)
在服務消費者和服務提供者之間有一個獨立的LB,通常是專門的硬體裝置如 F5,或者基於軟體如 LVS,HAproxy等實現。LB上有所有服務的地址對映表,通常由運維配置註冊,當服務消費方呼叫某個目標服務時,它向LB發起請求,由LB以某種策略,比如輪詢(Round-Robin)做負載均衡後將請求轉發到目標服務。LB一般具備健康檢查能力,能自動摘除不健康的服務例項。 該方案主要問題:
-
單點問題,所有服務呼叫流量都經過LB,當服務數量和呼叫量大的時候,LB容易成為瓶頸,且一旦LB發生故障影響整個系統;
-
服務消費方、提供方之間增加了一級,有一定效能開銷。
2、程序內LB(Balancing-aware Client)
針對第一個方案的不足,此方案將LB的功能整合到服務消費方程序裡,也被稱為軟負載或者客戶端負載方案。服務提供方啟動時,首先將服務地址註冊到服務登錄檔,同時定期報心跳到服務登錄檔以表明服務的存活狀態,相當於健康檢查,服務消費方要訪問某個服務時,它通過內建的LB元件向服務登錄檔查詢,同時快取並定期重新整理目標服務地址列表,然後以某種負載均衡策略選擇一個目標服務地址,最後向目標服務發起請求。LB和服務發現能力被分散到每一個服務消費者的程序內部,同時服務消費方和服務提供方之間是直接呼叫,沒有額外開銷,效能比較好。該方案主要問題:
-
開發成本,該方案將服務呼叫方整合到客戶端的程序裡頭,如果有多種不同的語言棧,就要配合開發多種不同的客戶端,有一定的研發和維護成本;
-
另外生產環境中,後續如果要對客戶庫進行升級,勢必要求服務呼叫方修改程式碼並重新發布,升級較複雜。
3、獨立 LB 程序(External Load Balancing Service)
該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。
不同之處是將LB和服務發現功能從程序內移出來,變成主機上的一個獨立程序。主機上的一個或者多個服務要訪問目標服務時,他們都通過同一主機上的獨立LB程序做服務發現和負載均衡。該方案也是一種分散式方案沒有單點問題,一個LB程序掛了隻影響該主機上的服務呼叫方,服務呼叫方和LB之間是程序內呼叫效能好,同時該方案還簡化了服務呼叫方,不需要為不同語言開發客戶庫,LB的升級不需要服務呼叫方改程式碼。
該方案主要問題:部署較複雜,環節多,出錯除錯排查問題不方便。
gRPC服務發現及負載均衡實現
gRPC開源元件官方並未直接提供服務註冊與發現的功能實現,但其設計文件已提供實現的思路,並在不同語言的gRPC程式碼API中已提供了命名解析和負載均衡介面供擴充套件。
其基本實現原理:
-
服務啟動後gRPC客戶端向命名伺服器發出名稱解析請求,名稱將解析為一個或多個IP地址,每個IP地址標示它是伺服器地址還是負載均衡器地址,以及標示要使用那個客戶端負載均衡策略或服務配置。
-
客戶端例項化負載均衡策略,如果解析返回的地址是負載均衡器地址,則客戶端將使用grpclb策略,否則客戶端使用服務配置請求的負載均衡策略。
-
負載均衡策略為每個伺服器地址建立一個子通道(channel)。
-
當有rpc請求時,負載均衡策略決定那個子通道即grpc伺服器將接收請求,當可用伺服器為空時客戶端的請求將被阻塞。
根據gRPC官方提供的設計思路,基於程序內LB方案(即第2個案,阿里開源的服務框架 Dubbo 也是採用類似機制),結合分散式一致的元件(如Zookeeper、Consul、Etcd),可找到gRPC服務發現和負載均衡的可行解決方案。接下來以GO語言為例,簡單介紹下基於Etcd3的關鍵程式碼實現:
1)命名解析實現:resolver.go
package etcdv3
import (
"errors"
"fmt"
"strings"
etcd3 "github.com/coreos/etcd/clientv3"
"google.golang.org/grpc/naming"
)
// resolver is the implementaion of grpc.naming.Resolver
type resolver struct {
serviceName string // service name to resolve
}
// NewResolver return resolver with service name
func NewResolver(serviceName string) *resolver {
return &resolver{serviceName: serviceName}
}
// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
if re.serviceName == "" {
return nil, errors.New("grpclb: no service name provided")
}
// generate etcd client
client, err := etcd3.New(etcd3.Config{
Endpoints: strings.Split(target, ","),
})
if err != nil {
return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
}
// Return watcher
return &watcher{re: re, client: *client}, nil
}
2)服務發現實現:watcher.go
package etcdv3
import (
"fmt"
etcd3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// watcher is the implementaion of grpc.naming.Watcher
type watcher struct {
re *resolver // re: Etcd Resolver
client etcd3.Client
isInitialized bool
}
// Close do nothing
func (w *watcher) Close() {
}
// Next to return the updates
func (w *watcher) Next() ([]*naming.Update, error) {
// prefix is the etcd prefix/value to watch
prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)
// check if is initialized
if !w.isInitialized {
// query addresses from etcd
resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
w.isInitialized = true
if err == nil {
addrs := extractAddrs(resp)
//if not empty, return the updates or watcher new dir
if l := len(addrs); l != 0 {
updates := make([]*naming.Update, l)
for i := range addrs {
updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
}
return updates, nil
}
}
}
// generate etcd Watcher
rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT:
return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
case mvccpb.DELETE:
return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
}
}
}
return nil, nil
}
func extractAddrs(resp *etcd3.GetResponse) []string {
addrs := []string{}
if resp == nil || resp.Kvs == nil {
return addrs
}
for i := range resp.Kvs {
if v := resp.Kvs[i].Value; v != nil {
addrs = append(addrs, string(v))
}
}
return addrs
}
3)服務註冊實現:register.go
package etcdv3
import (
"fmt"
"log"
"strings"
"time"
etcd3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)
// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client etcd3.Client
var serviceKey string
var stopSignal = make(chan bool, 1)
// Register
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
serviceValue := fmt.Sprintf("%s:%d", host, port)
serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)
// get endpoints for register dial address
var err error
client, err := etcd3.New(etcd3.Config{
Endpoints: strings.Split(target, ","),
})
if err != nil {
return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
}
go func() {
// invoke self-register with ticker
ticker := time.NewTicker(interval)
for {
// minimum lease TTL is ttl-second
resp, _ := client.Grant(context.TODO(), int64(ttl))
// should get first, if not exist, set it
_, err := client.Get(context.Background(), serviceKey)
if err != nil {
if err == rpctypes.ErrKeyNotFound {
if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
}
} else {
log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
}
} else {
// refresh set to true for not notifying the watcher
if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
}
}
select {
case <-stopSignal:
return
case <-ticker.C:
}
}
}()
return nil
}
// UnRegister delete registered service from etcd
func UnRegister() error {
stopSignal <- true
stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
var err error;
if _, err := client.Delete(context.Background(), serviceKey); err != nil {
log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
} else {
log.Printf("grpclb: deregister '%s' ok.", serviceKey)
}
return err
}
4)介面描述檔案:helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {
}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
5)實現服務端介面:helloworldserver.go
package main
import (
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
grpclb "com.midea/jr/grpclb/naming/etcd/v3"
"com.midea/jr/grpclb/example/pb"
)
var (
serv = flag.String("service", "hello_service", "service name")
port = flag.Int("port", 50001, "listening port")
reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
if err != nil {
panic(err)
}
err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
if err != nil {
panic(err)
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
log.Printf("receive signal '%v'", s)
grpclb.UnRegister()
os.Exit(1)
}()
log.Printf("starting hello service at %d", *port)
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)
}
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
6)實現客戶端介面:helloworldclient.go
package main
import (
"flag"
"fmt"
"time"
grpclb "com.midea/jr/grpclb/naming/etcd/v3"
"com.midea/jr/grpclb/example/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"strconv"
)
var (
serv = flag.String("service", "hello_service", "service name")
reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)
func main() {
flag.Parse()
r := grpclb.NewResolver(*serv)
b := grpc.RoundRobin(r)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
if err != nil {
panic(err)
}
ticker := time.NewTicker(1 * time.Second)
for t := range ticker.C {
client := pb.NewGreeterClient(conn)
resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
if err == nil {
fmt.Printf("%v: Reply is %s\n", t, resp.Message)
}
}
}
7)執行測試
-
執行3個服務端S1、S2、S3,1個客戶端C,觀察各服務端接收的請求數是否相等?
-
關閉1個服務端S1,觀察請求是否會轉移到另外2個服務端?
-
重新啟動S1服務端,觀察另外2個服務端請求是否會平均分配到S1?
-
關閉Etcd3伺服器,觀察客戶端與服務端通訊是否正常?
關閉通訊仍然正常,但新服務端不會註冊進來,服務端掉線了也無法摘除掉。 -
重新啟動Etcd3伺服器,服務端上下線可自動恢復正常。
-
關閉所有服務端,客戶端請求將被阻塞。