rpc之thrift入門與TBinaryProtocol原始碼追蹤
thrift是一個支援多語言進行RPC的軟體庫,開發者通過定義資料型別和服務介面,經由thrift的程式碼生成引擎就可以構建RPC客戶端和服務端所需要的程式碼。它的核心元件如下:
-
Types。thrift支援的資料型別,由thrift自身定義,定義的型別是所有程式語言中都會用到的關鍵型別
-
Transport。做資料傳輸用,抽象了對網路的一些操作,比如 read/write
-
Protocol。資料經由client和server進行編碼或者解碼的格式,比如 JSON、binary
-
Versioning。client和server對於介面變動的處理方式,比如server在舊介面上新增了一個欄位,但是client還是舊的
-
Processors。服務端收到RPC後的處理邏輯,負責將讀到的內容交由server處理,並將結果寫回輸出流
Type
-
支援的基本型別為
-
bool 取值為true/false
-
byte 有符號的1個位元組
-
i16 16位元的有符號integer
-
i32 32位元的有符號integer
-
i64 64位元的有符號integer
-
double 64位元的浮點數
-
string 文字字串
-
-
複雜型別,比如java中的自定義物件,可以通過 struct 來組織
-
集合目前支援 list/set/map,可以對應成java的ArrayList/HashSet/HashMap
-
異常通過 exception 來標識,類似 struct
-
介面定義使用 service 來標識
-
列舉使用 enum 來標識
Thrift介面描述語言(IDL)
Thrift IDL檔案是開發者自定義邏輯內容的地方,它會被Thrift 的程式碼生成器處理成目標語言的程式碼
以下以java來類比
-
namespace java paxi.maokitty.verify.service
namespace 相當於java中宣告瞭當前檔案的包名,java則是表示namespace的適用語言 -
include 'myException.thrift'
include相當於import,引入別的包,以便使用它的內容 -
typedef myException.myException myException
定義別名,方便寫 -
1:i32 code
定義欄位 code,型別是i32,序號是1 -
string say(1:string msg)throws(1:myException e)
定義方法say,返回值是string,引數是string,丟擲自定義異常 myException(這裡是別名)
Transport
-
TTransport 提供了一些網路方法的抽象 open/close/read/write/flush ;
-
TServerTransport 則是負責接受建立連線 open/listen/accept/close
-
TSocket 對TTransport的整合實現,提供sokcet
Protocol
protocol提供了一種將記憶體資料對映到一種特定結構的機制,thrift的設計是要支援流式協議
-
資料整個被編碼成一系列的欄位,每個欄位都有一個型別和一個唯一的識別碼(序號)
-
structs的結束則會自帶一個 STOP 的標記
這種設計方式使得thrift協議能夠自我界定,而不需要關心具體的編碼方式
當然如果固定格式是一種優勢或者流處理不是必須的,可以使用TFrameTransport來支援
Versioning
Versioning在thrift中的實現是通過欄位識別碼完成的,每個宣告的欄位前面都會有一個唯一的欄位識別碼
如果沒有人為的新增,thrift自身會從-1開始遞減,自動加上
當Server讀到資料之後,會根據欄位識別碼來識別是否有根據定義檔案存放了這個欄位,如果沒有,就會跳過無法識別欄位的長度,而不用丟擲異常。如果這個欄位是有的,而且必須要的,則在真的使用這個欄位前,先通過isset檢查是否真的存在
新老版本可能存在的情況分析
-
添加了新的欄位,老的客戶端,新的server。此時老的客戶端不會傳送新的欄位,新的server發現欄位沒有set,就按照過期請求的預設行為處理
-
刪除了欄位,老的客戶端,新的server。老的客戶端會傳送已經刪掉的欄位,新的server會直接無視
-
添加了新的欄位,新的客戶端,老的server。新客戶端會傳送新的欄位,老的server則是直接無視這個欄位,按現有邏輯處理
-
刪了欄位,新的客戶端,老的server。建議不要出現這種情況
Processor
對每個service,都會對應的生成一個Processor,它主要是負責從網路得到的資料中提取出內容,然後代理請求真正的方法,再把結果寫入輸出的Prococol
thrift命令使用
thrift -out ../src/main/java --gen java:private-members ./myException.thrift
複製程式碼
-out
指定輸出的目錄
--gen java:private-members ./myException.thrift
表示目標語言是java,欄位使用private
thrift程式碼
根據thrift的IDL語法,寫下自己要實現的函式功能
service DemoService{string say(1:string msg)throws(1:myException e),}
複製程式碼
以使用TBinaryProtocol
為例,指定好埠、建立Transport、發起請求的客戶端
//1:網路請求相關設定
transport=new TSocket("127.0.0.1",9000,1000);
//2:傳輸資料的編碼方式
TProtocol protocol=new TBinaryProtocol(transport);
//3:建立連線
transport.open();
//4:建立客戶端
DemoService.Client client=new DemoService.Client(protocol);
//5:發起請求
String say = client.say("i am client");
複製程式碼
在服務端則對應設定接收請求的埠,然後等待連線的到來
//1:建立等待連線的serverSocket
TServerSocket serverSocket=new TServerSocket(9000);
//2:構建server所需要的引數
TServer.Args serverArgs=new TServer.Args(serverSocket);
//3:邏輯處理
TProcessor processor=new DemoService.Processor<DemoService.Iface>(new DemoServiceImpl());
//4:解析協議
serverArgs.protocolFactory(new TBinaryProtocol.Factory());
serverArgs.processor(processor);
//5:組織元件完成功能
TServer server=new TSimpleServer(serverArgs);
LOG.info("main server start ... ");
//6:等待連線到來
server.serve();
複製程式碼
TBinaryProtocol原始碼追蹤
服務端啟動後,等待連線的到來
@Trace(
index = 9,originClassName = "org.apache.thrift.server.TSimpleServer",function = "public void serve() "
)
public void serve(){
//...
Code.SLICE.source("client = serverTransport_.accept();")
.interpretation("底層就是ServerSocket的accept函式,它將返回的結果封裝成TSocket返回");
//..
Code.SLICE.source(" processor = processorFactory_.getProcessor(client);\n" +
" inputTransport = inputTransportFactory_.getTransport(client);\n" +
" outputTransport = outputTransportFactory_.getTransport(client);\n" +
" inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);\n" +
" outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);\n" +
" while (processor.process(inputProtocol,outputProtocol)) {}")
.interpretation("processor即thrift根據使用者寫的程式碼實現類的processor,其餘四個引數則是得到的請求中獲取的協議處理器,用來讀取資料和返回資料,拿到後交由處理器處理");
}
複製程式碼
客戶端則是在主動發起請求的時候,按照TBinaryProtocol的協議格式寫入資料
Code.SLICE.source("oprot_.writeMessageBegin(new TMessage(methodName,TMessageType.CALL,++seqid_));")
.interpretation("opprot即初始化 DemoService.client 時傳入的 TBinaryProtocol,seqid預設值為0")
.interpretation("Begin部分的寫入首先是按照位元組數寫入版本、然後是方法名的長度,再是方法名,最後寫入序列號,按照特定的規則寫入資料");
Code.SLICE.source("args.write(oprot_);")
.interpretation("負責將引數寫入Buffer,它會按照引數的順序寫入,每個引數又是按照型別、序號、值的順序寫入");
//..
Code.SLICE.source("oprot_.getTransport().flush();")
.interpretation("資料已經寫入了緩衝區,把沒有寫完的資料寫入對應的檔案描述符");
複製程式碼
當收到請求後服務端則交由服務端的實現具體的處理邏輯然後再回寫內容
Code.SLICE.source("T args = getEmptyArgsInstance();")
.interpretation("拿到引數的型別,這裡就是 say_args");
//..
Code.SLICE.source("args.read(iprot);")
.interpretation("從say_args的scheme(say_argsStandardScheme)中讀取引數");
//..
Code.SLICE.source("TBase result = getResult(iface,args);")
.interpretation("呼叫實現類,去執行使用者自己寫的邏輯,並得到對應的結果");
//...
Code.SLICE.source("oprot.writeMessageBegin(new TMessage(getMethodName(),TMessageType.REPLY,seqid));" +
" result.write(oprot);\n" +
" oprot.writeMessageEnd();\n" +
" oprot.getTransport().flush();")
.interpretation("開始往返回Stream中寫入資料,表明這是對那個方法的返回值,然後寫入返回的結果,最後輸入socket");
複製程式碼
TBinaryProtocol原始碼總結
client會按照位元組的寫入規則嚴格的寫入和讀取。底層通訊實際上就是socket,服務端接收到請求後,交由對應使用者的實現介面來呼叫實現類,再將結果寫入輸出流, 客戶端等結果返回後再按照規則讀取結果,完成1次rpc的呼叫