Thrift原理簡析(JAVA)
Apache Thrift是一個跨語言的服務框架,本質上為RPC,同時具有序列化、反序列化機制;當我們開發的service需要開放出去的時候,就會遇到跨語言呼叫的問題,JAVA語言開發了一個UserService用來提供獲取使用者資訊的服務,如果服務消費端有PHP/Python/C++等,我們不可能為所有的語言都適配出相應的呼叫方式,有時候我們會很無奈的使用Http來作為訪問協議;但是如果服務消費端不能使用HTTP,而且更加傾向於以操作本地API的方式來使用服務,那麼我們就需要Thrift來提供支援.
不過,如果你的JAVA服務,並沒有跨語言呼叫的需求,那麼使用thrift作為RPC框架,似乎不是最好的選擇,不管thrift是否效能優越,但是它使用起來確實沒有類似於Hessian/CXF等那樣便捷和易於使用.
本文以UserService為例,描述一下使用thrift的方式,以及其原理..
一. service.thrift
Java程式碼- struct User{
- 1:i64 id,
- 2:string name,
- 3:i64 timestamp,
- 4:bool vip
- }
- service UserService{
- User getById(1:i64 id)
- }
你可以將自己的JAVA服務通過".thrift"檔案描述出來,並提供給服務消費端,那麼消費端即可以生成自己的API檔案..Thrift框架目前已經支援大部分主流的語言,.需要注意,因為Thrift考慮到struct/service定義需要相容多種語言的"風格",所以它只支援一些基本的資料型別(比如i32,i64,string等),以及service定義的方法不能重名,即使引數列表不同.(並不是所有的語言都能像JAVA一樣支援過載)
二. 生成API檔案
首先下載和安裝thrift客戶端,比如在windows平臺下,下載thrift.exe,不過此處需要提醒,不同的thrift客戶端版本生成的API可能不相容.本例使用thrift-0.9.0.exe;通過"--gen"指定生成API所適配的語言.本例項為生成java客戶端API.
Java程式碼- //windows平臺下,將API檔案輸出在service目錄下(此目錄需要存在)
- > thrift.exe --gen java -o service service.thrift
需要明確的是:Thrift和其他RPC框架不同,thrift在生成的API檔案中,已經描述了"呼叫過程"(即硬編碼),而不是像其他RPC那樣在執行時(runtime)動態解析方法呼叫或者引數.
三. UserService實現類
Java程式碼- public class UserServiceImpl implements UserService.Iface {
- @Override
- public User getById(long id){
- System.out.println("invoke...id:" + id);
- return new User();//for test
- }
- }
實現類,需要放在Thrift server端.
四.原理簡析
1. User.java: thrift生成API的能力還是非常的有限,比如在struct中只能使用簡單的資料型別(不支援Date,Collection<?>等),不過我們能從User中看出,它生成的類實現了"Serializable"介面和"TBase"介面.
其中Serializable介面表明這個類的例項是需要序列化之後在網路中傳輸的,為了不干擾JAVA本身的序列化和反序列化機制,它還重寫了readObject和writeObject方法.不過這對thrift本身並沒有幫助.
TBase介面是thrift序列化和反序列化時使用的,它的兩個核心方法:read和write.在上述的thrift檔案中,struct定義的每個屬性都有一個序號,比如:1:id,那麼thrift在序列化時,將會根據序號的順序依次將屬性的"名稱 + 值"寫入inputStream中,反序列化也是如此.(具體參見read和write的實現).
Java程式碼- //read方法逐個讀取欄位,按照"索引",最終將"struct"物件封裝完畢.
- //write方法也非常類似,按照"索引"順序逐個輸出到流中.
- while (true){
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 1: // ID
- if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.id = iprot.readI32();
- struct.setIdIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 2: // NAME
- ..
- }
- }
因為thrift的序列化和反序列化例項資料時,是根據"屬性序號"進行,這可以保證資料在inputstream和outputstream中順序是嚴格的,此外每個struct中"序號"不能重複,但是可以不需要從"1"開始.如果"序號"有重複,將導致無法生成API檔案.這一點也要求API開發者,如果更改了thrift檔案中的struct定義,需要重新生成客戶端API,否則服務將無法繼續使用(可能報錯,也可能資料錯誤).thrift序列化/反序列化的過程和JAVA自帶的序列化機制不同,它將不會攜帶額外的class結構,此外thrift這種序列化機制更加適合網路傳輸,而且效能更加高效.
2. UserService.Client: 在生成的UserService中,有個Client靜態類,這個類就是一個典型的代理類,此類已經實現了UserService的所有方法.開發者需要使用Client類中的API方法與Thrift server端互動,它將負責與Thrift server的Socket連結中,傳送請求和接收響應.
需要注意的時,每次Client方法呼叫,都會在一個Socket連結中進行,這就意味著,在使用Client消費服務之前,需要和Thrift server建立有效的TCP連結.(稍後程式碼示例)
1) 傳送請求:
Java程式碼- //參見:TServiceClient
- //API方法呼叫時,傳送請求資料流
- protected void sendBase(String methodName, TBase args) throws TException {
- oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//首先寫入"方法名稱"和"seqid_"
- args.write(oprot_);//序列化引數
- oprot_.writeMessageEnd();
- oprot_.getTransport().flush();
- }
- protected void receiveBase(TBase result, String methodName) throws TException {
- TMessage msg = iprot_.readMessageBegin();//如果執行有異常
- if (msg.type == TMessageType.EXCEPTION) {
- TApplicationException x = TApplicationException.read(iprot_);
- iprot_.readMessageEnd();
- throw x;
- }//檢測seqid是否一致
- if (msg.seqid != seqid_) {
- throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
- }
- result.read(iprot_);//反序列化
- iprot_.readMessageEnd();
- }
Thrift提供了簡單的容錯方式:每次方法呼叫,都會在Client端標記一個seqid,這是一個自增的本地ID,在TCP請求時將此seqid追加到流中,同時Server端響應時,也將此seqid原樣返回過來;這樣客戶端就可以根據此值用來判斷"請求--響應"是對應的,如果出現亂序,將會導致此請求以異常的方式結束.
2) 響應
Java程式碼- //參考: TBaseProcessor.java
- @Override
- public boolean process(TProtocol in, TProtocol out) throws TException {
- TMessage msg = in.readMessageBegin();
- ProcessFunction fn = processMap.get(msg.name);//根據方法名,查詢"內部類"
- if (fn == null) {
- TProtocolUtil.skip(in, TType.STRUCT);
- in.readMessageEnd();
- TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
- out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
- x.write(out);//序列化響應結果,直接輸出
- out.writeMessageEnd();
- out.getTransport().flush();
- return true;
- }
- fn.process(msg.seqid, in, out, iface);
- return true;
- }
thrift生成的UserService.Processor類,就是server端用來處理請求過程的"代理類";server端從socket中讀取請求需要呼叫的"方法名" +引數列表,並交付給Processor類處理;和其他的RPC呼叫不同的時,thrift並沒有使用類似於"反射機制"的方式來呼叫方法,而是將UserService的每個方法生成一個"內部類":
Java程式碼- public static class getById<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getById_args> {
- public getById() {
- super("getById");//其中getById為識別符號
- }
- public getById_args getEmptyArgsInstance() {
- return new getById_args();
- }
- protected boolean isOneway() {
- return false;
- }
- //實際處理方法
- public getById_result getResult(I iface, getById_args args) throws org.apache.thrift.TException {
- getById_result result = new getById_result();
- result.success = iface.getById(args.id);//直接呼叫例項的具體方法,硬編碼
- return result;
- }
- }
這個"內部類",將會在Processor初始化的時候,放入到一個map中,此後即可以通過"方法名"查詢,然後呼叫其"getResult"方法了.由此可見,thrift客戶端與server端均沒有使用到“反射機制”,全程都是硬編碼實現,之所以這麼做的原因可能是考慮到效能,也可能是考慮到開發者可以在生成的程式碼基礎上任意調整以適應特殊的情況;如果使用反射機制,將會無法兼顧上述2個情況。
Java程式碼- public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
- public Processor(I iface) {
- super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
- }
- protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
- super(iface, getProcessMap(processMap));
- }
- private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
- //放入map
- processMap.put("getById", new getById());
- return processMap;
- }
- ....
- }
3) Server端Socket管理和執行策略
Java程式碼- TThreadPoolServer
- public void serve() {
- try {
- //啟動服務
- serverTransport_.listen();
- } catch (TTransportException ttx) {
- LOGGER.error("Error occurred during listening.", ttx);
- return;
- }
- // Run the preServe event
- if (eventHandler_ != null) {
- eventHandler_.preServe();
- }
- stopped_ = false;
- setServing(true);
- //迴圈,直到被關閉
- while (!stopped_) {
- int failureCount = 0;
- try {
- //accept客戶端Socket連結,
- //對於每個新連結,將會封裝成runnable,並提交給執行緒或者執行緒池中執行.
- TTransport client = serverTransport_.accept();
- WorkerProcess wp = new WorkerProcess(client);
- executorService_.execute(wp);
- } catch (TTransportException ttx) {
- if (!stopped_) {
- ++failureCount;
- LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
- }
- }
- }
- //....
- }
Thrift Server端,設計思路也非常的直接...當前Service server啟動之後,將會以阻塞的方式偵聽Socket連結(程式碼參考TThreadPoolServer),每建立一個Socket連結,都會將此Socket經過封裝之後,放入執行緒池中,本質上也是一個Socket連結對應一個Worker Thread.這個Thread只會處理此Socket中的所有資料請求,直到Socket關閉.
Java程式碼- //參考:WorkerProcess
- while (true) {
- if (eventHandler != null) {
- eventHandler.processContext(connectionContext, inputTransport, outputTransport);
- }
- if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
- break;
- }
- }
當有Socket連結不是很多的時候,TThreadPoolServer並不會有太大的效能問題,可以通過指定ThreadPool中執行緒的個數進行簡單的調優..如果Socket連結很多,我們只能使用TThreadedSelectorServer來做支撐,TThreadedSelectorServer內部基於NIO模式,具有非同步的特性,可以極大的提升server端的併發能力;不過在絕大多數情況下,在thrift中使用"非同步"似乎不太容易讓人接受,畢竟這意味著Client端需要阻塞,並且在高併發環境中這個阻塞時間是不可控的.但SelecorServer確實可以有效的提升Server的併發能力,而且在一定程度上可以提升吞吐能力,這或許是我們優化Thrift Server比較可靠的方式之一.
3. Client端程式碼示例
Java程式碼- public class UserServiceClient {
- public void startClient() {
- TTransport transport;
- try {
- transport = new TSocket("localhost", 1234);
- TProtocol protocol = new TBinaryProtocol(transport);
- UserService.Client client = new UserService.Client(protocol);
- transport.open();
- User user = client.getById(1000);
- ////
- transport.close();
- } catch (TTransportException e) {
- e.printStackTrace();
- } catch (TException e) {
- e.printStackTrace();
- }
- }
- }
4. Server端程式碼示例
Java程式碼- public class Server {
- public void startServer() {
- try {
- TServerSocket serverTransport = new TServerSocket(1234);
- UserService.Processor process = new Processor(new UserServiceImpl());//入口
- Factory portFactory = new TBinaryProtocol.Factory(true, true);
- Args args = new<