dubbo原始碼分析-consumer端3-Invoker建立流程
從前面一篇建立註冊中心的流程當中,我們知道在從註冊中心獲取到provider的連線資訊後,會通過連線建立Invoker。程式碼見com.alibaba.dubbo.registry.integration.RegistryDirectory的toInvokers方法:
此處url的protocol為dubbo,因此protocol.refer最終會呼叫com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer,同時Protocol存在兩個wrapper類,分別為:// protocol實現為com.alibaba.dubbo.rpc.Protocol$Adpative, // 之前已經講過,這是dubbo在執行時動態建立的一個類; // serviceType為服務類的class, 如demo中的com.alibaba.dubbo.demo.DemoService; // providerUrl為服務提供方註冊的連線; // url為providerUrl與消費方引數的合併 invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper、
com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper。在dubbo中存在wrapper類的類會被wrapper例項包裝後返回,因此在protocol.refer方法呼叫的時候,會先經過wrapper類。由於這裡的複雜性,我們先不講wrapper類裡的refer實現,直接跳到DubboProtocol.refer。
url的demo如下:
dubbo://30.33.47.127:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&....
DubboProtocol的refer程式碼如下:
可以看到client建立由com.alibaba.dubbo.remoting.exchange.Exchanges處理,其程式碼如下:public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // 建立一個DubboInvoker DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); // 將invoker加入到invokers這個Set中 invokers.add(invoker); return invoker; } // 建立連線Client,該Client主要負責建立連線,傳送資料等 private ExchangeClient[] getClients(URL url){ //是否共享連線 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 如果connections不配置,則共享連線,否則每服務每連線, // 共享連線的意思是對於同一個ip+port的所有服務只建立一個連線, // 如果是非共享連線則每個服務+(ip+port)建立一個連線 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; } /** *獲取共享連線 */ private ExchangeClient getSharedClient(URL url){ // 以address(ip:port)為key進行快取 String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if ( client != null ){ // 如果連線存在了則引用數加1,引用數表示有多少個服務使用了此client, // 當某個client呼叫close()時,引用數減一, // 如果引用數大於0,表示還有服務在使用此連線, 不會真正關閉client // 如果引用數為0,表示沒有服務在用此連線,此時連線徹底關閉 if ( !client.isClosed()){ client.incrementAndGetCount(); return client; } else { logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client)); referenceClientMap.remove(key); } } // 呼叫initClient來初始化Client ExchangeClient exchagneclient = initClient(url); // 使用ReferenceCountExchangeClient進行包裝 client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); return client; } /** * 建立新連線. */ private ExchangeClient initClient(URL url) { // 獲取client引數的值,為空則獲取server引數的值,預設為netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); // 如果是1.0.x版本,需要相容 boolean compatible = (version != null && version.startsWith("1.0.")); // 加入codec引數,預設為dubbo,即DubboCodec url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //預設開啟心跳,預設每60s傳送一次心跳包 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在嚴重效能問題,暫時不允許使用 if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //設定連線應該是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url ,requestHandler); } else { client = Exchangers.connect(url ,requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 預設通過HeaderExchanger.connect建立
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
// 預設type為header,因此預設的Exchanger為com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
HeaderExchanger的connect程式碼如下:public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
這裡簡單介紹下這些類的作用:
HeaderExchangeHandler: ExchangeHandler的代理,HeaderExchangeHandler將資料封裝後呼叫ExchangeHandler的連線/斷開/傳送請求/接收返回資料/捕獲異常等方法;
DecodeHandler: 也是一個代理,在HeaderExchangeHandler的功能之上加入瞭解碼功能;
Transporters.connect預設得到的是NettyTransporter:建立NettyClient, 該client是真正的發起通訊的類;
NettyClient在初始化的時候會做一些比較重要的事情,我們先看下:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){
// 設定threadName, 設定預設的threadpool型別,
//
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
// 對handler再次進行包裝
return ChannelHandlers.wrap(handler, url);
}
我們知道前面得到的包裝物件DecodeHandler,而ChannelHandlers.wrap對該Handler再次進行包裝:
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
這些包裝類在之前handler的基礎上加入的功能:
dispatch生成的物件AllChannelHandler:加入執行緒池,所有方法都非同步的呼叫;
HeartbeatHeandler: 心跳包的傳送和接收到心跳包後的處理;
MultiMessageHandler:如果接收到的訊息為MultiMessage,則將其拆分為單個Message給後面的Handler處理;
再看看NettyClient在構造方法中還做了哪些操作:
// 呼叫了父類com.alibaba.dubbo.remoting.transport.AbstractClient的構造方法
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
...省略部分程式碼...
try {
//
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
// 如果check為false,則連線失敗時Invoker依然可以建立
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t){
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
...省略部分程式碼...
}
可以看到在構造方法處已經開始建立連線,netty如何建立連線此處不再詳細介紹,可以看看之前的netty介紹。需要注意的時連線失敗的時候,如果check引數為false則Invoker依然可以建立,否則在初始化階段會報異常。
回過頭來看看HeaderExchangeClient,改類建立了一個傳送心跳包的定時任務:
public HeaderExchangeClient(Client client){
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
// 預設為60秒發一次心跳包,如果連續3個心跳包無響應則表示連線斷開
this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 );
this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 );
if ( heartbeatTimeout < heartbeat * 2 ) {
throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" );
}
startHeatbeatTimer();
}
private void startHeatbeatTimer() {
stopHeartbeatTimer();
if ( heartbeat > 0 ) {
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList( HeaderExchangeClient.this );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS );
}
}
我們知道,在socket通訊時,資料傳送方和接收方必須建立連線,而建立的連線是否可用,為了探測連線是否可用,可以通過傳送簡單的通訊包並看是否收到回包的方式,這就是心跳。如果沒有心跳包,則很有可能連線的一方已經斷開或者中間線路故障,雙方都不知道這種情況。 因此心跳包很有必要引入。心跳包的實現比較簡單,這裡簡單介紹下,不再貼具體程式碼:通過攔截(代理)所有的傳送/接收資料的方法,記錄下最後一次read(接收資料)、write(傳送資料)的時間,如果都大於心跳的時間閾值(如上面的60s)則傳送一條資料給對方,該資料的格式不重要,只要有心跳的標識(即對方可以解析出這是一個心跳包)即可,對方接收到資料以後也會返回一個應答的包,如果傳送方接收到回包,則最後一次read時間將會被充值為當前時間,表示連線未斷開。如果傳送方一直未收到回包,則指定時間(如上面的60s)後再次傳送心跳包。如果多次(如上面的3次)傳送均未收到回包(心跳超時),則判斷連線已經斷開。此時根據應用的需求斷開連線或者重新連線。在dubbo中,如果心跳超時則進行重連。
除了心跳以外,我們可以看到HeaderExchangeChannel對client再次進行了封裝,它的作用是將要傳送的實際資料封裝成com.alibaba.dubbo.remoting.exchange.Request物件。
最終獲得的HeaderExchangeChannel被封裝到HeaderExchangeClient中,傳入到DubboInvoker,最終DubboProtocol.refer返回了DubboInvoker。但流程還未結束,還記得我們一開頭提起的wrapper類吧。下面來看看這兩個類還做了哪些操作。
DubboProtocol.refer執行後,進入到ProtocolFilterWrapper,其refer程式碼如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// protocol為dubbo時執行到這裡
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
// 初始的last為剛剛建立的DubboInvoker
Invoker<T> last = invoker;
// 載入group為consumer的Filter,載入到的Filter依次為:
// com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
// com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
// com.alibaba.dubbo.monitor.support.MonitorFilter
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
// filter從最後一個開始依次封裝,最終形成一個鏈,呼叫順序為filters的順序
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
再看看ProtocolListenerWrapper:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
// ListenerInvokerWrapper構造方法
public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners){
if (invoker == null) {
throw new IllegalArgumentException("invoker == null");
}
this.invoker = invoker;
this.listeners = listeners;
if (listeners != null && listeners.size() > 0) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
// 直接觸發referred方法
listener.referred(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
listener在consumer初始化和destroy時生效,不影響正常的執行,預設情況下listeners為空。
到這裡InvokerDelegete的生成基本上完成了,結合第一篇consumer的介紹,我們可以得到下圖(後續我們再講講各個類的具體實現):