python client 與 java netty server , 用protobuf通訊的問題
阿新 • • 發佈:2019-01-04
此時如果客戶端也是netty,那麼通訊沒有問題,但如果是非java語言,那麼如果直接傳送/接受protobuf序列化後的二進位制包,會報以下異常:
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
服務端採用了:
pipeline.addLast(“frameDecoder”, new ProtobufVarint32FrameDecoder());
pipeline.addLast(“frameEncoder”,
new ProtobufVarint32LengthFieldPrepender());
在對每個protobuf訊息進行傳送前,會加一個訊息的長度,同時解碼的時候,也需要先解析這個長度再解析訊息,以免出現半包、粘包問題。
以python為例
//把protobuf物件傳過來
def sendSig(self,sig): //把protobuf資料反序列化 print("=>", sig.SerializeToString()) value = len(sig.SerializeToString()) bits = value & 0x7f value >>= 7 while value: self.send_queue.put(six.int2byte(0x80 | bits)) bits = value & 0x7f value >>= 7 self.send_queue.put(six.int2byte(bits)) self.send_queue.put(sig.SerializeToString())
詳解 self.send_queue.put 這個可以理解是個佇列,首先要定義一個佇列把資料存入進去! 具體編碼細節可以搜尋Varint32!在socket傳送時候 取得時候會有一個bug!有時候回去三次或兩次,在傳送時候請判斷好
至於接受訊息,湊合著用吧
class FFRespReciver(Thread): PARSING_LEN = 0 PARSING_MSG = 1 def __init__(self,ffconnector): super(FFRespReciver,self).__init__() self.ffconnector = ffconnector self.data_buffer = b"" self.parse_status = FFRespReciver.PARSING_LEN self.msg_len = 0 def run(self): while True: data = self.ffconnector.recv_queue.get() for b in data: if self.parse_status == FFRespReciver.PARSING_LEN: self.data_buffer += six.int2byte(b) if not (b & 0x80): self.msg_len = DecodeVarint(self.data_buffer) self.parse_status = FFRespReciver.PARSING_MSG self.data_buffer = b"" continue elif self.parse_status == FFRespReciver.PARSING_MSG: self.data_buffer += six.int2byte(b) if len(self.data_buffer) == self.msg_len: sig = SIG.Sig() sig.ParseFromString(self.data_buffer) self.process(sig) self.data_buffer = b"" self.msg_len = 0 self.parse_status = FFRespReciver.PARSING_LEN def process(self,sig): print(sig) //do the dirty job def DecodeVarint(buffer): mask = (1 << 32) - 1 result_type = int result = 0 shift = 0 for b in buffer: result |= ((b & 0x7f) << shift) shift += 7 if shift >= 64: raise Exception('Too many bytes when decoding varint.') result &= mask result = result_type(result) return result
即用個狀態機,先接受訊息長度,解析後,接受訊息並處理,然後繼續接受訊息長度。