1. 程式人生 > >gRPC學習

gRPC學習

概述

gRPC 一開始由 google 開發,是一款語言中立、平臺中立、開源的遠端過程呼叫(RPC)系統。

在 gRPC 裡客戶端應用可以像呼叫本地物件一樣直接呼叫另一臺不同的機器上服務端應用的方法,使得您能夠更容易地建立分散式應用和服務。與許多 RPC 系統類似,gRPC 也是基於以下理念:定義一個服務,指定其能夠被遠端呼叫的方法(包含引數和返回型別)。在服務端實現這個介面,並執行一個 gRPC 伺服器來處理客戶端呼叫。在客戶端擁有一個存根能夠像服務端一樣的方法。

特性

基於HTTP/2
HTTP/2 提供了連線多路複用、雙向流、伺服器推送、請求優先順序、首部壓縮等機制。可以節省頻寬、降低TCP連結次數、節省CPU,幫助移動裝置延長電池壽命等。gRPC 的協議設計上使用了HTTP2 現有的語義,請求和響應的資料使用HTTP Body 傳送,其他的控制資訊則用Header 表示。
IDL使用ProtoBuf
gRPC使用ProtoBuf來定義服務,ProtoBuf是由Google開發的一種資料序列化協議(類似於XML、JSON、hessian)。ProtoBuf能夠將資料進行序列化,並廣泛應用在資料儲存、通訊協議等方面。壓縮和傳輸效率高,語法簡單,表達力強。
多語言支援(C, C++, Python, PHP, Nodejs, C#, Objective-C、Golang、Java)
gRPC支援多種語言,並能夠基於語言自動生成客戶端和服務端功能庫。目前已提供了C版本grpc、Java版本grpc-java 和 Go版本grpc-go,其它語言的版本正在積極開發中,其中,grpc支援C、C++、Node.js、Python、Ruby、Objective-C、PHP和C#等語言,grpc-java已經支援Android開發。
gRPC已經應用在Google的雲服務和對外提供的API中,其主要應用場景如下:

  • 低延遲、高擴充套件性、分散式的系統
  • 同雲伺服器進行通訊的移動應用客戶端
  • 設計語言獨立、高效、精確的新協議
  • 便於各方面擴充套件的分層設計,如認證、負載均衡、日誌記錄、監控等

HTTP2.0 特性

HTTP/2,也就是超文字傳輸協議第2版,不論是1還是2,HTTP的基本語義是不變的,比如方法語義(GET/PUST/PUT/DELETE),狀態碼(200/404/500等),Range Request,Cacheing,Authentication、URL路徑, 不同的主要是下面幾點:

多路複用 (Multiplexing)

在 HTTP/1.1 協議中 「瀏覽器客戶端在同一時間,針對同一域名下的請求有一定數量限制。超過限制數目的請求會被阻塞」。

HTTP/2 的多路複用(Multiplexing) 則允許同時通過單一的 HTTP/2 連線發起多重的請求-響應訊息。
因此 HTTP/2 可以很容易的去實現多流並行而不用依賴建立多個 TCP 連線,HTTP/2 把 HTTP 協議通訊的基本單位縮小為一個一個的幀,這些幀對應著邏輯流中的訊息。並行地在同一個 TCP 連線上雙向交換訊息。

二進位制幀

HTTP/2 傳輸的資料是二進位制的。相比 HTTP/1.1 的純文字資料,二進位制資料一個顯而易見的好處是:更小的傳輸體積。這就意味著更低的負載。二進位制的幀也更易於解析而且不易出錯,純文字幀在解析的時候還要考慮處理空格、大小寫、空行和換行等問題,而二進位制幀就不存在這個問題。

首部壓縮(Header Compression)

HTTP是無狀態協議。簡而言之,這意味著每個請求必須要攜帶伺服器需要的所有細節,而不是讓伺服器儲存住之前請求的元資料。因為http2沒有改變這個正規化,所以它也需要這樣(攜帶所有細節),因此 HTTP 請求的頭部需要包含用於標識身份的資料比如 cookies,而這些資料的量也在隨著時間增長。每一個請求的頭部都包含這些大量的重複資料,無疑是一種很大的負擔。對請求頭部進行壓縮,將會大大減輕這種負擔,尤其對移動端來說,效能提高非常明顯。

HTTP/2 使用的壓縮方式是 HPACK。 http://http2.github.io/http2-spec/compression.html

HTTP2.0在客戶端和伺服器端使用“首部表”來跟蹤和儲存之前傳送的鍵-值對,對於相同的資料,不再通過每次請求和響應傳送;通訊期間幾乎不會改變的通用鍵-值對(使用者代理、可接受的媒體型別,等等)只需傳送一次。

事實上,如果請求中不包含首部(例如對同一資源的輪詢請求),那麼首部開銷就是零位元組。此時所有首部都自動使用之前請求傳送的首部。

如果首部發生變化了,那麼只需要傳送變化了資料在Headers幀裡面,新增或修改的首部幀會被追加到“首部表”。首部表在 HTTP2.0的連線存續期內始終存在,由客戶端和伺服器共同漸進地更新。

服務端推送(Server Push)

HTTP/2 的伺服器推送所作的工作就是,伺服器在收到客戶端對某個資源的請求時,會判斷客戶端十有八九還要請求其他的什麼資源,然後一同把這些資源都發送給客戶端,即便客戶端還沒有明確表示它需要這些資源。

客戶端可以選擇把額外的資源放入快取中(所以這個特點也叫 Cache push),也可以選擇傳送一個 RST_STREAM frame 拒絕任何它不想要的資源。

主動重置連結

Length的HTTP訊息被送出之後,我們就很難中斷它了。當然,通常我們可以斷開整個TCP連結(但也不總是可以這樣),但這樣導致的代價就是需要重新通過三次握手建立一個新的TCP連線。

HTTP/2 引入了一個 RST_STREAM frame 來讓客戶端在已有的連線中傳送重置請求,從而中斷或者放棄響應。當瀏覽器進行頁面跳轉或者使用者取消下載時,它可以防止建立新連線,避免浪費所有頻寬。

與其他rpc比較

與thrift,dubbo,motan等比較

  • Motan Dubbox thrift gRPC rpcx
    開發語言 Java Java 跨語言 跨語言 go
    分散式服務治理 Y Y 可以配合zookeeper, Eureka等實現 可以配合etcd(go),zookeeper,consul等實現 自帶服務註冊中心,也支援zookerper,etcd等發現方式
    底層協議 motan協議,使用tcp長連線 Dubbo 協議、 Rmi 協議、 Hessian 協議、 HTTP 協議、 WebService 協議、Dubbo Thrift 協議、Memcached 協議 tpc/http/frame http2 tcp長連結
    訊息序列化 hessian2,json hessian2,json,resr,kyro,FST等,可擴充套件protobuf等 thrift protobuf Gob、Json、MessagePack、gencode、ProtoBuf等
    跨語言程式設計 N(支援php client和c server) N Y Y N
    負載均衡 ActiveWeight 、Random 、 RoundRobin 、LocalFirst 、 Consistent 、ConfigurableWeight Random 、RoundRobin 、ConsistentHash 、 LeastActive Haproxy, zookerper+客戶端負載均衡等方案 負載均衡軟體HaProxy等 支援隨機請求、輪詢、低併發優先、一致性 Hash等
    容錯 Failover 失效切換、Failfast 快速失敗 Failover 、 Failfast 、Failsafe 、 Failback 、 Forking、 Broadcast Failover 具有 Failover 失效切換的容錯策略 失敗重試(Failover)、快速失敗(Failfast)
    註冊中心 consul zookeeper zookeeper etcd,zookeeper,consul zookerper,etcd
    效能 ★★ ★★ ★★★★ 比grpc快2-5倍 ★★★ 比dubbox,motan快 ★★★★★ 比thrift快1-1.5倍
    側重優勢 服務管理 服務管理 跨語言,效能++ 跨語言,效能 效能++,服務治理
    客戶端非同步呼叫方案
    使用thrift IDL “oneway” 關鍵字(無返回結果),+callback
    tcp非同步請求
  • thrift IDL引數不支援函式或服務
    stream傳輸,雙向通訊
    服務端非同步處理 1、TNonblockingServer(java/c++,php); THsHaServer(java/c++); TThreadpoolServer(java/c++); TThreadSelectorServer(java/c++)
    2、結合訊息佇列或中介軟體
    3、swoole/goroutine等多工支援 同上,使用stream傳輸。Stream物件在傳輸過程中會被當做集合,用Iterator來遍歷處理
    grpc vs thrift:

使用gRPC的公司或專案:

Google
Mochi中國
阿里OTS
騰訊部分部門
Tensorflow專案中使用了grpc
CoreOS — Production API for etcd v3 is entirely gRPC. etcd v3的介面全部使用grpc
Square — replacement for all of their internal RPC. one of the very first adopters and contributors to gRPC.
ngrok — all 20+ internal services communicate via gRPC 一個內網轉發產品
Netflix
Yik Yak
VSCO
Cockroach

使用Thrift的公司或專案:

Facebook
雪球
餓了麼
今日頭條
evernote
友盟
小米
美團
Quora
Twitter
Pinterest
Foursquare
Maxeler Technologies

gRPC優缺點:

優點:

protobuf二進位制訊息,效能好/效率高(空間和時間效率都很不錯)
proto檔案生成目的碼,簡單易用
序列化反序列化直接對應程式中的資料類,不需要解析後在進行對映(XML,JSON都是這種方式)
支援向前相容(新加欄位採用預設值)和向後相容(忽略新加欄位),簡化升級
支援多種語言(可以把proto檔案看做IDL檔案)
Netty等一些框架整合

缺點:

1)GRPC尚未提供連線池,需要自行實現
2)尚未提供“服務發現”、“負載均衡”機制
3)因為基於HTTP2,絕大部多數HTTP Server、Nginx都尚不支援,即Nginx不能將GRPC請求作為HTTP請求來負載均衡,而是作為普通的TCP請求。(nginx1.9版本已支援)
4) Protobuf二進位制可讀性差(貌似提供了Text_Fromat功能)
預設不具備動態特性(可以通過動態定義生成訊息型別或者動態編譯支援)

grpc坑:

來自https://news.ycombinator.com/item?id=12345223的網友:
http2只允許單個連結傳輸10億流資料。原因在於:
htt2使用31位整形標示流,服務端使用奇數,客戶端使用偶數,所以總共10億可用。

HTTP/2.0 uses an unsigned 31-bit integer to identity individual streams over a connection.
Server-initiated streams must use even identifiers.
Client-initiated streams must use odd identifiers.
解決思路:超過一定數量的流,需要重啟連結。

gRPC通訊方式

gRPC有四種通訊方式:
1、 Simple RPC
簡單rpc
這就是一般的rpc呼叫,一個請求物件對應一個返回物件
proto語法:

rpc simpleHello(Person) returns (Result) {}
1
2、 Server-side streaming RPC
服務端流式rpc
一個請求物件,服務端可以傳回多個結果物件
proto語法

rpc serverStreamHello(Person) returns (stream Result) {}
1
3、 Client-side streaming RPC
客戶端流式rpc
客戶端傳入多個請求物件,服務端返回一個響應結果
proto語法

rpc clientStreamHello(stream Person) returns (Result) {}
1
4、 Bidirectional streaming RPC
雙向流式rpc
結合客戶端流式rpc和服務端流式rpc,可以傳入多個物件,返回多個響應物件
proto語法

rpc biStreamHello(stream Person) returns (stream Result) {}
1
服務定義及ProtoBuf

gRPC使用ProtoBuf定義服務, 我們可以一次性的在一個 .proto 檔案中定義服務並使用任何支援它的語言去實現客戶端和伺服器,反過來,它們可以在各種環境中,從雲伺服器到你自己的平板電腦—— gRPC 幫你解決了不同語言及環境間通訊的複雜性。使用 protocol buffers 還能獲得其他好處,包括高效的序列號,簡單的 IDL 以及容易進行介面更新。

protoc編譯工具

protoc工具可在https://github.com/google/protobuf/releases 下載到原始碼。
linux下安裝

protobuf語法

1、syntax = “proto3”;
檔案的第一行指定了你使用的是proto3的語法:如果你不指定,protocol buffer 編譯器就會認為你使用的是proto2的語法。這個語句必須出現在.proto檔案的非空非註釋的第一行。
2、message SearchRequest {……}
message 定義實體,c/c++/go中的結構體,php中類
3、基本資料型別

4、註釋符號: 雙斜線,如://xxxxxxxxxxxxxxxxxxx
5、欄位唯一數字標識(用於在二進位制格式中識別各個欄位,上線後不宜再變動):Tags
1到15使用一個位元組來編碼,包括標識數字和欄位型別(你可以在Protocol Buffer 編碼中檢視更多詳細);16到2047佔用兩個位元組。因此定義proto檔案時應該保留1到15,用作出現最頻繁的訊息型別的標識。記得為將來會繼續增加並可能頻繁出現的元素留一點兒標識區間,也就是說,不要一下子把1—15全部用完,為將來留一點兒。
標識數字的合法範圍:最小是1,最大是 229 - 1,或者536,870,911。
另外,不能使用19000 到 19999之間的數字(FieldDescriptor::kFirstReservedNumber through FieldDescriptor::kLastReservedNumber),因為它們被Protocol Buffers保留使用
6、欄位修飾符:
required:值不可為空
optional:可選欄位
singular:符合語法規則的訊息包含零個或者一個這樣的欄位(最多一個)
repeated:一個欄位在合法的訊息中可以重複出現一定次數(包括零次)。重複出現的值的次序將被保留。在proto3中,重複出現的值型別欄位預設採用壓縮編碼。你可以在這裡找到更多關於壓縮編碼的東西: Protocol Buffer Encoding。
預設值: optional PhoneType type = 2 [default = HOME];
proto3中,省略required,optional,singular,由protoc自動選擇。
7、代理類生成
1)、C++, 每一個.proto 檔案可以生成一個 .h 檔案和一個 .cc 檔案
2)、Java, 每一個.proto檔案可以生成一個 .java 檔案
3)、Python, 每一個.proto檔案生成一個模組,其中為每一個訊息型別生成一個靜態的描述器,在執行時,和一個metaclass一起使用來建立必要的Python資料訪問類
4)、Go, 每一個.proto生成一個 .pb.go 檔案
5)、Ruby, 每一個.proto生成一個 .rb 檔案
6)、Objective-C, 每一個.proto 檔案可以生成一個 pbobjc.h 和一個pbobjc.m 檔案
7)、C#, 每一個.proto檔案可以生成一個.cs檔案.
8)、php, 每一個message訊息體生成一個.php類檔案,並在GPBMetadata目錄生成一個對應包名的.php類檔案,用於儲存.proto的二進位制元資料。
8、欄位預設值

  • strings, 預設值是空字串(empty string)
  • bytes, 預設值是空bytes(empty bytes)
  • bools, 預設值是false
  • numeric, 預設值是0
  • enums, 預設值是第一個列舉值(value必須為0)
  • message fields, the field is not set. Its exact value is langauge-dependent. See the generated code guide for details.
  • repeated fields,預設值為empty,通常是一個空list
    9、列舉

// 列舉型別,必須從0開始,序號可跨越。同一包下不能重名,所以加字首來區別
enum WshExportInstStatus {
INST_INITED = 0;
INST_RUNNING = 1;
INST_FINISH = 2;
INST_FAILED = 3;
}
10、Maps欄位型別

map<key_type, value_type> map_field = N;
1
其中key_type可以是任意Integer或者string型別(所以,除了floating和bytes的任意標量型別都是可以的)value_type可以是任意型別。
例如,如果你希望建立一個project的對映,每個Projecct使用一個string作為key,你可以像下面這樣定義:

map<string, Project> projects = 3;
1
Map的欄位可以是repeated。
序列化後的順序和map迭代器的順序是不確定的,所以你不要期望以固定順序處理Map
當為.proto檔案產生生成文字格式的時候,map會按照key 的順序排序,數值化的key會按照數值排序。
從序列化中解析或者融合時,如果有重複的key則後一個key不會被使用,當從文字格式中解析map時,如果存在重複的key。
11、預設值
字串型別預設為空字串
位元組型別預設為空位元組
布林型別預設false
數值型別預設為0值
enums型別預設為第一個定義的列舉值,必須是0

12、服務
服務使用service{}包起來,每個方法使用rpc起一行申明,一個方法包含一個請求訊息體和一個返回訊息體

service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

更多protobuf參考(google)
更多protobuf參考(csdn)

golang中使用gRPC

前期準備

go get -u "google.golang.org/grpc"
go get -u "google.golang.org/grpc/reflection"
1
2
例如我定義的檔案exporttask.proto:

// 微生活匯出服務
// 匯出功能介面
/
1、 建立任務模板
2、 查詢、列出任務模板
3、 刪除任務模板
4、 新增匯出任務例項
5、 查詢任務狀態
/
syntax = "proto3";

// java 語法特別選項
option java_multiple_files = true;
option java_package = "io.grpc.welife.WshExportTask";
option java_outer_classname = "WshExportTask";
// 包名 golang包名,php中namespase,
package exporttask;

// 匯出任務服務定義
service ExportTask {
// 建立任務模板
rpc CreateTpl(WshExportTaskCreateTplReq) returns (WshExportTaskCreateTplRes) {}
// 查詢任務模板
rpc ListTpl(WshExportTaskListTplReq) returns (WshExportTaskListTplRes) {}
}

// 列舉型別,必須從0開始,序號可跨越。同一包下不能重名,所以加字首來區別
enum WshExportTplStatus {
TPL_INITED = 0;
TPL_NORMAL = 1;
TPL_DELETED = 9;
}

enum WshExportFormat {
FMT_DEFAULT = 0;
FMT_CSV = 1;
FMT_XLS = 2;
}

message WshExportTpl {
string etplName = 1;
string etplTag = 2;
WshExportFormat etplOutputFormat = 3;
string etplOutputColumns = 4;
string etplExpr = 5;
int32 etplId = 6;
int32 etplExecTimes = 7;
int32 etplExecOkTimes = 8;
int32 etplStatus = 9;
string etplCreated = 10;
string etplUpdated = 11;
string etplDeleted = 12;
int32 operatorId = 13;
}

message WshExportTaskCreateTplReq {
string etplName = 1;
string etplTag = 2;
string etplExpr = 3;
string etplOutputColumns = 4;
WshExportFormat etplOutputFormat = 5;
int32 operatorId = 6;
}

message WshExportTaskCreateTplRes {
string errCode = 1;
string errMsg = 2;
WshExportTpl data = 3;
}

message WshExportTaskListTplReq {
int32 etplId = 1;
string etplName = 2;
string etplTag = 3;
}

// repeated 表示陣列
message WshExportTaskListTplRes {
string errCode = 1;
string errMsg = 2;
repeated WshExportTpl data = 3;
}

使用protoc命令生成golang對應的rpc程式碼:

格式 protoc --go_out=plugins=grpc:{go程式碼輸出路徑} {proto檔案}

protoc --go_out=plugins=grpc:./ ./exporttask.proto
1
2
生成對應當exporttask.pb.go

// Code generated by protoc-gen-go. DO NOT EDIT.
// source: exporttask.proto

/*
Package exporttask is a generated protocol buffer package.

包名 golang包名,php中namespase,

It is generated from these files:
exporttask.proto

It has these top-level messages:
WshExportTpl
WshExportTaskCreateTplReq
WshExportTaskCreateTplRes
WshExportTaskListTplReq
WshExportTaskListTplRes
*/
package exporttask

import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"

import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package

// 列舉型別,必須從0開始,序號可跨越。同一包下不能重名,所以加字首來區別
type WshExportTplStatus int32

const (
WshExportTplStatus_TPL_INITED WshExportTplStatus = 0
WshExportTplStatus_TPL_NORMAL WshExportTplStatus = 1
WshExportTplStatus_TPL_DELETED WshExportTplStatus = 9
)

var WshExportTplStatus_name = map[int32]string{
0: "TPL_INITED",
1: "TPL_NORMAL",
9: "TPL_DELETED",
}
var WshExportTplStatus_value = map[string]int32{
"TPL_INITED": 0,
"TPL_NORMAL": 1,
"TPL_DELETED": 9,
}

func (x WshExportTplStatus) String() string {
return proto.EnumName(WshExportTplStatus_name, int32(x))
}
func (WshExportTplStatus) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

type WshExportFormat int32

const (
WshExportFormat_FMT_DEFAULT WshExportFormat = 0
WshExportFormat_FMT_CSV WshExportFormat = 1
WshExportFormat_FMT_XLS WshExportFormat = 2
)

var WshExportFormat_name = map[int32]string{
0: "FMT_DEFAULT",
1: "FMT_CSV",
2: "FMT_XLS",
}
var WshExportFormat_value = map[string]int32{
"FMT_DEFAULT": 0,
"FMT_CSV": 1,
"FMT_XLS": 2,
}

func (x WshExportFormat) String() string {
return proto.EnumName(WshExportFormat_name, int32(x))
}
func (WshExportFormat) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }

type WshExportTpl struct {
EtplName string protobuf:"bytes,1,opt,name=etplName" json:"etplName,omitempty"
EtplTag string protobuf:"bytes,2,opt,name=etplTag" json:"etplTag,omitempty"
EtplOutputFormat WshExportFormat protobuf:"varint,3,opt,name=etplOutputFormat,enum=exporttask.WshExportFormat" json:"etplOutputFormat,omitempty"
EtplOutputColumns string protobuf:"bytes,4,opt,name=etplOutputColumns" json:"etplOutputColumns,omitempty"
EtplExpr string protobuf:"bytes,5,opt,name=etplExpr" json:"etplExpr,omitempty"
EtplId int32 protobuf:"varint,6,opt,name=etplId" json:"etplId,omitempty"
EtplExecTimes int32 protobuf:"varint,7,opt,name=etplExecTimes" json:"etplExecTimes,omitempty"
EtplExecOkTimes int32 protobuf:"varint,8,opt,name=etplExecOkTimes" json:"etplExecOkTimes,omitempty"
EtplStatus int32 protobuf:"varint,9,opt,name=etplStatus" json:"etplStatus,omitempty"
EtplCreated string protobuf:"bytes,10,opt,name=etplCreated" json:"etplCreated,omitempty"
EtplUpdated string protobuf:"bytes,11,opt,name=etplUpdated" json:"etplUpdated,omitempty"
EtplDeleted string protobuf:"bytes,12,opt,name=etplDeleted" json:"etplDeleted,omitempty"
OperatorId int32 protobuf:"varint,13,opt,name=operatorId" json:"operatorId,omitempty"
}

func (m WshExportTpl) Reset() { m = WshExportTpl{} }
func (m WshExportTpl) String() string { return proto.CompactTextString(m) }
func (
WshExportTpl) ProtoMessage() {}
func (*WshExportTpl) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

func (m *WshExportTpl) GetEtplName() string {
if m != nil {
return m.EtplName
}
return ""
}

func (m *WshExportTpl) GetEtplTag() string {
if m != nil {
return m.EtplTag
}
return ""
}

func (m *WshExportTpl) GetEtplOutputFormat() WshExportFormat {
if m != nil {
return m.EtplOutputFormat
}
return WshExportFormat_FMT_DEFAULT
}

func (m *WshExportTpl) GetEtplOutputColumns() string {
if m != nil {
return m.EtplOutputColumns
}
return ""
}

func (m *WshExportTpl) GetEtplExpr() string {
if m != nil {
return m.EtplExpr
}
return ""
}

func (m *WshExportTpl) GetEtplId() int32 {
if m != nil {
return m.EtplId
}
return 0
}

func (m *WshExportTpl) GetEtplExecTimes() int32 {
if m != nil {
return m.EtplExecTimes
}
return 0
}

func (m *WshExportTpl) GetEtplExecOkTimes() int32 {
if m != nil {
return m.EtplExecOkTimes
}
return 0
}

func (m *WshExportTpl) GetEtplStatus() int32 {
if m != nil {
return m.EtplStatus
}
return 0
}

func (m *WshExportTpl) GetEtplCreated() string {
if m != nil {
return m.EtplCreated
}
return ""
}

func (m *WshExportTpl) GetEtplUpdated() string {
if m != nil {
return m.EtplUpdated
}
return ""
}

func (m *WshExportTpl) GetEtplDeleted() string {
if m != nil {
return m.EtplDeleted
}
return ""
}

func (m *WshExportTpl) GetOperatorId() int32 {
if m != nil {
return m.OperatorId
}
return 0
}

type WshExportTaskCreateTplReq struct {
EtplName string protobuf:"bytes,1,opt,name=etplName" json:"etplName,omitempty"
EtplTag string protobuf:"bytes,2,opt,name=etplTag" json:"etplTag,omitempty"
EtplExpr string protobuf:"bytes,3,opt,name=etplExpr" json:"etplExpr,omitempty"
EtplOutputColumns string protobuf:"bytes,4,opt,name=etplOutputColumns" json:"etplOutputColumns,omitempty"
EtplOutputFormat WshExportFormat protobuf:"varint,5,opt,name=etplOutputFormat,enum=exporttask.WshExportFormat" json:"etplOutputFormat,omitempty"
OperatorId int32 protobuf:"varint,6,opt,name=operatorId" json:"operatorId,omitempty"
}

func (m WshExportTaskCreateTplReq) Reset() { m = WshExportTaskCreateTplReq{} }
func (m WshExportTaskCreateTplReq) String() string { return proto.CompactTextString(m) }
func (
WshExportTaskCreateTplReq) ProtoMessage() {}
func (*WshExportTaskCreateTplReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }

func (m *WshExportTaskCreateTplReq) GetEtplName() string {
if m != nil {
return m.EtplName
}
return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplTag() string {
if m != nil {
return m.EtplTag
}
return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplExpr() string {
if m != nil {
return m.EtplExpr
}
return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplOutputColumns() string {
if m != nil {
return m.EtplOutputColumns
}
return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplOutputFormat() WshExportFormat {
if m != nil {
return m.EtplOutputFormat
}
return WshExportFormat_FMT_DEFAULT
}

func (m *WshExportTaskCreateTplReq) GetOperatorId() int32 {
if m != nil {
return m.OperatorId
}
return 0
}

type WshExportTaskCreateTplRes struct {
ErrCode string protobuf:"bytes,1,opt,name=errCode" json:"errCode,omitempty"
ErrMsg string protobuf:"bytes,2,opt,name=errMsg" json:"errMsg,omitempty"
Data *WshExportTpl protobuf:"bytes,3,opt,name=data" json:"data,omitempty"
}

func (m WshExportTaskCreateTplRes) Reset() { m = WshExportTaskCreateTplRes{} }
func (m WshExportTaskCreateTplRes) String() string { return proto.CompactTextString(m) }
func (
WshExportTaskCreateTplRes) ProtoMessage() {}
func (*WshExportTaskCreateTplRes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }

func (m *WshExportTaskCreateTplRes) GetErrCode() string {
if m != nil {
return m.ErrCode
}
return ""
}

func (m *WshExportTaskCreateTplRes) GetErrMsg() string {
if m != nil {
return m.ErrMsg
}
return ""
}

func (m WshExportTaskCreateTplRes) GetData() WshExportTpl {
if m != nil {
return m.Data
}
return nil
}

type WshExportTaskListTplReq struct {
EtplId int32 protobuf:"varint,1,opt,name=etplId" json:"etplId,omitempty"
EtplName string protobuf:"bytes,2,opt,name=etplName" json:"etplName,omitempty"
EtplTag string protobuf:"bytes,3,opt,name=etplTag" json:"etplTag,omitempty"
}

func (m WshExportTaskListTplReq) Reset() { m = WshExportTaskListTplReq{} }
func (m WshExportTaskListTplReq) String() string { return proto.CompactTextString(m) }
func (
WshExportTaskListTplReq) ProtoMessage() {}
func (*WshExportTaskListTplReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }

func (m *WshExportTaskListTplReq) GetEtplId() int32 {
if m != nil {
return m.EtplId
}
return 0
}

func (m *WshExportTaskListTplReq) GetEtplName() string {
if m != nil {
return m.EtplName
}
return ""
}

func (m *WshExportTaskListTplReq) GetEtplTag() string {
if m != nil {
return m.EtplTag
}
return ""
}

// repeated 表示陣列
type WshExportTaskListTplRes struct {
ErrCode string protobuf:"bytes,1,opt,name=errCode" json:"errCode,omitempty"
ErrMsg string protobuf:"bytes,2,opt,name=errMsg" json:"errMsg,omitempty"
Data []*WshExportTpl protobuf:"bytes,3,rep,name=data" json:"data,omitempty"
}

func (m WshExportTaskListTplRes) Reset() { m = WshExportTaskListTplRes{} }
func (m WshExportTaskListTplRes) String() string { return proto.CompactTextString(m) }
func (
WshExportTaskListTplRes) ProtoMessage() {}
func (*WshExportTaskListTplRes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }

func (m *WshExportTaskListTplRes) GetErrCode() string {
if m != nil {
return m.ErrCode
}
return ""
}

func (m *WshExportTaskListTplRes) GetErrMsg() string {
if m != nil {
return m.ErrMsg
}
return ""
}

func (m WshExportTaskListTplRes) GetData() []WshExportTpl {
if m != nil {
return m.Data
}
return nil
}

func init() {
proto.RegisterType((WshExportTpl)(nil), "exporttask.WshExportTpl")
proto.RegisterType((
WshExportTaskCreateTplReq)(nil), "exporttask.WshExportTaskCreateTplReq")
proto.RegisterType((WshExportTaskCreateTplRes)(nil), "exporttask.WshExportTaskCreateTplRes")
proto.RegisterType((
WshExportTaskListTplReq)(nil), "exporttask.WshExportTaskListTplReq")
proto.RegisterType((*WshExportTaskListTplRes)(nil), "exporttask.WshExportTaskListTplRes")
proto.RegisterEnum("exporttask.WshExportTplStatus", WshExportTplStatus_name, WshExportTplStatus_value)
proto.RegisterEnum("exporttask.WshExportFormat", WshExportFormat_name, WshExportFormat_value)
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4

// Client API for ExportTask service

type ExportTaskClient interface {
// 建立任務模板
CreateTpl(ctx context.Context, in WshExportTaskCreateTplReq, opts ...grpc.CallOption) (WshExportTaskCreateTplRes, error)
// 查詢任務模板
ListTpl(ctx context.Context, in WshExportTaskListTplReq, opts ...grpc.CallOption) (WshExportTaskListTplRes, error)
}

type exportTaskClient struct {
cc *grpc.ClientConn
}

func NewExportTaskClient(cc *grpc.ClientConn) ExportTaskClient {
return &exportTaskClient{cc}
}

func (c exportTaskClient) CreateTpl(ctx context.Context, in WshExportTaskCreateTplReq, opts ...grpc.CallOption) (*WshExportTaskCreateTplRes, error) {
out := new(WshExportTaskCreateTplRes)
err := grpc.Invoke(ctx, "/exporttask.ExportTask/CreateTpl", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}

func (c exportTaskClient) ListTpl(ctx context.Context, in WshExportTaskListTplReq, opts ...grpc.CallOption) (*WshExportTaskListTplRes, error) {
out := new(WshExportTaskListTplRes)
err := grpc.Invoke(ctx, "/exporttask.ExportTask/ListTpl", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}

// Server API for ExportTask service

type ExportTaskServer interface {
// 建立任務模板
CreateTpl(context.Context, WshExportTaskCreateTplReq) (WshExportTaskCreateTplRes, error)
// 查詢任務模板
ListTpl(context.Context, WshExportTaskListTplReq) (WshExportTaskListTplRes, error)
}

func RegisterExportTaskServer(s *grpc.Server, srv ExportTaskServer) {
s.RegisterService(&_ExportTask_serviceDesc, srv)
}

func _ExportTask_CreateTpl_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WshExportTaskCreateTplReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ExportTaskServer).CreateTpl(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/exporttask.ExportTask/CreateTpl",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ExportTaskServer).CreateTpl(ctx, req.(*WshExportTaskCreateTplReq))
}
return interceptor(ctx, in, info, handler)
}

func _ExportTask_ListTpl_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WshExportTaskListTplReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ExportTaskServer).ListTpl(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/exporttask.ExportTask/ListTpl",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ExportTaskServer).ListTpl(ctx, req.(*WshExportTaskListTplReq))
}
return interceptor(ctx, in, info, handler)
}

var _ExportTask_serviceDesc = grpc.ServiceDesc{
ServiceName: "exporttask.ExportTask",
HandlerType: (*ExportTaskServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CreateTpl",
Handler: _ExportTask_CreateTpl_Handler,
},
{
MethodName: "ListTpl",
Handler: _ExportTask_ListTpl_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "exporttask.proto",
}

func init() { proto.RegisterFile("exporttask.proto", fileDescriptor0) }

var fileDescriptor0 = []byte{
// 550 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x54, 0x5f, 0x8f, 0xd2, 0x4e,
0x14, 0xa5, 0xb0, 0xc0, 0x72, 0x59, 0x96, 0xfe, 0xe6, 0xe1, 0xe7, 0x88, 0xc6, 0x90, 0xaa, 0x09,
0xd9, 0x6c, 0x30, 0xc1, 0x57, 0x5f, 0x76, 0xa1, 0x18, 0x92, 0x02, 0x1b, 0x28, 0x6a, 0xe2, 0xc3,
0xa6, 0x6e, 0x47, 0x24, 0x14, 0x5b, 0x67, 0x86, 0xb8, 0xc6, 0x57, 0x3f, 0x98, 0x5f, 0xc6, 0xef,
0x61, 0x66, 0xda, 0x61, 0x5a, 0xfe, 0xc4, 0x75, 0x7d, 0x9b, 0x73, 0xe6, 0x94, 0x7b, 0xef, 0x39,
0x77, 0x00, 0x93, 0xdc, 0x46, 0x21, 0xe5, 0xdc, 0x63, 0xcb, 0x76, 0x44, 0x43, 0x1e, 0x22, 0xd0,
0x8c, 0xf5, 0xab, 0x00, 0x27, 0x6f, 0xd9, 0x27, 0x5b, 0x32, 0x6e, 0x14, 0xa0, 0x06, 0x1c, 0x13,
0x1e, 0x05, 0x23, 0x6f, 0x45, 0xb0, 0xd1, 0x34, 0x5a, 0x95, 0xc9, 0x06, 0x23, 0x0c, 0x65, 0x71,
0x76, 0xbd, 0x39, 0xce, 0xcb, 0x2b, 0x05, 0xd1, 0x6b, 0x30, 0xc5, 0x71, 0xbc, 0xe6, 0xd1, 0x9a,
0xf7, 0x43, 0xba, 0xf2, 0x38, 0x2e, 0x34, 0x8d, 0xd6, 0x69, 0xe7, 0x51, 0x3b, 0x55, 0x7f, 0x53,
0x29, 0x96, 0x4c, 0x76, 0x3e, 0x42, 0xe7, 0xf0, 0x9f, 0xe6, 0xba, 0x61, 0xb0, 0x5e, 0x7d, 0x66,
0xf8, 0x48, 0x16, 0xdb, 0xbd, 0x50, 0xcd, 0xda, 0xb7, 0x11, 0xc5, 0x45, 0xdd, 0xac, 0xc0, 0xe8,
0x7f, 0x28, 0x89, 0xf3, 0xc0, 0xc7, 0xa5, 0xa6, 0xd1, 0x2a, 0x4e, 0x12, 0x84, 0x9e, 0x41, 0x2d,
0xd6, 0x90, 0x1b, 0x77, 0xb1, 0x22, 0x0c, 0x97, 0xe5, 0x75, 0x96, 0x44, 0x2d, 0xa8, 0x2b, 0x62,
0xbc, 0x8c, 0x75, 0xc7, 0x52, 0xb7, 0x4d, 0xa3, 0x27, 0x00, 0x82, 0x9a, 0x72, 0x8f, 0xaf, 0x19,
0xae, 0x48, 0x51, 0x8a, 0x41, 0x4d, 0xa8, 0x0a, 0xd4, 0xa5, 0xc4, 0xe3, 0xc4, 0xc7, 0x20, 0xdb,
0x4c, 0x53, 0x4a, 0x31, 0x8b, 0x7c, 0xa9, 0xa8, 0x6a, 0x45, 0x42, 0x29, 0x45, 0x8f, 0x04, 0x44,
0x28, 0x4e, 0xb4, 0x22, 0xa1, 0x44, 0x17, 0x61, 0x44, 0xa8, 0xc7, 0x43, 0x3a, 0xf0, 0x71, 0x2d,
0xee, 0x42, 0x33, 0xd6, 0x8f, 0x3c, 0x3c, 0xd4, 0x39, 0x7b, 0x6c, 0x19, 0x17, 0x77, 0xa3, 0x60,
0x42, 0xbe, 0xdc, 0x33, 0xf4, 0xb4, 0xfb, 0x85, 0x2d, 0xf7, 0xff, 0x2e, 0xc7, 0x7d, 0xeb, 0x53,
0xbc, 0xcf, 0xfa, 0x64, 0x6d, 0x28, 0xed, 0xd8, 0xf0, 0xfd, 0xb0, 0x0b, 0x4c, 0x4e, 0x4a, 0x69,
0x37, 0xf4, 0x95, 0x09, 0x0a, 0xca, 0x5d, 0xa2, 0x74, 0xc8, 0x94, 0x05, 0x09, 0x42, 0xe7, 0x70,
0xe4, 0x7b, 0xdc, 0x93, 0xd3, 0x57, 0x3b, 0x78, 0x6f, 0xaf, 0xe2, 0xc7, 0xa5, 0xca, 0x9a, 0xc3,
0x83, 0x4c, 0x71, 0x67, 0xc1, 0x78, 0x12, 0x80, 0x5e, 0x56, 0x23, 0xb3, 0xac, 0xe9, 0x60, 0xf2,
0x87, 0x83, 0x29, 0x64, 0x82, 0xb1, 0xbe, 0x1d, 0x2a, 0xf4, 0x6f, 0x33, 0x16, 0xfe, 0x3c, 0xe3,
0x99, 0x0d, 0x28, 0xcd, 0x26, 0x6f, 0xe0, 0x14, 0xc0, 0xbd, 0x72, 0xae, 0x07, 0xa3, 0x81, 0x6b,
0xf7, 0xcc, 0x9c, 0xc2, 0xa3, 0xf1, 0x64, 0x78, 0xe1, 0x98, 0x06, 0xaa, 0x43, 0x55, 0xe0, 0x9e,
0xed, 0xd8, 0x42, 0x50, 0x39, 0x7b, 0x05, 0xf5, 0xad, 0xb0, 0x85, 0xa6, 0x3f, 0x74, 0xaf, 0x7b,
0x76, 0xff, 0x62, 0xe6, 0xb8, 0x66, 0x0e, 0x55, 0xa1, 0x2c, 0x88, 0xee, 0xf4, 0x8d, 0x69, 0x28,
0xf0, 0xce, 0x99, 0x9a, 0xf9, 0xce, 0x4f, 0x03, 0x40, 0x4f, 0x8f, 0xde, 0x43, 0x65, 0x93, 0x33,
0x7a, 0xbe, 0x7f, 0x80, 0xad, 0x17, 0xd1, 0xb8, 0x93, 0x8c, 0x59, 0x39, 0x34, 0x83, 0x72, 0x62,
0x2f, 0x7a, 0x7a, 0xf0, 0x1b, 0x9d, 0x74, 0xe3, 0x0e, 0x22, 0x66, 0xe5, 0x2e, 0x5f, 0xc0, 0xe3,
0x45, 0xd8, 0x9e, 0xd3, 0xe8, 0xa6, 0xfd, 0x95, 0x04, 0x8b, 0x8f, 0x24, 0xab, 0xbd, 0xac, 0x65,
0xe0, 0x95, 0xf1, 0xa1, 0x24, 0xff, 0xdb, 0x5f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x60, 0x9b,
0xec, 0x89, 0xef, 0x05, 0x00, 0x00,
}

服務端實現proto中的介面:

package main

import (
"log"
//"net"

svcExport "../../common/exporttask" // 包含上面的pb生成的go包
"./model"
_ "github.com/go-sql-driver/mysql"
"golang.org/x/net/context"
//"google.golang.org/grpc"
//"google.golang.org/grpc/reflection"

)

// server 這個物件來實現 exporttask 包中的pb定義的rpc服務
// 實現的方式是將服務轉化成本地的資料庫操作
type server struct{}

func (s server) CreateTpl(ctx context.Context, in svcExport.WshExportTaskCreateTplReq) (res *svcExport.WshExportTaskCreateTplRes, err error) {
res = new(svcExport.WshExportTaskCreateTplRes)
res.Data = new(svcExport.WshExportTpl)

var etplId int32 = 0

etplId, err = model.CreateTpl(in.EtplName, in.EtplTag, in.EtplExpr, in.EtplOutputColumns, int32(in.EtplOutputFormat), in.OperatorId)
//res.Data, err = model.GetTpl(etplId)
res.Data.EtplId = etplId
return res, err

}

func (s server) ListTpl(ctx context.Context, in svcExport.WshExportTaskListTplReq) (*svcExport.WshExportTaskListTplRes, error) {
res := new(svcExport.WshExportTaskListTplRes)
entList, err := model.ListTpl(in.EtplId, in.EtplName, in.EtplTag)

if err != nil {
    res.ErrMsg = err.Error()
    res.ErrCode = "2"
}

for _, ent := range entList {
    t := new(svcExport.WshExportTpl)
    ent.CopyToPb(t)
    res.Data = append(res.Data, t)
}

return res, err

}

服務端main啟動服務main.go

/**

  • exporttask server main
  • $ go build exporttask.go
    */
    package main

import (
"log"
"net"

svcExport "../../common/exporttask"
//"./model"
_ "github.com/go-sql-driver/mysql"
//"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

)

const (
port = ":50051"
)

func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 生成一個rpc伺服器
s := grpc.NewServer()
// 使用pb包呼叫註冊已實現的rpc介面類server
svcExport.RegisterExportTaskServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

Golang gRPC客戶端

客戶端的程式碼相對比較簡單

package main

import (
"flag"
"log"
"os"

svcExport "../../common/exporttask"
"golang.org/x/net/context"
"google.golang.org/grpc"

)

const (
address = "127.0.0.1:50052"
defaultName = "world"
)

func main() {

// 發起連結
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
    log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

// 建立pb包的客戶端
c := svcExport.NewExportTaskClient(conn)

name := defaultName
if len(os.Args) > 1 {
    name = os.Args[1]
}

// 發起請求
var r2 *svcExport.WshExportTaskCreateTplRes
req := svcExport.WshExportTaskCreateTplReq{EtplName: name, EtplTag: "mall", EtplExpr: "select EtplName from welife_export_tpl", EtplOutputFormat: svcExport.WshExportFormat_FMT_CSV, EtplOutputColumns: ""}
r2, err = c.CreateTpl(context.Background(), &req)
// 列印結果
log.Println("create tpl: r=", r2, err)

}

php中使用gRPC

php需要安裝grpc擴充套件。
使用protoc命令生成對應的php程式碼:

protoc --php_out=plugins=grpc:./ exporttask.proto
1
2
生成程式碼包括:

Exporttask/
|-WshExportFormat.php
|-WshExportTaskCreateTplReq.php
|-WshExportTaskCreateTplRes.php
|-WshExportTaskListTplReq.php
|-WshExportTaskListTplRes.php
|-WshExportTpl.php
-WshExportTplStatus.php
GPBMetadata/
-Exporttask.php

每一個message對應生成一個類,在Exporttask名稱空間下。
這裡就不都貼出來了,只貼一個WshExportTpl.php:

<?php

Generated by the protocol buffer compiler. DO NOT EDIT!

source: exporttask.proto

namespace Exporttask;

use Google\Protobuf\Internal\GPBType;
use Google\Protobuf\Internal\RepeatedField;
use Google\Protobuf\Internal\GPBUtil;

/**

  • Protobuf type exporttask.WshExportTpl
    */
    class WshExportTpl extends \Google\Protobuf\Internal\Message
    {
    /**
    • string etplName = 1;
      */
      private $etplName = '';
      /**
    • string etplTag = 2;
      */
      private $etplTag = '';
      /**
    • .exporttask.WshExportFormat etplOutputFormat = 3;
      */
      private $etplOutputFormat = 0;
      /**
    • string etplOutputColumns = 4;
      */
      private $etplOutputColumns = '';
      /**
    • string etplExpr = 5;
      */
      private $etplExpr = '';
      /**
    • int32 etplId = 6;
      */
      private $etplId = 0;
      /**
    • int32 etplExecTimes = 7;
      */
      private $etplExecTimes = 0;
      /**
    • int32 etplExecOkTimes = 8;
      */
      private $etplExecOkTimes = 0;
      /**
    • int32 etplStatus = 9;
      */
      private $etplStatus = 0;
      /**
    • string etplCreated = 10;
      */
      private $etplCreated = '';
      /**
    • string etplUpdated = 11;
      */
      private $etplUpdated = '';
      /**
    • string etplDeleted = 12;
      */
      private $etplDeleted = '';
      /**
    • int32 operatorId = 13;
      */
      private $operatorId = 0;

    public function __construct() {
    \GPBMetadata\Exporttask::initOnce();
    parent::__construct();
    }

    /**
    • string etplName = 1;
      */
      public function getEtplName()
      {
      return $this->etplName;
      }
    /**
    • string etplName = 1;
      */
      public function setEtplName($var)
      {
      GPBUtil::checkString($var, True);
      $this->etplName = $var;
      }
      // ... 其他省略

<?php
$client = new \Exporttask\GreeterClient('127.0.0.1:50051', [
'credentials' => \Grpc\ChannelCredentials::createInsecure(),
]);
$request = new Exporttask\WshExportTaskCreateTplReq();
$request->setEtplName($name);
list($reply, $status) = $client->createTpl($request)->wait();
$message = $reply->getMessage();
var_dump($message);

gRPC服務發現與服務治理的方案

目前gRPC主流分散式方案有這麼幾種: etcd, zookeeper, consul.

1、集中式LB(Proxy Model)

在服務消費者和服務提供者之間有一個獨立的LB,通常是專門的硬體裝置如 F5,或者基於軟體如 LVS,HAproxy等實現。LB上有所有服務的地址對映表,通常由運維配置註冊,當服務消費方呼叫某個目標服務時,它向LB發起請求,由LB以某種策略,比如輪詢(Round-Robin)做負載均衡後將請求轉發到目標服務。LB一般具備健康檢查能力,能自動摘除不健康的服務例項。 該方案主要問題:

1、 單點問題,所有服務呼叫流量都經過LB,當服務數量和呼叫量大的時候,LB容易成為瓶頸,且一旦LB發生故障影響整個系統;
2、服務消費方、提供方之間增加了一級,有一定效能開銷。
2、程序內LB(Balancing-aware Client)

針對第一個方案的不足,此方案將LB的功能整合到服務消費方程序裡,也被稱為軟負載或者客戶端負載方案。服務提供方啟動時,首先將服務地址註冊到服務登錄檔,同時定期報心跳到服務登錄檔以表明服務的存活狀態,相當於健康檢查,服務消費方要訪問某個服務時,它通過內建的LB元件向服務登錄檔查詢,同時快取並定期重新整理目標服務地址列表,然後以某種負載均衡策略選擇一個目標服務地址,最後向目標服務發起請求。LB和服務發現能力被分散到每一個服務消費者的程序內部,同時服務消費方和服務提供方之間是直接呼叫,沒有額外開銷,效能比較好。該方案主要問題:

1、開發成本,該方案將服務呼叫方整合到客戶端的程序裡頭,如果有多種不同的語言棧,就要配合開發多種不同的客戶端,有一定的研發和維護成本;
2、另外生產環境中,後續如果要對客戶庫進行升級,勢必要求服務呼叫方修改程式碼並重新發布,升級較複雜。
3、獨立 LB 程序(External Load Balancing Service)

該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。
不同之處是將LB和服務發現功能從程序內移出來,變成主機上的一個獨立程序。主機上的一個或者多個服務要訪問目標服務時,他們都通過同一主機上的獨立LB程序做服務發現和負載均衡。該方案也是一種分散式方案沒有單點問題,一個LB程序掛了隻影響該主機上的服務呼叫方,服務呼叫方和LB之間是程序內呼叫效能好,同時該方案還簡化了服務呼叫方,不需要為不同語言開發客戶庫,LB的升級不需要服務呼叫方改程式碼。
該方案主要問題:部署較複雜,環節多,出錯除錯排查問題不方便。

服務發現負載均衡實現

gRPC開源元件官方並未直接提供服務註冊與發現的功能實現,但其設計文件已提供實現的思路,並在不同語言的gRPC程式碼API中已提供了命名解析和負載均衡介面供擴充套件。

其基本實現原理:

1、服務啟動後gRPC客戶端向命名伺服器發出名稱解析請求,名稱將解析為一個或多個IP地址,每個IP地址標示它是伺服器地址還是負載均衡器地址,以及標示要使用那個客戶端負載均衡策略或服務配置。
2、客戶端例項化負載均衡策略,如果解析返回的地址是負載均衡器地址,則客戶端將使用grpclb策略,否則客戶端使用服務配置請求的負載均衡策略。
3、負載均衡策略為每個伺服器地址建立一個子通道(channel)。
4、當有rpc請求時,負載均衡策略決定那個子通道即grpc伺服器將接收請求,當可用伺服器為空時客戶端的請求將被阻塞。
根據gRPC官方提供的設計思路,基於程序內LB方案(即第2個案,阿里開源的服務框架 Dubbo 也是採用類似機制),結合分散式一致的元件(如Zookeeper、Consul、Etcd),可找到gRPC服務發現和負載均衡的可行解決方案。接下來以GO語言為例,簡單介紹下基於Etcd3的關鍵程式碼實現:

1)命名解析實現:resolver.go

package etcdv3

import (
"errors"
"fmt"
"strings"

etcd3 "github.com/coreos/etcd/clientv3"
"google.golang.org/grpc/naming"

)

// resolver is the implementaion of grpc.naming.Resolver
type resolver struct {
serviceName string // service name to resolve
}

// NewResolver return resolver with service name
func NewResolver(serviceName string) *resolver {
return &resolver{serviceName: serviceName}
}

// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
if re.serviceName == "" {
return nil, errors.New("grpclb: no service name provided")
}

// generate etcd client
client, err := etcd3.New(etcd3.Config{
    Endpoints: strings.Split(target, ","),
})
if err != nil {
    return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
}

// Return watcher
return &watcher{re: re, client: *client}, nil

}

2)服務發現實現:watcher.go

package etcdv3

import (
"fmt"
etcd3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
"github.com/coreos/etcd/mvcc/mvccpb"
)

// watcher is the implementaion of grpc.naming.Watcher
type watcher struct {
re *resolver // re: Etcd Resolver
client etcd3.Client
isInitialized bool
}

// Close do nothing
func (w *watcher) Close() {
}

// Next to return the updates
func (w watcher) Next() ([]naming.Update, error) {
// prefix is the etcd prefix/value to watch
prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)

// check if is initialized
if !w.isInitialized {
    // query addresses from etcd
    resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
    w.isInitialized = true
    if err == nil {
        addrs := extractAddrs(resp)
        //if not empty, return the updates or watcher new dir
        if l := len(addrs); l != 0 {
            updates := make([]*naming.Update, l)
            for i := range addrs {
                updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
            }
            return updates, nil
        }
    }
}

// generate etcd Watcher
rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
for wresp := range rch {
    for _, ev := range wresp.Events {
        switch ev.Type {
        case mvccpb.PUT:
            return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
        case mvccpb.DELETE:
            return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
        }
    }
}
return nil, nil

}

func extractAddrs(resp *etcd3.GetResponse) []string {
addrs := []string{}

if resp == nil || resp.Kvs == nil {
    return addrs
}

for i := range resp.Kvs {
    if v := resp.Kvs[i].Value; v != nil {
        addrs = append(addrs, string(v))
    }
}

return addrs

}

3)服務註冊實現:register.go

package etcdv3

import (
"fmt"
"log"
"strings"
"time"

etcd3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"

)

// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client etcd3.Client
var serviceKey string

var stopSignal = make(chan bool, 1)

// Register
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
serviceValue := fmt.Sprintf("%s:%d", host, port)
serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)

// get endpoints for register dial address
var err error
client, err := etcd3.New(etcd3.Config{
    Endpoints: strings.Split(target, ","),
})
if err != nil {
    return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
}

go func() {
    // invoke self-register with ticker
    ticker := time.NewTicker(interval)
    for {
        // minimum lease TTL is ttl-second
        resp, _ := client.Grant(context.TODO(), int64(ttl))
        // should get first, if not exist, set it
        _, err := client.Get(context.Background(), serviceKey)
        if err != nil {
            if err == rpctypes.ErrKeyNotFound {
                if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                    log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                }
            } else {
                log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
            }
        } else {
            // refresh set to true for not notifying the watcher
            if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
            }
        }
        select {
        case <-stopSignal:
            return
        case <-ticker.C:
        }
    }
}()

return nil

}

// UnRegister delete registered service from etcd
func UnRegister() error {
stopSignal <- true
stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
var err error;
if _, err := client.Delete(context.Background(), serviceKey); err != nil {
log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
} else {
log.Printf("grpclb: deregister '%s' ok.", serviceKey)
}
return err
}

4)介面描述檔案:helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {
}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

5)實現服務端介面:helloworldserver.go

package main

import (
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"

grpclb "com.midea/jr/grpclb/naming/etcd/v3"
"com.midea/jr/grpclb/example/pb"

)

var (
serv = flag.String("service", "hello_service", "service name")
port = flag.Int("port", 50001, "listening port")
reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
if err != nil {
    panic(err)
}

err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
if err != nil {
    panic(err)
}

ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
    s := <-ch
    log.Printf("receive signal '%v'", s)
    grpclb.UnRegister()
    os.Exit(1)
}()

log.Printf("starting hello service at %d", *port)
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)

}

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s server) SayHello(ctx context.Context, in pb.HelloRequest) (*pb.HelloReply, error) {
fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

6)實現客戶端介面:helloworldclient.go

package main

import (
"flag"
"fmt"
"time"

grpclb "com.midea/jr/grpclb/naming/etcd/v3"
"com.midea/jr/grpclb/example/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"strconv"

)

var (
serv = flag.String("service", "hello_service", "service name")
reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
flag.Parse()
r := grpclb.NewResolver(*serv)
b := grpc.RoundRobin(r)

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
if err != nil {
    panic(err)
}

ticker := time.NewTicker(1 * time.Second)
for t := range ticker.C {
    client := pb.NewGreeterClient(conn)
    resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
    if err == nil {
        fmt.Printf("%v: Reply is %s\n", t, resp.Message)
    }
}

}