protobuf RPC實現
阿新 • • 發佈:2019-01-01
Server 結構
結構參照hadoop RPC結構,自己造輪子
傳輸的資料結構
1,abstract class Server 接收並且響應客戶端請求,把請求資料封裝成Call 交給之類實現
2,客戶端首次連線必須傳送頭”HEADER”+version+ServiceClass(Server 實現的之類)+auth
*|——–4byte——-|———-3byte—————–|
*|———————-|—-8 bit—-8 bit—–8bit——|
*|——–HEADER—–|–version ServiceClass auth—|
3,call結構
*|——–4byte———-|———————-dataLength———————————|
*|————————| –len(1~4byte)-|—data—— |–len(1~4byte)-|—data——–|
*|————————| –RpcRequestHeaderProto– |–?RequestProto—————|
*|—-請求資料的長度—-|—————具體的請求資料————————————|
Server處理client 資料(java)
- 首先讀取4個位元組的看看是不是RPC請求
- 然後讀取3個位元組比對版本,Server具體的實現類,認證資訊
- 迴圈開始讀取call
1.讀取4個位元組(一個Int)表示當前call的資料長度
2,讀取call的資料長度的資料
3,封裝成Call物件,放入Call佇列
處理Call佇列
- 迴圈讀取call
- 根據varint解壓縮讀取RpcRequestHeader的長度
- 通過長度讀取資料
- 通過RpcRequestHeaderProto.parseFrom 解析出RequestHeader
- 通過RequestHeader的protoName+version 找到具體處理類(BlockingService service)
- MethodDescriptor method = service.getDescriptorForType().findMethodByName(methodName); 找到執行的方法
- Message requestPrototype = service.getRequestPrototype(method);找到請求引數型別
- 根據varint解壓縮讀取request的長度
- 通過長度讀取資料
- requestPrototype.newBuilderForType().mergeFrom(資料).build()
- Message result= service.callBlockingMethod(method, null,requestPrototype.newBuilderForType().mergeFrom(request.theRequestRead).build()); 獲取返回物件
- 生成返回頭(RpcResponseHeaderProto)
- 把RpcResponseHeaderProto+result(同樣寫入總長度+header的長度+header+response的長度+response)放入響應佇列
響應佇列
- 讀取資料返回給客戶端
客戶端傳送資料
- c = ClientProtocolService_Stub(service.RpcChannel) service.RpcChannel表示繼承service.RpcChannel的例項,重寫CallMethod
- c.echo 實際就是呼叫service.RpcChanneld 的CallMethod方法,引數為 EchoRequestProto
- 根據方法產生RpcRequestHeaderProto例項
- 把(總資料長度+RpcRequestHeaderProto長度的varint壓縮+RpcRequestHeaderProto.SerializeToString+EchoRequestProto長度的varint壓縮+EchoRequestProto.SerializeToString)傳送到伺服器
- 等待資料返回,
1.讀取總返回的資料長度,固定4個位元組
2.根據長度讀取資料
- 根據varint解壓縮讀取RpcResponseHeaderProto的長度
- 讀取RpcResponseHeaderProto資料,responseHeader.ParseFromString解析資料
- 判斷是否成功,失敗顯示錯誤資訊退出
- 如果成功,根據varint解壓縮讀取EchoResponseProto的長度和資料
程式碼
伺服器是java(因為python沒找到select.select喚醒的方法所以使用java寫),客戶端是python
https://github.com/neo-hu/RPC
注意
資料的長度
java的writeDelimitedTo寫入的資料會先寫入這個Msg的資料的長度,是1~4位元組的資料
如果java解析 parseDelimitedFrom,
python沒這方法(也許是我沒找到)需要直接先寫入長度,在寫入資料
varint
def write_raw_varint(value):
"""
varint壓縮
"""
local_chr = _PY2 and chr or (lambda x: bytes((x,)))
pieces = []
write = lambda x: pieces.append(local_chr(x))
bits = value & 0x7f
value >>= 7
while value:
write(0x80 | bits)
bits = value & 0x7f
value >>= 7
write(bits)
return ''.join(pieces)
def read_raw_varint(buff):
"""
varint解壓縮,每次讀取8為,如果第一位是1表示還有資料,把後7位儲存為tmp,直到讀取到第一位為0的,然後把所有的tmp後7連線起來,倒序連線
"""
s1 = struct.Struct("!b")
shift = 0
result = 0
index = 0
while True:
tmp = buff[index]
index += 1
b, = s1.unpack(tmp)
result |= (b & 0x7f) << shift
if not (b & 0x80):
return index, result
shift += 7
if shift >= 64:
raise Exception("太多的位元組")