11.原始碼分析---SOFARPC資料透傳是實現的?
先把栗子放上,讓大家方便測試用:
Service端
public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 設定一個協議,預設bolt .setPort(12200) // 設定一個埠,預設12200 .setDaemon(false); // 非守護執行緒 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定介面 .setRef(new HelloServiceImpl()) // 指定實現 .setServer(serverConfig); // 指定服務端 providerConfig.export(); // 釋出服務 } public class HelloServiceImpl implements HelloService { private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class); @Override public String sayHello(String string) { LOGGER.info("Server receive: " + string); // 獲取請求透傳資料並列印 System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag")); // 設定響應透傳資料到當前執行緒的上下文中 RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c"); return "hello " + string + " !"; } }
client端
public static void main(String[] args) { ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定介面 .setProtocol("bolt") // 指定協議 .setDirectUrl("bolt://127.0.0.1:12200") // 指定直連地址 .setConnectTimeout(10 * 1000); RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb"); HelloService helloService = consumerConfig.refer(); while (true) { System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag")); try { LOGGER.info(helloService.sayHello("world")); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
通過上面的栗子我們可以看出整個流程應該是:
- 客戶端把需要透傳的資料放入到requestBaggage中,然後呼叫服務端
- 服務端在HelloServiceImpl中獲取請求透傳資料並列印,並把響應資料放入到responseBaggage中
- 客戶端收到透傳資料
所以下面我們從客戶端開始原始碼講解。
客戶端資料透傳給服務端
首先客戶端在引用之前要設定putRequestBaggage
,然後在客戶端引用的時候會呼叫ClientProxyInvoker#invoke方法。
如下:
ClientProxyInvoker#invoke
public SofaResponse invoke(SofaRequest request) throws SofaRpcException { .... // 包裝請求 decorateRequest(request); .... }
通過呼叫decorateRequest會呼叫到子類DefaultClientProxyInvoker的decorateRequest方法。
DefaultClientProxyInvoker#decorateRequest
protected void decorateRequest(SofaRequest request) {
....
RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
RpcInternalContext internalContext = RpcInternalContext.getContext();
if (invokeCtx != null) {
....
// 如果使用者指定了透傳資料
if (RpcInvokeContext.isBaggageEnable()) {
// 需要透傳
BaggageResolver.carryWithRequest(invokeCtx, request);
internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
}
}
....
}
在decorateRequest方法裡首先會校驗有沒有開啟透傳資料,如果開啟了,那麼就呼叫BaggageResolver#carryWithRequest,把要透傳的資料放入到request裡面
BaggageResolver#carryWithRequest
public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {
if (context != null) {
//獲取所有的透傳資料
Map<String, String> requestBaggage = context.getAllRequestBaggage();
if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透傳
request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);
}
}
}
這個方法裡面要做的就是獲取所有的透傳資料,然後放置到RequestProp裡面,這樣在傳送請求的時候就會傳送到服務端。
服務端接受透傳資料
服務端的呼叫流程如下:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker
所以從上面的呼叫鏈可以知道,在服務端引用的時候會經過ProviderBaggageFilter過濾器,我們下面看看這個過濾器做了什麼事情:
ProviderBaggageFilter#invoke
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
SofaResponse response = null;
try {
//從request中獲取透傳資料存入到requestBaggage中
BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);
response = invoker.invoke(request);
} finally {
if (response != null) {
BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);
}
}
return response;
}
ProviderBaggageFilter會呼叫BaggageResolver#pickupFromRequest
從request中獲取資料
BaggageResolver#pickupFromRequest
public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {
if (context == null && !init) {
return;
}
// 解析請求
Map<String, String> requestBaggage = (Map<String, String>) request
.getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);
if (CommonUtils.isNotEmpty(requestBaggage)) {
if (context == null) {
context = RpcInvokeContext.getContext();
}
context.putAllRequestBaggage(requestBaggage);
}
}
最後會在ProviderBaggageFilter invoke方法的finally裡面呼叫BaggageResolver#carryWithResponse
把響應透傳資料回寫到response裡面。
public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {
if (context != null) {
Map<String, String> responseBaggage = context.getAllResponseBaggage();
if (CommonUtils.isNotEmpty(responseBaggage)) {
String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
response.addResponseProp(prefix + entry.getKey(), entry.getValue());
}
}
}
}
客戶端收到響應透傳資料
最後客戶端會在ClientProxyInvoker#invoke方法裡呼叫decorateResponse獲取response回寫的資料。
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
....
// 包裝響應
decorateResponse(response);
....
}
decorateResponse是在子類DefaultClientProxyInvoker實現的:
DefaultClientProxyInvoker#decorateResponse
protected void decorateResponse(SofaResponse response) {
....
//如果開啟了透傳
if (RpcInvokeContext.isBaggageEnable()) {
BaggageResolver.pickupFromResponse(invokeCtx, response, true);
}
....
}
這個方法裡面會呼叫BaggageResolver#pickupFromResponse
public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {
if (context == null && !init) {
return;
}
Map<String, String> responseBaggage = response.getResponseProps();
if (CommonUtils.isNotEmpty(responseBaggage)) {
String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
if (context == null) {
context = RpcInvokeContext.getContext();
}
//因為entry的key裡面會包含rpc_resp_baggage,所以需要擷取掉
context.putResponseBaggage(entry.getKey().substring(prefix.length()),
entry.getValue());
}
}
}
}
這個方法裡面response獲取所有的透傳資料,然後放入到ResponseBaggage中。
到這裡SOFARPC資料透傳就分析完畢