5.原始碼分析---SOFARPC呼叫服務
阿新 • • 發佈:2019-08-05
我們這一次來接著上一篇文章《4. 原始碼分析---SOFARPC服務端暴露》講一下服務暴露之後被客戶端呼叫之後服務端是怎麼返回資料的。
示例我們還是和上篇文章一樣使用一樣的bolt協議來講:
public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 設定一個協議,預設bolt .setPort(12200) // 設定一個埠,預設12200 .setDaemon(false); // 非守護執行緒 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定介面 .setRef(new HelloServiceImpl()) // 指定實現 .setServer(serverConfig); // 指定服務端 providerConfig.export(); // 釋出服務 }
在Bolt協議下面,當服務端被呼叫的時候一個服務的流程如下所示:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker
BoltServerProcessor#handleRequest
@Override public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) { // RPC內建上下文 RpcInternalContext context = RpcInternalContext.getContext(); context.setProviderSide(true); String appName = request.getTargetAppName(); if (appName == null) { // 預設全域性appName appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME); } // 是否鏈路非同步化中 boolean isAsyncChain = false; try { // 這個 try-finally 為了保證Context一定被清理 processingCount.incrementAndGet(); // 統計值加1 context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 遠端地址 context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 遠端返回的通道 if (RpcInternalContext.isAttachmentEnable()) { InvokeContext boltInvokeCtx = bizCtx.getInvokeContext(); if (boltInvokeCtx != null) { putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME, context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc執行緒池等待時間 Long } } if (EventBus.isEnable(ServerReceiveEvent.class)) { EventBus.post(new ServerReceiveEvent(request)); } // 開始處理 SofaResponse response = null; // 響應,用於返回 Throwable throwable = null; // 異常,用於記錄 ProviderConfig providerConfig = null; String serviceName = request.getTargetServiceUniqueName(); try { // 這個try-catch 保證一定有Response invoke: { if (!boltServer.isStarted()) { // 服務端已關閉 throwable = new SofaRpcException(RpcErrorType.SERVER_CLOSED, LogCodes.getLog( LogCodes.WARN_PROVIDER_STOPPED, SystemInfo.getLocalHost() + ":" + boltServer.serverConfig.getPort())); response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage()); break invoke; } if (bizCtx.isRequestTimeout()) { // 加上丟棄超時的請求的邏輯 throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress()); break invoke; } // 查詢服務 //在server.registerProcessor方法中設定 ProviderProxyInvoker Invoker invoker = boltServer.findInvoker(serviceName); if (invoker == null) { throwable = cannotFoundService(appName, serviceName); response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage()); break invoke; } if (invoker instanceof ProviderProxyInvoker) { providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig(); // 找到服務後,列印服務的appName appName = providerConfig != null ? providerConfig.getAppName() : null; } // 查詢方法 String methodName = request.getMethodName(); //在server.registerProcessor方法中設定 Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName, request.getMethodArgSigs()); if (serviceMethod == null) { throwable = cannotFoundServiceMethod(appName, methodName, serviceName); response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage()); break invoke; } else { request.setMethod(serviceMethod); } // 真正呼叫 response = doInvoke(serviceName, invoker, request); if (bizCtx.isRequestTimeout()) { // 加上丟棄超時的響應的邏輯 throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress()); break invoke; } } } catch (Exception e) { // 服務端異常,不管是啥異常 LOGGER.errorWithApp(appName, "Server Processor Error!", e); throwable = e; response = MessageBuilder.buildSofaErrorResponse(e.getMessage()); } // Response不為空,代表需要返回給客戶端 if (response != null) { RpcInvokeContext invokeContext = RpcInvokeContext.peekContext(); isAsyncChain = CommonUtils.isTrue(invokeContext != null ? (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null); // 如果是服務端非同步代理模式,特殊處理,因為該模式是在業務程式碼自主非同步返回的 if (!isAsyncChain) { // 其它正常請求 try { // 這個try-catch 保證一定要記錄tracer asyncCtx.sendResponse(response); } finally { if (EventBus.isEnable(ServerSendEvent.class)) { EventBus.post(new ServerSendEvent(request, response, throwable)); } } } } } catch (Throwable e) { // 可能有返回時的異常 if (LOGGER.isErrorEnabled(appName)) { LOGGER.errorWithApp(appName, e.getMessage(), e); } } finally { processingCount.decrementAndGet(); if (!isAsyncChain) { if (EventBus.isEnable(ServerEndHandleEvent.class)) { EventBus.post(new ServerEndHandleEvent()); } } RpcInvokeContext.removeContext(); RpcInternalContext.removeAllContext(); } }
這個方法主要做了如下幾件事:
- 設定上下文引數
- 從快取中得到服務暴露時設定的invoker
- 為request設定method引數
- 呼叫doInvoke返回response
- 將response返回給客戶端
BoltServerProcessor#doInvoke
我們直接進入到doInvoke方法中,看是如何生成response物件的。
private SofaResponse doInvoke(String serviceName, Invoker invoker, SofaRequest request) throws SofaRpcException { // 開始呼叫,先記下當前的ClassLoader ClassLoader rpcCl = Thread.currentThread().getContextClassLoader(); try { // 切換執行緒的ClassLoader到 服務 自己的ClassLoader ClassLoader serviceCl = ReflectCache.getServiceClassLoader(serviceName); Thread.currentThread().setContextClassLoader(serviceCl); return invoker.invoke(request); } finally { Thread.currentThread().setContextClassLoader(rpcCl); } }
這裡主要是為了獲取快取裡面載入被暴露服務的類載入器,這樣可以防止不同的類載入器之間一個類被載入多次。
然後呼叫過濾器鏈,最後進入到ProviderInvoker中
ProviderInvoker#invoke
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
SofaResponse sofaResponse = new SofaResponse();
long startTime = RpcRuntimeContext.now();
try {
// 反射 真正呼叫業務程式碼
Method method = request.getMethod();
if (method == null) {
throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");
}
Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());
sofaResponse.setAppResponse(result);
} catch (IllegalArgumentException e) { // 非法引數,可能是實現類和介面類不對應)
sofaResponse.setErrorMsg(e.getMessage());
} catch (IllegalAccessException e) { // 如果此 Method 物件強制執行 Java 語言訪問控制,並且底層方法是不可訪問的
sofaResponse.setErrorMsg(e.getMessage());
} catch (InvocationTargetException e) { // 業務程式碼丟擲異常
cutCause(e.getCause());
sofaResponse.setAppResponse(e.getCause());
} finally {
if (RpcInternalContext.isAttachmentEnable()) {
long endTime = RpcRuntimeContext.now();
RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE,
endTime - startTime);
}
}
return sofaResponse;
}
到最後我們發現,服務端會通過反射呼叫被暴露服務的方法,封裝成Response類返回。
我們再次回到BoltServerProcessor#handleRequest方法中
....//忽略其他內容
// Response不為空,代表需要返回給客戶端
if (response != null) {
RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
(Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
// 如果是服務端非同步代理模式,特殊處理,因為該模式是在業務程式碼自主非同步返回的
if (!isAsyncChain) {
// 其它正常請求
try { // 這個try-catch 保證一定要記錄tracer
asyncCtx.sendResponse(response);
} finally {
if (EventBus.isEnable(ServerSendEvent.class)) {
EventBus.post(new ServerSendEvent(request, response, throwable));
}
}
}
}
....//忽略其他內容
最後我們的response例項會使用netty傳給客戶端