1. 程式人生 > >protobuf RPC實現

protobuf RPC實現

Server 結構

結構參照hadoop RPC結構,自己造輪子
這裡寫圖片描述

傳輸的資料結構

這裡寫圖片描述

1,abstract class Server 接收並且響應客戶端請求,把請求資料封裝成Call 交給之類實現
2,客戶端首次連線必須傳送頭”HEADER”+version+ServiceClass(Server 實現的之類)+auth
*|——–4byte——-|———-3byte—————–|
*|———————-|—-8 bit—-8 bit—–8bit——|
*|——–HEADER—–|–version ServiceClass auth—|
3,call結構

*|——–4byte———-|———————-dataLength———————————|
*|————————| –len(1~4byte)-|—data—— |–len(1~4byte)-|—data——–|
*|————————| –RpcRequestHeaderProto– |–?RequestProto—————|
*|—-請求資料的長度—-|—————具體的請求資料————————————|

Server處理client 資料(java)

  1. 首先讀取4個位元組的看看是不是RPC請求
  2. 然後讀取3個位元組比對版本,Server具體的實現類,認證資訊
  3. 迴圈開始讀取call
    1.讀取4個位元組(一個Int)表示當前call的資料長度
    2,讀取call的資料長度的資料
    3,封裝成Call物件,放入Call佇列

處理Call佇列

  1. 迴圈讀取call
    1. 根據varint解壓縮讀取RpcRequestHeader的長度
    2. 通過長度讀取資料
    3. 通過RpcRequestHeaderProto.parseFrom 解析出RequestHeader
    4. 通過RequestHeader的protoName+version 找到具體處理類(BlockingService service)
    5. MethodDescriptor method = service.getDescriptorForType().findMethodByName(methodName); 找到執行的方法
    6. Message requestPrototype = service.getRequestPrototype(method);找到請求引數型別
    7. 根據varint解壓縮讀取request的長度
    8. 通過長度讀取資料
    9. requestPrototype.newBuilderForType().mergeFrom(資料).build()
    10. Message result= service.callBlockingMethod(method, null,requestPrototype.newBuilderForType().mergeFrom(request.theRequestRead).build()); 獲取返回物件
    11. 生成返回頭(RpcResponseHeaderProto)
    12. 把RpcResponseHeaderProto+result(同樣寫入總長度+header的長度+header+response的長度+response)放入響應佇列

響應佇列

  1. 讀取資料返回給客戶端

客戶端傳送資料

  1. c = ClientProtocolService_Stub(service.RpcChannel) service.RpcChannel表示繼承service.RpcChannel的例項,重寫CallMethod
  2. c.echo 實際就是呼叫service.RpcChanneld 的CallMethod方法,引數為 EchoRequestProto
  3. 根據方法產生RpcRequestHeaderProto例項
  4. 把(總資料長度+RpcRequestHeaderProto長度的varint壓縮+RpcRequestHeaderProto.SerializeToString+EchoRequestProto長度的varint壓縮+EchoRequestProto.SerializeToString)傳送到伺服器
  5. 等待資料返回,
    1.讀取總返回的資料長度,固定4個位元組
    2.根據長度讀取資料
    1. 根據varint解壓縮讀取RpcResponseHeaderProto的長度
    2. 讀取RpcResponseHeaderProto資料,responseHeader.ParseFromString解析資料
    3. 判斷是否成功,失敗顯示錯誤資訊退出
    4. 如果成功,根據varint解壓縮讀取EchoResponseProto的長度和資料

程式碼

伺服器是java(因為python沒找到select.select喚醒的方法所以使用java寫),客戶端是python
https://github.com/neo-hu/RPC

注意

資料的長度

java的writeDelimitedTo寫入的資料會先寫入這個Msg的資料的長度,是1~4位元組的資料
如果java解析 parseDelimitedFrom,
python沒這方法(也許是我沒找到)需要直接先寫入長度,在寫入資料

varint

def write_raw_varint(value):
    """
    varint壓縮
    """
    local_chr = _PY2 and chr or (lambda x: bytes((x,)))
    pieces = []
    write = lambda x: pieces.append(local_chr(x))
    bits = value & 0x7f
    value >>= 7
    while value:
        write(0x80 | bits)
        bits = value & 0x7f
        value >>= 7
    write(bits)
    return ''.join(pieces)


def read_raw_varint(buff):
    """
        varint解壓縮,每次讀取8為,如果第一位是1表示還有資料,把後7位儲存為tmp,直到讀取到第一位為0的,然後把所有的tmp後7連線起來,倒序連線
    """
    s1 = struct.Struct("!b")
    shift = 0
    result = 0
    index = 0
    while True:
        tmp = buff[index]
        index += 1
        b, = s1.unpack(tmp)
        result |= (b & 0x7f) << shift
        if not (b & 0x80):
            return index, result
        shift += 7
        if shift >= 64:
            raise Exception("太多的位元組")