[dubbo 原始碼之 ]2. 服務消費方如何啟動服務
啟動流程
消費者在啟動之後,會通過ReferenceConfig#get()
來生成遠端呼叫代理類。在get
方法中,會啟動一系列呼叫函式,我們來一個個解析。
配置同樣包含2種:
- XML
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <dubbo:application name="first-consumer-xml"/> <dubbo:registry address="zookeeper://127.0.0.1:2181/"/> <dubbo:reference proxy="javassist" scope="remote" id="demoService" generic="false" check="false" async="false" group="dubbo-sxzhongf-group" version="1.0.0" interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService"> <dubbo:method name="sayHello" retries="3" timeout="5000" mock="false" /> <dubbo:method name="testGeneric" retries="3" timeout="5000" mock="false" /> </dubbo:reference> </beans>
- Java API
public class ApiConsumerApplication { public static void main(String[] args) { // 1. 建立服務引用物件例項 ReferenceConfig<IGreetingService> referenceConfig = new ReferenceConfig<IGreetingService>(); // 2. 設定應用程式資訊 referenceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-consumer")); // 3. 設定註冊中心 referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181/")); // 4. 設定服務介面和超時時間 referenceConfig.setInterface(IGreetingService.class); // 預設重試3次 referenceConfig.setTimeout(5000); // 5 設定服務分組和版本 referenceConfig.setGroup("dubbo-sxzhongf-group"); referenceConfig.setVersion("1.0.0"); // 6. 引用服務 IGreetingService greetingService = referenceConfig.get(); // 7. 設定隱式引數 RpcContext.getContext().setAttachment("company", "sxzhongf"); // 獲取provider端傳遞的隱式引數(FIXME: 需要後續追蹤) // System.out.println("年齡是:" + RpcContext.getContext().getAttachment("age")); //8. 呼叫服務 System.out.println(greetingService.sayHello("pan")); } }
1. new ReferenceConfig
在此階段,會初始化org.apache.dubbo.config.AbstractConfig
& org.apache.dubbo.config.ReferenceConfig
的靜態變數以及靜態程式碼塊。
2. ReferenceConfig#get
ReferenceConfig#init
- 通過
DubboBootstrap
啟動dubbo。
- 繼而初始化服務的元資料資訊,
URL.buildKey(interfaceName, group, version)
這段用來生成唯一服務的key,所以我們之前說dubbo的唯一標識是通過全路徑
和group以及version來決定的。 - 接下來通過
org.apache.dubbo.config.utils.ConfigValidationUtils#checkMock
來檢查我們mock是否設定正確。 - 設定一系列要用的引數(系統執行引數、是否為consumer、是否為泛型呼叫等等),檢查
dubbo
的註冊地址,預設為當前主機IP
- 通過
ReferenceConfig#createProxy
建立呼叫代理開始
ReferenceConfig#shouldJvmRefer
首先判斷是否為Injvm
呼叫- 如果不為
injvm
,判斷是否為peer to peer
端對端設定,如果為p2p,那麼就直連url - 檢查註冊中心是否存在(註冊中心有可能有多個)
- 迴圈檢查註冊中心是否有效
配置轉換
URL
json registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=deep-in-dubbo-first-consumer&dubbo=2.0.2&pid=9780&refer=application%3Ddeep-in-dubbo-first-consumer%26dubbo%3D2.0.2%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D9780%26register.ip%3D192.168.14.99%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D5000%26timestamp%3D1582959441066%26version%3D1.0.0®istry=zookeeper&release=2.7.5×tamp=1582961922459
如果只有一個註冊中心,執行
REF_PROTOCOL.refer(interfaceClass, urls.get(0));
來將URL
轉為Invoker
物件,因為private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
是擴充套件是Adaptive
,因此在這裡會執行Protocol$Adaptive#refer
方法,又由於protocol
引數值為registry
,因此會只是RegistryProtocol#refer
,又由於被Wrapper
類裝配,因此會先執行三個Wrapper類,最後才能執行到RegistryProtocol#refer -> RegistryProtocol#doRefer
,在doRefer
方法中會訂閱服務提供者地址,最後返回Invoker
物件。!
那麼究竟是如何生成的Invoker
物件呢?我們來看下具體程式碼:
java private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 1.可以查尋RegistryDirectory & StaticDirectory 區別 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); //2. 封裝訂閱所用URL URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } //3.build路由規則鏈 directory.buildRouterChain(subscribeUrl); //4.訂閱服務提供者地址 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //5.封裝叢集策略到虛擬invoker Invoker invoker = cluster.join(directory); return invoker; }
上述程式碼中,步驟1根據URL生成了一個RegistryDirectory
(關於Directory介面的作用,可以自行查詢一些,直白一些就是將一堆Invoker物件封裝成一個ListRegistryDirectory & StaticDirectory
,從命名可看出一個是動態可變,一個不可變),程式碼2 封裝了訂閱所要使用的引數資訊,程式碼3則是封裝繫結路由規則鏈,程式碼4訂閱。程式碼5呼叫 Cluster$Adaptive#join
方法生成Invoker
物件。
在程式碼2種從zk獲取服務提供者資訊:
一旦zk返回服務提供者列表之後,就會呼叫RegistryDirectory#notify
,如下:
在org.apache.dubbo.common.utils.UrlUtils#isMatch
中對provider和consumer的api進行匹配校驗。繼續跟蹤:RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokers
在toInvokers
正式將URL轉換為Invoker
,通過invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
在這裡protocol#refer
同樣執行順序如:
(dubbo 2.7.5) protocol#refer -> protocol$Adaptive#refer -> QosProtocolWrapper#refer -> ProtocolListenerWrapper#refer -> ProtocolFilterWrapper#refer ->AbstractProtocol#refer->DubboProtocol#protocolBindingRefer
,呼叫程式碼如下:
```java
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
```
關注getClients
,其中執行了DubboProtocol#getSharedClient -> DubboProtocol#initClient
建立netty client進行連線。
因為這裡使用的是明確的DubboInvoker
,在回撥的時候,Wrapper會對該Invoker進行包裝,執行效果如下:
因此,會執行到ProtocolFilterWrapper#buildInvokerChain
,該函式會對服務進行呼叫鏈跟蹤:
```java
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 獲取所有在MATA-INF檔案中配置的啟用的責任連結口
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
} else {// Deprecated!
filter.onResponse(r, invoker, invocation);
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
```
所有的負載均衡、容錯策略等都是在這裡繫結的。
7.如果有多個註冊中心,將會迴圈執行步驟6,將URL轉換為Invoker
物件,然後新增到一個List,分別進行註冊之後,然後判斷最後一個註冊中心url
是否有效,針對多訂閱的場景,URL中新增cluster
引數,預設使用org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
策略,使用org.apache.dubbo.rpc.cluster.Cluster#join
將多個Invoker
物件封裝一個虛擬的Invoker
中,否則如果最後一個註冊中心是無效的,直接封裝Invoker
物件。
8.建立服務代理ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>)
,因為ProxyFactory
是一個適配類。那麼同樣這裡會呼叫ProxyFactory$Adaptive#getProxy
,這裡最終就是返回一個代理服務的Invoker物件。
至此,我們的消費端的繫結遠端zk的服務就已經結束了。
那麼,我們在呼叫伺服器方法的時候伺服器端和客戶端都是如何處理的呢?下節我們將繼續分析。