原始碼分析Dubbo非同步呼叫與事件回撥機制
本文將詳細分析Dubbo服務非同步呼叫與事件回撥機制。
1、非同步呼叫與事件回撥機制
1.1 非同步回撥
1.2 事件回撥
2、原始碼分析非同步呼叫與事件回撥機制
在Dubbo中,引入特定的過濾器FutureFilter來處理非同步呼叫相關的邏輯,其定義如下:
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
}
group=CONSUMER說明該過濾器屬於消費端過濾器。
接下來從從invoke方法詳細分析其實現邏輯。
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // @1
fireInvokeCallback(invoker, invocation); // @2
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
Result result = invoker.invoke(invocation); // @3
if (isAsync) {
asyncCallback(invoker, invocation); // @4
} else {
syncCallback(invoker, invocation, result); // @5
}
return result;
}
程式碼@1:首先從URL中獲取是否是非同步呼叫標誌,其配置屬性為< dubbo:service async=”“/>獲取其子標籤< dubbo:method async=”“/>。
程式碼@2:同步呼叫oninvoke事件,執行invoke方法之前的事件。
程式碼@3:繼續沿著呼叫鏈呼叫,最終會到具體的協議Invoker,例如DubboInvoker,發生具體的服務呼叫,跟蹤一下同步、非同步呼叫的實現細節。
程式碼@4:如果呼叫方式是非同步模式,則非同步呼叫onreturn或onthrow事件。
程式碼@5:如果呼叫方式是同步模式,則同步呼叫onreturn或onthrow事件。
2.1 原始碼分析FutureFilter#fireInvokeCallback
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(),
Constants.ON_INVOKE_METHOD_KEY)); // @1
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(),
Constants.ON_INVOKE_INSTANCE_KEY)); // @2
if (onInvokeMethod == null && onInvokeInst == null) { // @3
return;
}
if (onInvokeMethod == null || onInvokeInst == null) { // @4
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" :
"instance") + " found. url:" + invoker.getUrl());
}
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params); // @5
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException()); // @6
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e); // @7
}
}
程式碼@1:StaticContext.getSystemContext()中根據key:serviceKey + “.” + method + “.” + “oninvoke.method” 獲取配置的oninvoke.method方法名。其中serviceKey為[group]/interface:[version],其中group與version可能為空,忽略。
程式碼@2:同樣根據key:serviceKey + “.” + method + “.” + “oninvoke.instance” 從StaticContext.getSystemContext()獲取oninvoke.method方法所在的例項名物件,也就是說該呼叫哪個物件的oninvoke.method指定的方法。這裡就有一個疑問,這些資料是在什麼時候存入StaticContext中的呢?下文會詳細分析。
程式碼@3、@4:主要檢測< dubbo:method oninvoke=”“/>配置的正確性,其正確的配置方式如下:“例項名.方法名”,例如:
程式碼@5:根據發射機制,呼叫oninvoke中指定的例項的指定方法,注意,這裡傳入的引數為呼叫遠端RPC服務的引數。
注意:從這裡可以看出,如果要實現事件通知,也即在呼叫遠端RPC服務之前,之後、丟擲異常時執行回撥函式,該回調事件的方法的引數列表需要與被呼叫服務的引數列表一致。
程式碼@6、@7,如果在執行呼叫前方法(oninvoke)事件方法失敗,則會同步呼叫onthrow中定義的方法(如有定義)。關於dubbo:method oninvoke屬性的解析以及在什麼時候會向StaticContext.getSystemContext()中新增資訊,將在下文統一介紹。
2.2 原始碼分析DubboInvoker關於同步非同步呼叫處理
在上文提到FutureFilter#invoke中的第三步呼叫invoker.invoker方法時,我們應該會有興趣瞭解一下真實的invoker是如何處理同步、非同步請求的。
我們以dubbo協議DubboInvoker來重點分析一下其實現原理:
DubboInvoker#doInvoke
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // @1
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // @2
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout); // @3
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null); // @4
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
程式碼@1:首先獲取async屬性,如果為true表示非同步請求,如果配置了return=”false”表示呼叫模式為oneway,只發呼叫,不關注其呼叫結果。
程式碼@2:處理oneway的情況。如果設定了sent=true,表示等待網路資料發出才返回,如果sent=false,只是將待發送資料發到IO寫快取區就返回。
程式碼@3:處理非同步的情況,程式碼@4處理同步呼叫的情況,細看其實都是通過呼叫網路客戶端client的request,最終呼叫HeaderExchangeChannel#request方法:
這裡是通過Future模式來實現非同步呼叫的,同步呼叫也是通過非同步呼叫來實現,只是同步呼叫發起後,直接呼叫future#get的方法來同步等待結果的返回,而非同步呼叫只返回Future Response,在使用者需要關心其結果時才呼叫get方法。
2.3 原始碼分析asyncCallback與syncCallback
前面介紹了方法執行之前oninvoker事件的呼叫分析,接下來分析RPC服務呼叫完成後,onreturn和onthrow方法的呼叫邏輯。
非同步回撥與同步回撥的區別就是呼叫onreturn(fireReturnCallback)和onthrow(fireThrowCallback)呼叫的地方不同,如果是同步呼叫,也就是在完成RPC服務呼叫後,立即呼叫相關的回撥方法,如果是非同步呼叫的話,RPC服務完成後,通過Future模式非同步執行。其實關於onreturn、onthrow屬性的解析,執行與oninvoker屬性的解析完全一樣,再這裡也就不重複介紹了。