1. 程式人生 > >RPC中Callback Function與CountDownLatch的用法

RPC中Callback Function與CountDownLatch的用法

目錄

●What & Why

RPC(Remote Procedure Call),通俗地說,就是在一臺計算機上呼叫另一臺計算機提供的服務。這裡的服務對應RPC中的P(Procedure),表現形式通常是API介面,或者說好比一個原生代碼工程中的一個函式。那為什麼要用RPC呢?最主要的原因有兩點:1、符合低耦合、職責分離、可複用的開發原則,將不同的服務(模組、功能……什麼名字都好,理解其本質即可)放在不同的程式碼工程,甚至不同的計算機(伺服器)上,避免所有程式碼雜糅在一個工程中,難以開發與維護;2、緩解負載壓力,不同計算機(伺服器)提供不同的服務,各司其職,不用做所有事,降低資源耗盡的風險。

RPC其實沒有那麼高深,如果不考慮底層實現原理,則對於程式設計師來說幾乎完全透明,就是一套固定步驟的開發流程,和呼叫本地工程程式碼中的函式沒有差別。以筆者目前所做的專案為例,步驟大體為:準備對應的stub(筆者將其理解為對方所能提供函式,在我方的一個說明)、準備遠端呼叫的通道、控制器物件;開啟子執行緒,利用回撥函式Callback Function,等待處理響應,並用執行緒倒數控制器CountDownLatch讓主執行緒阻塞;開啟遠端呼叫。

●回撥函式Callback Function與倒數計數器CountDownLatch

筆者之前看過一些關於回撥函式的文章,概念上已經理解,即A類的函式a1呼叫B類的函式b1,b1中需要呼叫A類的函式a2,這個a2就是回撥函式。但是對於其存在的意義,或者說使用回撥函式的場景到底是什麼,還不太想得到。直到專案中接觸到了RPC,才真的體會到回撥函式的作用。

我們做如下安排,伺服器B是一臺提供許多實用功能/服務的機器,它對外提供的介面包括字串處理(strDeal)等,我們在伺服器A的程式碼中,去呼叫伺服器B所提供的函式。

伺服器A——

public class Client{
	public boolean checkBackMessage(BackMessage backMessage){
		return backMessage.equals("Done") ? true : false;
	}
	
	public BackMessage dealStr(String str) {
		//接收RPC響應訊息的物件
		final BackMessage backMessage = new BackMessage();
		//構造一個RPC通道,具體程式碼未貼出,根據不同框架、專案而異
		RpcChannelImpl channel = RpcChannelImpl.builderChannel();
		if (channel != null) {
			//構造一個RPC控制器
			RpcController controller = channel.newRpcController();
			//對方所能提供的服務/函式都通過stub進行描述
			Service.Stub stub = Service.newStub(channel);
			//構造RPC的請求
			DealStrRequest.Builder request = DealStrRequest.newBuilder();
			//設定請求引數
			request.setCmd(CmdUtil.newRequestCmd(Hpp.CmdId.SERVER_RESTART_REQ));
			request.setStr(str);
			//倒數計數器
			final CountDownLatch latch = new CountDownLatch(1);
			//回撥函式處理響應
			RpcCallback<DealStrResponse> done = new RpcCallback<DealStrResponse>() {	
				@Override
				public void run(DealStrResponse response) {
					try {
						//響應處理
						backMessage.setBackCode(response.getCmd().getResultCode());
						backMessage.setBackMessage(response.getCmd().getResultString());
						System.out.println("完成RPC呼叫,接收到響應,已進行設定")
					} catch (Exception e) {
						latch.countDown();
					}
					latch.countDown();
				}
			};
			//執行RPC呼叫
			stub.dealStr(controller, request.build(), done);
			//主執行緒阻塞等待響應處理完成
			try {
				latch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		} else {
			backMessage.setBackCode(-1);
		}
		return backMessage;
	}
}

在伺服器A中,我們需要準備一些物件,他們包括接收響應物件backMessage、RPC通道channel、PRC控制器controller、對方服務存根stub、RPC請求request。通過呼叫存根裡的函式介面就和呼叫本地的函式一樣。

我們對於響應的處理採用了比較精妙的方式:開啟一個子執行緒,定義回撥函式run(),並且用CountDownLatch去阻塞主執行緒,讓其等待子執行緒中回撥函式完成處理再繼續進行。

我們繼續看看伺服器B所提供的RPC的服務——

public class ServiceImpl extends Service{
	@HppMethod(commonId = 10001)
	public void dealStr(RpcController controller, DealStrRequest request, RpcCallback<DealStrResponse> done) {
		//對請求中的字串進行大寫轉換並存入資料庫
		request.getStr.toUpperCase().save();
		int ret=ErrorCode.CMS_SUCCESSED;
		DealStrResponse.Builder response = DealStrResponse.newBuilder().setCmd(CmdUtil.newResponseCmd(request.getCmd(), ResultUtil.getResultCode(ret)));
		//回撥函式!!
		done.run(response.build());
	}
	
	//提供的其他RPC服務/函式
	@HppMethod(commonId = 10002)
	public void method1(RpcController controller, method1tRequest request, RpcCallback<method1tResponse> done) {
		……
	}
	
	@HppMethod(commonId = 10003)
	public void method2(RpcController controller, method2tRequest request, RpcCallback<method1tResponse> done) {
		……
	}
	
	……
}

在其處理完字串後,回調了伺服器A的函式,即done.run(response.build()),利用回撥函式,把響應作為引數從伺服器B傳給伺服器A中,並在伺服器A的回撥函式中對響應進行處理(設定響應碼、響應訊息、列印日誌等操作)。

整個過程的資料流向就非常明晰了:伺服器A的字串通過請求發給伺服器B(走RPC呼叫),伺服器B處理完請求,生成響應物件,利用回撥函式將其交給伺服器A處理。怎麼樣,是不是似曾相識,好像在哪兒聽過?Bingo!沒錯!Ajax裡面也用到了回撥函式!頁面前端傳送請求(例如Post)給後端處理,後端生成響應回傳給頁面,Ajax非同步處理響應生成頁面內容。

function testAjax(){
	$.ajax({
		dataType:"text",
		type:"POST",
		cache:false,
		url: path+"/testAjax.action",
		data:{
		 	"textDetail" : str
		},
		success:function(response){			
			if(response != null){
				consolo.log(response);
		  	}						
		},
		error:function(response){
			tLayer.warning("Error!");
		}
	 });		
}

回到正題,我們再看看伺服器A中的回撥函式裡面用到的CountDownLatch,這也是一個非常精妙的設計,它的作用是通過倒數來控制執行緒的阻塞。我們設定倒數計數為1,當回撥函式所線上程完成了響應處理,則將其減1,而主執行緒則在latch.await()的地方等待CountDownLatch倒數變為0才繼續進行。可見,CountDownLatch適用於主執行緒中等待多個執行緒均執行完成後才繼續進行的場景。

筆者目前接觸的專案自己搭建的RPC架構,實際開發中,大家也可以選擇Thrift、Dubbo等第三方的框架進行使用。目前很火的微服務架構,就用到了RPC的相關思想,這一部分的知識還是值得掌握的。最後,給大家推薦兩篇文章:

今天,你學會了嗎?