RPC中Callback Function與CountDownLatch的用法
目錄
●What & Why
RPC(Remote Procedure Call),通俗地說,就是在一臺計算機上呼叫另一臺計算機提供的服務。這裡的服務對應RPC中的P(Procedure),表現形式通常是API介面,或者說好比一個原生代碼工程中的一個函式。那為什麼要用RPC呢?最主要的原因有兩點:1、符合低耦合、職責分離、可複用的開發原則,將不同的服務(模組、功能……什麼名字都好,理解其本質即可)放在不同的程式碼工程,甚至不同的計算機(伺服器)上,避免所有程式碼雜糅在一個工程中,難以開發與維護;2、緩解負載壓力,不同計算機(伺服器)提供不同的服務,各司其職,不用做所有事,降低資源耗盡的風險。
RPC其實沒有那麼高深,如果不考慮底層實現原理,則對於程式設計師來說幾乎完全透明,就是一套固定步驟的開發流程,和呼叫本地工程程式碼中的函式沒有差別。以筆者目前所做的專案為例,步驟大體為:準備對應的stub(筆者將其理解為對方所能提供函式,在我方的一個說明)、準備遠端呼叫的通道、控制器物件;開啟子執行緒,利用回撥函式Callback Function,等待處理響應,並用執行緒倒數控制器CountDownLatch讓主執行緒阻塞;開啟遠端呼叫。
●回撥函式Callback Function與倒數計數器CountDownLatch
筆者之前看過一些關於回撥函式的文章,概念上已經理解,即A類的函式a1呼叫B類的函式b1,b1中需要呼叫A類的函式a2,這個a2就是回撥函式。但是對於其存在的意義,或者說使用回撥函式的場景到底是什麼,還不太想得到。直到專案中接觸到了RPC,才真的體會到回撥函式的作用。
我們做如下安排,伺服器B是一臺提供許多實用功能/服務的機器,它對外提供的介面包括字串處理(strDeal)等,我們在伺服器A的程式碼中,去呼叫伺服器B所提供的函式。
伺服器A——
public class Client{ public boolean checkBackMessage(BackMessage backMessage){ return backMessage.equals("Done") ? true : false; } public BackMessage dealStr(String str) { //接收RPC響應訊息的物件 final BackMessage backMessage = new BackMessage(); //構造一個RPC通道,具體程式碼未貼出,根據不同框架、專案而異 RpcChannelImpl channel = RpcChannelImpl.builderChannel(); if (channel != null) { //構造一個RPC控制器 RpcController controller = channel.newRpcController(); //對方所能提供的服務/函式都通過stub進行描述 Service.Stub stub = Service.newStub(channel); //構造RPC的請求 DealStrRequest.Builder request = DealStrRequest.newBuilder(); //設定請求引數 request.setCmd(CmdUtil.newRequestCmd(Hpp.CmdId.SERVER_RESTART_REQ)); request.setStr(str); //倒數計數器 final CountDownLatch latch = new CountDownLatch(1); //回撥函式處理響應 RpcCallback<DealStrResponse> done = new RpcCallback<DealStrResponse>() { @Override public void run(DealStrResponse response) { try { //響應處理 backMessage.setBackCode(response.getCmd().getResultCode()); backMessage.setBackMessage(response.getCmd().getResultString()); System.out.println("完成RPC呼叫,接收到響應,已進行設定") } catch (Exception e) { latch.countDown(); } latch.countDown(); } }; //執行RPC呼叫 stub.dealStr(controller, request.build(), done); //主執行緒阻塞等待響應處理完成 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } else { backMessage.setBackCode(-1); } return backMessage; } }
在伺服器A中,我們需要準備一些物件,他們包括接收響應物件backMessage、RPC通道channel、PRC控制器controller、對方服務存根stub、RPC請求request。通過呼叫存根裡的函式介面就和呼叫本地的函式一樣。
我們對於響應的處理採用了比較精妙的方式:開啟一個子執行緒,定義回撥函式run(),並且用CountDownLatch去阻塞主執行緒,讓其等待子執行緒中回撥函式完成處理再繼續進行。
我們繼續看看伺服器B所提供的RPC的服務——
public class ServiceImpl extends Service{
@HppMethod(commonId = 10001)
public void dealStr(RpcController controller, DealStrRequest request, RpcCallback<DealStrResponse> done) {
//對請求中的字串進行大寫轉換並存入資料庫
request.getStr.toUpperCase().save();
int ret=ErrorCode.CMS_SUCCESSED;
DealStrResponse.Builder response = DealStrResponse.newBuilder().setCmd(CmdUtil.newResponseCmd(request.getCmd(), ResultUtil.getResultCode(ret)));
//回撥函式!!
done.run(response.build());
}
//提供的其他RPC服務/函式
@HppMethod(commonId = 10002)
public void method1(RpcController controller, method1tRequest request, RpcCallback<method1tResponse> done) {
……
}
@HppMethod(commonId = 10003)
public void method2(RpcController controller, method2tRequest request, RpcCallback<method1tResponse> done) {
……
}
……
}
在其處理完字串後,回調了伺服器A的函式,即done.run(response.build()),利用回撥函式,把響應作為引數從伺服器B傳給伺服器A中,並在伺服器A的回撥函式中對響應進行處理(設定響應碼、響應訊息、列印日誌等操作)。
整個過程的資料流向就非常明晰了:伺服器A的字串通過請求發給伺服器B(走RPC呼叫),伺服器B處理完請求,生成響應物件,利用回撥函式將其交給伺服器A處理。怎麼樣,是不是似曾相識,好像在哪兒聽過?Bingo!沒錯!Ajax裡面也用到了回撥函式!頁面前端傳送請求(例如Post)給後端處理,後端生成響應回傳給頁面,Ajax非同步處理響應生成頁面內容。
function testAjax(){
$.ajax({
dataType:"text",
type:"POST",
cache:false,
url: path+"/testAjax.action",
data:{
"textDetail" : str
},
success:function(response){
if(response != null){
consolo.log(response);
}
},
error:function(response){
tLayer.warning("Error!");
}
});
}
回到正題,我們再看看伺服器A中的回撥函式裡面用到的CountDownLatch,這也是一個非常精妙的設計,它的作用是通過倒數來控制執行緒的阻塞。我們設定倒數計數為1,當回撥函式所線上程完成了響應處理,則將其減1,而主執行緒則在latch.await()的地方等待CountDownLatch倒數變為0才繼續進行。可見,CountDownLatch適用於主執行緒中等待多個執行緒均執行完成後才繼續進行的場景。
筆者目前接觸的專案自己搭建的RPC架構,實際開發中,大家也可以選擇Thrift、Dubbo等第三方的框架進行使用。目前很火的微服務架構,就用到了RPC的相關思想,這一部分的知識還是值得掌握的。最後,給大家推薦兩篇文章:
今天,你學會了嗎?