3. 原始碼分析---SOFARPC客戶端服務呼叫
阿新 • • 發佈:2019-12-31
SOFARPC原始碼解析系列:
6.原始碼分析---和dubbo相比SOFARPC是如何實現負載均衡的?
7.原始碼分析---SOFARPC是如何實現連線管理與心跳?
8.原始碼分析---從設計模式中看SOFARPC中的EventBus?
我們首先看看BoltClientProxyInvoker的關係圖
所以當我們用BoltClientProxyInvoker#invoke的時候實際上是呼叫了父類的invoke方法 ClientProxyInvoker#invoke
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
SofaResponse response = null;
Throwable throwable = null;
try {
RpcInternalContext.pushContext();
RpcInternalContext context = RpcInternalContext.getContext();
context.setProviderSide(false );
// 包裝request請求
decorateRequest(request);
try {
// 產生開始呼叫事件
if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
EventBus.post(new ClientStartInvokeEvent(request));
}
// 得到結果
response = cluster.invoke(request);
} catch (SofaRpcException e) {
throwable = e;
throw e;
} finally {
// 產生呼叫結束事件
if (!request.isAsync()) {
if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
EventBus.post(new ClientEndInvokeEvent(request,response,throwable));
}
}
}
// 包裝響應
decorateResponse(response);
return response;
} finally {
RpcInternalContext.removeContext();
RpcInternalContext.popContext();
}
}
複製程式碼
這個方法主要做了幾件事:
- 包裝request請求,設定必要的引數
- 呼叫FailOverCluster的invoke方法,將reques請求傳送出去,並得到response相應
- 包裝response響應
我們在呼叫FailOverCluster的時候實際上是呼叫的父類AbstractCluster的invoker方法,FailOverCluster關係圖如下:
所以我們進入到AbstractCluster的invoker方法中:
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
SofaResponse response = null;
try {
// 做一些初始化檢查,例如未連線可以連線
checkClusterState();
// 開始呼叫
countOfInvoke.incrementAndGet(); // 計數+1
response = doInvoke(request);
return response;
} catch (SofaRpcException e) {
// 客戶端收到異常(客戶端自己的異常)
throw e;
} finally {
countOfInvoke.decrementAndGet(); // 計數-1
}
}
複製程式碼
checkClusterState方法主要是用來校驗是否已銷燬了,或是呼叫了init方法進行初始化了。 然後會在呼叫之前記一下數。 然後我們進入到doInvoke方法中:
public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
String methodName = request.getMethodName();
int retries = consumerConfig.getMethodRetries(methodName);
int time = 0;
SofaRpcException throwable = null;// 異常日誌
List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
do {
//負載均衡
ProviderInfo providerInfo = select(request,invokedProviderInfos);
try {
//呼叫過濾器鏈
SofaResponse response = filterChain(providerInfo,request);
if (response != null) {
if (throwable != null) {
if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
LOGGER.warnWithApp(consumerConfig.getAppName(),LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,throwable.getClass() + ":" + throwable.getMessage(),invokedProviderInfos));
}
}
return response;
} else {
throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,"Failed to call " + request.getInterfaceName() + "." + methodName
+ " on remote server " + providerInfo + ",return null");
time++;
}
} catch (SofaRpcException e) { // 服務端異常+ 超時異常 才發起rpc異常重試
if (e.getErrorType() == RpcErrorType.SERVER_BUSY
|| e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
throwable = e;
time++;
} else {
throw e;
}
} catch (Exception e) { // 其它異常不重試
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,"Failed to call " + request.getInterfaceName() + "." + request.getMethodName()
+ " on remote server: " + providerInfo + ",cause by unknown exception: "
+ e.getClass().getName() + ",message is: " + e.getMessage(),e);
} finally {
if (RpcInternalContext.isAttachmentEnable()) {
RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,time + 1); // 重試次數
}
}
invokedProviderInfos.add(providerInfo);
} while (time <= retries);
throw throwable;
}
複製程式碼
這個方法裡面主要做了這這件事:
- 如果失敗的話就迴圈呼叫
- 負載均衡,選取provider
- 通過過濾器鏈呼叫服務端,並返回結果
- 異常處理
接著我們進入到filterChain方法中,根據過濾器鏈最後會跳到ConsumerInvoker中的invoke方法
@Override
public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
// 設定下伺服器應用
ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
if (StringUtils.isNotEmpty(appName)) {
sofaRequest.setTargetAppName(appName);
}
// 目前只是通過client傳送給服務端
return consumerBootstrap.getCluster().sendMsg(providerInfo,sofaRequest);
}
複製程式碼
consumerBootstrap.getCluster()會返回FailOverCluster例項,然後呼叫父類AbstractCluster的sendMsg方法
public SofaResponse sendMsg(ProviderInfo providerInfo,SofaRequest request) throws SofaRpcException {
ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
if (clientTransport != null && clientTransport.isAvailable()) {
return doSendMsg(providerInfo,clientTransport,request);
} else {
throw unavailableProviderException(request.getTargetServiceUniqueName(),providerInfo.getOriginUrl());
}
}
protected SofaResponse doSendMsg(ProviderInfo providerInfo,ClientTransport transport,SofaRequest request) throws SofaRpcException {
RpcInternalContext context = RpcInternalContext.getContext();
// 新增呼叫的服務端遠端地址
RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(),providerInfo.getPort());
try {
checkProviderVersion(providerInfo,request); // 根據服務端版本特殊處理
String invokeType = request.getInvokeType();
int timeout = resolveTimeout(request,consumerConfig,providerInfo);
SofaResponse response = null;
// 同步呼叫
if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
long start = RpcRuntimeContext.now();
try {
response = transport.syncSend(request,timeout);
} finally {
if (RpcInternalContext.isAttachmentEnable()) {
long elapsed = RpcRuntimeContext.now() - start;
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE,elapsed);
}
}
}
// 單向呼叫
else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
long start = RpcRuntimeContext.now();
try {
transport.oneWaySend(request,timeout);
response = buildEmptyResponse(request);
} finally {
if (RpcInternalContext.isAttachmentEnable()) {
long elapsed = RpcRuntimeContext.now() - start;
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE,elapsed);
}
}
}
// Callback呼叫
else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
// 呼叫級別回撥監聽器
SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
if (sofaResponseCallback == null) {
SofaResponseCallback methodResponseCallback = consumerConfig
.getMethodOnreturn(request.getMethodName());
if (methodResponseCallback != null) { // 方法的Callback
request.setSofaResponseCallback(methodResponseCallback);
}
}
// 記錄傳送開始時間
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME,RpcRuntimeContext.now());
// 開始呼叫
transport.asyncSend(request,timeout);
response = buildEmptyResponse(request);
}
// Future呼叫
else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
// 記錄傳送開始時間
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME,RpcRuntimeContext.now());
// 開始呼叫
ResponseFuture future = transport.asyncSend(request,timeout);
// 放入執行緒上下文
RpcInternalContext.getContext().setFuture(future);
response = buildEmptyResponse(request);
} else {
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,"Unknown invoke type:" + invokeType);
}
return response;
} catch (SofaRpcException e) {
throw e;
} catch (Throwable e) { // 客戶端其它異常
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,e);
}
}
複製程式碼
sendMsg方法最後會呼叫到doSendMsg。 soSendMsg裡面主要做了如下幾件事:
- 如果是同步呼叫,則直接返回封裝好的引數
- 如果是單向呼叫,則呼叫buildEmptyResponse方法,返回一個空的response
- 如果是callback呼叫asyncSend,RPC在獲取到服務端的結果後會自動執行該回撥實現。
- 服務端返回響應結果被 RPC 快取,當客戶端需要響應結果的時候需要主動獲取結果,獲取結果的過程阻塞執行緒。