1. 程式人生 > >python client 與 java netty server , 用protobuf通訊的問題

python client 與 java netty server , 用protobuf通訊的問題

此時如果客戶端也是netty,那麼通訊沒有問題,但如果是非java語言,那麼如果直接傳送/接受protobuf序列化後的二進位制包,會報以下異常:
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
服務端採用了:
pipeline.addLast(“frameDecoder”, new ProtobufVarint32FrameDecoder());
pipeline.addLast(“frameEncoder”,
new ProtobufVarint32LengthFieldPrepender());
在對每個protobuf訊息進行傳送前,會加一個訊息的長度,同時解碼的時候,也需要先解析這個長度再解析訊息,以免出現半包、粘包問題。
以python為例
//把protobuf物件傳過來

	def sendSig(self,sig):
	       //把protobuf資料反序列化
	        print("=>", sig.SerializeToString())
	        value = len(sig.SerializeToString())
	        bits = value & 0x7f
	        value >>= 7
	        while value:
	            self.send_queue.put(six.int2byte(0x80 | bits))
	            bits = value & 0x7f
	            value >>= 7
	        self.send_queue.put(six.int2byte(bits))
	        self.send_queue.put(sig.SerializeToString())

詳解 self.send_queue.put 這個可以理解是個佇列,首先要定義一個佇列把資料存入進去! 具體編碼細節可以搜尋Varint32!在socket傳送時候 取得時候會有一個bug!有時候回去三次或兩次,在傳送時候請判斷好
至於接受訊息,湊合著用吧

class FFRespReciver(Thread):

PARSING_LEN = 0
PARSING_MSG = 1

def __init__(self,ffconnector):
    super(FFRespReciver,self).__init__()
    self.ffconnector = ffconnector
    self.data_buffer = b""
    self.parse_status = FFRespReciver.PARSING_LEN
    self.msg_len = 0

def run(self):
    while True:
        data = self.ffconnector.recv_queue.get()
        for b in data:
            if self.parse_status == FFRespReciver.PARSING_LEN:
                self.data_buffer += six.int2byte(b)
                if not (b & 0x80):
                    self.msg_len = DecodeVarint(self.data_buffer)
                    self.parse_status = FFRespReciver.PARSING_MSG
                    self.data_buffer = b""
                    continue
            elif self.parse_status == FFRespReciver.PARSING_MSG:
                self.data_buffer += six.int2byte(b)
                if len(self.data_buffer) == self.msg_len:
                    sig = SIG.Sig()
                    sig.ParseFromString(self.data_buffer)
                    self.process(sig)
                    self.data_buffer = b""
                    self.msg_len = 0
                    self.parse_status = FFRespReciver.PARSING_LEN

def process(self,sig):
    print(sig)
    //do the dirty job
def DecodeVarint(buffer):
    mask = (1 << 32) - 1
    result_type = int
    result = 0
    shift = 0
    for b in buffer:
        result |= ((b & 0x7f) << shift)
        shift += 7
        if shift >= 64:
            raise Exception('Too many bytes when decoding varint.')
    result &= mask
    result = result_type(result)
    return result

即用個狀態機,先接受訊息長度,解析後,接受訊息並處理,然後繼續接受訊息長度。