1. 程式人生 > >HBase的RPC原始碼分析

HBase的RPC原始碼分析

RPC服務是指跨網路的服務呼叫,客戶端發出服務請求,經過網路傳輸到服務端。服務端解析該請求,呼叫本地方法獲取結果,然後將結果作為響應包通過網路傳送回客戶端,這樣客戶端在呼叫遠端方法時就會像呼叫本地方法一樣簡單。

RPC呼叫時有兩個問題需要解決,其一是client端與server端採用何種方式通訊,其二是請求資訊和結果以何種格式在網路上傳輸,也就是RPC通訊協議和RPC通訊框架。通訊協議需要client端和server端約定好請求引數的型別、引數順序以及相應結果的型別等等,通訊框架則定義了兩端通訊的方式,可選的包括TCP/UDP通訊,HTTP等。除此之外,RPC框架還需要解決server端服務呼叫超時、client端的重試、服務端限流和server端的排程等問題。

HBase中的master/regionserver/client等元件在RPC中的關係如下圖中所示:


其中,client與master之間的通訊主要是hbase的DML操作,包括table schema的更改,table region的遷移合併、region server的上線和下線以及叢集負載均衡、Table的快照管理等功能;client與regionserver之間的通訊用於實現資料讀寫請求,如get、multiGet、mutate、scan、bulkLoadHfile、執行coprocessor等;master與region server之間的通訊用於region server向master彙報自身的狀態,包括自身管理的region和自身的當前狀態等等。


下面分別從客戶端和服務端的角度分析hbase中rpc呼叫的實現。在客戶端中,hbase的rpc服務可以拆分為兩個主要流程,分別是請求構建和請求處理。下面我們以hbase的get請求為例,以求清晰解釋客戶端的rpc流程。首先講解請求是如何構建的。


上面這段程式碼就是get請求的構建流程,其將get請求封裝進了一個RegionServerCallable物件,最後呼叫rpcCallerFactory中的callWithRetries方法開始rpc呼叫。這裡有一個需要注意的伏筆,就是get請求最終是落在了下面這一句:

ClientProtos.GetResponse response = getStub().get(controller, request)

也就是說getStub中返回的ClientService.BlockingInterface管理了最終請求的處理。

上面的先按下不表,接著進入callWithRetries方法,其第一個try塊中的程式碼就是其主要邏輯。如下圖中所示:


簡而言之就是分別呼叫了callable的prepare方法和call方法,call方法的定義我們在上面看到了,那麼prepare方法做了什麼事呢,進入prepare方法,分析其程式碼:


prepare方法接收一個reload引數以決定其每次連結regionserver失敗時是否會重新定位region所在的位置,一般設為true。上面程式碼的關鍵是最後一句:

setStub(getConnection().getClient(this.location.getServerName()))

setStub實際上是設定了callable中的ClientService.BlockingInterface型別成員變數。至此,呼應了前面埋下的伏筆。ClientService.BlockingInterface關聯了rpc請求的處理。客戶端發往服務端rpc請求可以劃分兩類,分別是涉及叢集管理的DML操作請求和涉及資料讀寫的DDL操作請求,這兩類操作請求分別由AdminService.BlockingInterface&ClientService.BlockingInterface兩個類來處理。

getClient方法的主要程式碼如下圖中所示:


getClient主要做了兩件事,首先根據埠名,主機名&呼叫方法名構造了key,接著呼叫了rpcClient的createBlockingRpcChannel方法。Hbase中有多種不同型別的rpcClient,我們以最常見的AsyncRpcClient為例繼續講解。

在createBlockingRpcChannel內部例項化了一個BlockingRpcChannelImplementation類。該類呼叫了rpcClient的CallBlockingMethod方法,這個方法實際上是一個callable方法,不同的rpcClient中定義了不同的call實現,在AsyncRpcClient的call實現中的主要流程如下:

上圖展示了主要程式碼,後面的都是錯誤處理的程式碼。可以看出這裡用到了netty提供的非同步模型Future/Promise方法。在這個模型中,Future表示一個可能還沒有實際完成的非同步任務的結果,針對這個結果可以新增Callback以便在任務執行成功或失敗後作出對應的操作,而Promise交由任務執行者,任務執行者可以通過Promise標記任務完成或者失敗。

這裡的非同步任務是經過callMethod方法包裝的,callMethod方法接受一個MethodDescriptor型別的引數method,並將其封裝為AsyncCall型別的call,然後呼叫writeRequest()。writeRequest()方法位於AsyncRpcChannel.java中,其將請求call的引數封裝好寫入到netty的channel中,並新增一個CallWriteListener用於監聽channel的寫入結果。

上述程式碼主要位於AsyncRpcChannel.java、AbstractRpcClient.java&AsyncRpcClient.java三個原始檔中。通過分析上述程式碼,可以看到hbase的非同步rpc請求在實現上是依賴了netty提供的promise/future模型的。

此外,還有一個問題就是client端與server端的socket連線是在什麼時候建立起來的?

上面說了AbstractRpcClient這個類是個抽象類,各個rpcClient會覆寫這個類中的方法,在這個類的callBlockingMethod中建立起了client端與server端的連線,具體是在下面一句:

val = call(pcrc,md,param,returnType,ticket,isa);

在call方法中依次呼叫了connection.tracedWriteRequest->writeRequest->setupIOstreams->setupConnection,最終在setupConnection中建立起了本次rpc與服務端的連線,建立連線的主要程式碼主要程式碼如下:
while(true){
   try{
      this.socket=socketFactory.createSocket();
      this.socket.setTcpNoDelay(tcpNoDelay);
      this.socket.setKeepAlive(tcpKeepAlive);
      if(localAddr != null) {
         this.socket.bind(localAddr);
      }
      NetUtils.connect(this.socket, remoteId.getAddress(),connectTO);
      this.socket.setSoTimeout(readTO);
      return;
   }catch(SocketTimeoutException toe){
      handleConnectionFailure(timeoutFailures++,maxRetries,toe);
   }catch(IOException ie){
      handleConnectionFailure(ioFailures++,maxRetries,toe);
   }
}

maxRetries定義了建連的重試次數,從原始碼可以看出,對待io錯誤和socket超時這裡是分別計算重試次數的。

至此,rpc請求的處理簡要敘述完畢。筆者水平有限,上述文中不妥的地方歡迎大家一起討論。

參考資料: