Understanding Dubbo Service Export and Service Registration

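I. Introduction

  This article covers service export and service registration: the provider first exports the service locally and then registers it with the registry, exposing the service implementation class through its service interface so that consumers can look it up in the registry and invoke it.

  The source code analysis is based on org.apache.dubbo:dubbo:2.7.2, and the server-side example code is the one from the previous article.

  If you are not familiar with Dubbo SPI, read up on it first; otherwise you will have no idea how the source code jumps from one class to another.

  Dubbo SPI: https://www.cnblogs.com/GrimMjx/p/10970643.html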

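II. Service Export

Call sequence

  First, here is the rough call sequence for service export. The methods marked in blue are not analyzed; they mainly start a Netty server.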

-org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent
  -org.apache.dubbo.config.ServiceConfig#export
    -org.apache.dubbo.config.ServiceConfig#doExport
      -org.apache.dubbo.config.ServiceConfig#doExportUrls
        -org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol

          -org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
            -org.apache.dubbo.registry.integration.RegistryProtocol#export
              -org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
                -org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
                  -org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
                    -org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
                      -org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
                        -org.apache.dubbo.remoting.transport.netty.NettyTransporter#bind

                          ...

Source code analysis

1. org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent. There is not much to say about this method; the comments explain it in detail.

/**
 * ServiceBean implements the ApplicationListener interface.
 * During IoC container startup, once all beans have been processed,
 * the Spring IoC container publishes a ContextRefreshedEvent.
 * This method handles that event and kicks off the service export.
 *
 * @param event
 */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    // not yet exported && not yet unexported
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // export
        export();
    }
}

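2. org.apache.dubbo.config.ServiceConfig#export. checkAndUpdateSubConfigs mainly validates the tag configuration and checks whether the various config objects are null, creating them if so. The method then decides whether the service should be exported at all and whether the export should be delayed; if a delay is configured, doExport is scheduled to run after it.

public synchronized void export() {
    // validate the interface of <dubbo:service>
    // check whether provider is null
    // check whether the various config objects are null and create them if so
    checkAndUpdateSubConfigs();
    if (!shouldExport()) {
        return;
    }
    // delay the export?
    if (shouldDelay()) {
        // delayed export
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}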
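
The immediate-versus-delayed decision above can be sketched outside Dubbo with a plain ScheduledExecutorService. This is a minimal sketch, not Dubbo code: the class DelayedExportSketch, its delayMillis field and the rule "delay greater than 0 means delayed" are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ServiceConfig; only the scheduling decision is modelled.
final class DelayedExportSketch {
    // Daemon thread so that a pending delayed export does not keep the JVM alive.
    private static final ScheduledExecutorService DELAY_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export-sketch");
                t.setDaemon(true);
                return t;
            });

    private final long delayMillis;               // assumption: <= 0 means "export now"
    final AtomicBoolean exported = new AtomicBoolean(false);

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    boolean shouldDelay() {
        return delayMillis > 0;
    }

    synchronized void export() {
        if (shouldDelay()) {
            // Hand the real work to the scheduler and return immediately.
            DELAY_EXECUTOR.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    void doExport() {
        exported.set(true);
    }

    public static void main(String[] args) throws InterruptedException {
        DelayedExportSketch now = new DelayedExportSketch(0);
        now.export();
        System.out.println("immediate: exported=" + now.exported.get());

        DelayedExportSketch later = new DelayedExportSketch(200);
        later.export();
        System.out.println("delayed, right after export(): exported=" + later.exported.get());
        Thread.sleep(1000);
        System.out.println("delayed, after waiting: exported=" + later.exported.get());
    }
}
```

The point of the sketch is that export() returns right away in the delayed case, so the caller (the Spring event listener in the article) is never blocked by the delay.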

3. org.apache.dubbo.config.ServiceConfig#doExport. The comments say it all; we still have not reached the core logic.

protected synchronized void doExport() {
    // already unexported?
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    // already exported? note that this field is declared volatile
    if (exported) {
        return;
    }
    exported = true;
    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    doExportUrls();
}
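
The guard at the top of doExport makes the export idempotent: a second call returns early, and a call after unexport fails fast. The following is a minimal sketch of that pattern with made-up names (ExportOnceSketch, exportCount); it is not the Dubbo implementation.

```java
// Hypothetical class illustrating the "export at most once" guard.
final class ExportOnceSketch {
    // volatile so that unsynchronized readers such as isExported() see the latest value
    private volatile boolean exported;
    private volatile boolean unexported;
    private int exportCount; // guarded by "this"; counts how often the real work ran

    synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service has already been unexported");
        }
        if (exported) {
            return; // a repeated call is a no-op
        }
        exported = true;
        exportCount++; // stands in for doExportUrls()
    }

    synchronized void unexport() {
        unexported = true;
    }

    boolean isExported() {
        return exported;
    }

    synchronized int exportCount() {
        return exportCount;
    }

    public static void main(String[] args) {
        ExportOnceSketch service = new ExportOnceSketch();
        service.doExport();
        service.doExport();
        System.out.println("exported=" + service.isExported() + ", real exports=" + service.exportCount());
    }
}
```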

4. org.apache.dubbo.config.ServiceConfig#doExportUrls. It first loads the registry URLs, then iterates over all configured protocols, exporting the service and registering it for each one.

@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
    // load the registry URLs
    // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&pid=10516&registry=zookeeper&timestamp=1559889491339
    List<URL> registryURLs = loadRegistries(true);
    // loop over each protocol, export the service and register it with the registry
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

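5. org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol. As the name suggests, it exports the service for a single protocol (to each of the registries passed in). The method does quite a lot, so I split it into four parts. Parts 1, 2 and 3 all populate the map, and part 3 ends by building the export URL. Part 4 performs the actual export and decides whether to export the service and register it with a registry, or only export it.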

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    /** Part 1 begins **/
    String name = protocolConfig.getName();
    // null or empty string: default to the dubbo protocol
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, PROVIDER_SIDE);

    appendRuntimeParameters(map);
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    // remove 'default.' prefix for configs from ProviderConfig
    // appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, provider);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // the code above puts the version, methods and various configs into the map
    // the map as seen while debugging:
    // map.toString() = {side=provider, application=dubbo-server, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}
    /** Part 1 ends **/

    /** Part 2 begins **/
    // this if block mainly reads the configuration from <dubbo:method> tags and fills the map
    if (CollectionUtils.isNotEmpty(methods)) {
        for (MethodConfig method : methods) {
            // add the MethodConfig to the map: key = methodName.attribute, value = attribute value
            // e.g. sayHello.retries:2
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }

            // get the list of ArgumentConfig
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            // add the ArgumentConfig info to the map
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }
    /** Part 2 ends **/

    /** Part 3 begins **/
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }

        // generate the wrapper class
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        // add the method names to the map
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }

    // add the token
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }

    // export service
    // at this point map.toString() = {side=provider, application=dubbo-server, methods=hello, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // at this point url = dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.grimmjx.edu.HelloService&methods=hello&pid=11917&side=provider&timeout=100&timestamp=1559973693109
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    /** Part 3 ends **/

    /** Part 4 begins **/
    // start exporting the service
    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                // export the service
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }

                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }

                    // create the Invoker
                    // the Invoker is a key object: invoke calls are made against it
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    // holds both this (the ServiceConfig) and the invoker
                    // here invoker.getUrl() = registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D10738%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559895851366&pid=10738&registry=zookeeper&timestamp=1559895851283
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // export and create the Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }

                // no registry: only export the service
            } else {
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
    /** Part 4 ends **/
}
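
The key move in part 4 is that the provider URL is URL-encoded and attached to the registry URL under the export parameter, which is why invoker.getUrl() in the debug comment starts with registry:// and carries a long encoded dubbo:// string. A minimal sketch of that nesting follows, using only the JDK. The class ExportUrlSketch and its two helpers are hypothetical simplifications and not Dubbo's URL class; the sorted parameter order is a choice made here only to keep the output deterministic.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; models URLs as plain strings.
final class ExportUrlSketch {

    // Builds protocol://host:port/path?k1=v1&k2=v2 with parameters in sorted key order.
    static String buildUrl(String protocol, String host, int port, String path, Map<String, String> params) {
        StringBuilder sb = new StringBuilder()
                .append(protocol).append("://").append(host).append(':').append(port)
                .append('/').append(path);
        char separator = '?';
        for (Map.Entry<String, String> e : new TreeMap<>(params).entrySet()) {
            sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    // Appends key=<URL-encoded value> to an existing URL string.
    static String addParameterAndEncoded(String url, String key, String value) {
        try {
            String separator = url.contains("?") ? "&" : "?";
            return url + separator + key + "=" + URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("side", "provider");
        params.put("methods", "hello");
        String providerUrl = buildUrl("dubbo", "192.168.5.16", 20880, "com.grimmjx.edu.HelloService", params);
        String registryUrl = addParameterAndEncoded(
                "registry://127.0.0.1:2181/RegistryService?registry=zookeeper", "export", providerUrl);
        System.out.println(providerUrl);
        System.out.println(registryUrl);
    }
}
```

Because the whole provider URL travels inside one parameter, the component that later handles the registry:// URL can decode it and recover the dubbo:// URL unchanged.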

6. org.apache.dubbo.registry.integration.RegistryProtocol#export. Why RegistryProtocol? As the debug comments in the previous step show, the URL here starts with registry://, and the protocol field is an adaptive SPI extension that picks the implementation matching the URL's protocol.

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // get the registry URL
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    // get the provider url
    URL providerUrl = getProviderUrl(originInvoker);
    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // the url to register
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    // export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    // url to registry
    ....
}
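
Why a registry:// URL ends up in RegistryProtocol while a dubbo:// URL ends up in DubboProtocol can be illustrated with a simple lookup keyed by the URL's protocol. This is only a sketch of the idea: in Dubbo the selection is done by the adaptive SPI extension, whereas here a plain map stands in for it, and ProtocolDispatchSketch and its nested Protocol interface are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical dispatcher; a map replaces Dubbo's adaptive extension mechanism.
final class ProtocolDispatchSketch {

    interface Protocol {
        String export(String url); // returns the name of the handler, for demonstration
    }

    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol");
        EXTENSIONS.put("dubbo", url -> "DubboProtocol");
    }

    // Extracts the part before "://", e.g. "registry" from "registry://127.0.0.1:2181/...".
    static String protocolOf(String url) {
        int index = url.indexOf("://");
        if (index <= 0) {
            throw new IllegalArgumentException("No protocol in url: " + url);
        }
        return url.substring(0, index);
    }

    static String export(String url) {
        Protocol protocol = EXTENSIONS.get(protocolOf(url));
        if (protocol == null) {
            throw new IllegalStateException("No protocol extension for url: " + url);
        }
        return protocol.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/RegistryService"));
        System.out.println(export("dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService"));
    }
}
```

This also explains the two-stage flow in the call sequence: the registry:// URL is handled first, and the dubbo:// URL carried in its export parameter is dispatched in a second step.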

  The blue part of the call sequence above is not analyzed here; it mainly creates a NettyServer (the default). Feel free to study it on your own.

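III. Service Registration

Call sequence

  First, here is the rough call sequence for service registration. We only analyze the part marked in red.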

-org.apache.dubbo.registry.integration.RegistryProtocol#register
  -org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry
    -org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
      -org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
  -org.apache.dubbo.registry.support.FailbackRegistry#register
    -org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister

Source code analysis

1. org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry. It returns a cached Registry for the URL if one exists and otherwise creates one; the creation itself is covered in the next step.

@Override
public Registry getRegistry(URL url) {
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // look up in the cache
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }

        // create registry by spi/ioc
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // put into the cache
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
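
The lock-then-check-cache structure of getRegistry guarantees one Registry instance per key. Here is a minimal, generic sketch of that pattern; RegistryCacheSketch, its type parameter and the factory function are hypothetical stand-ins for AbstractRegistryFactory, Registry and createRegistry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical cache: at most one instance is created per key.
final class RegistryCacheSketch<R> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, R> registries = new HashMap<>(); // guarded by lock
    private final Function<String, R> factory;                 // stands in for createRegistry(url)

    RegistryCacheSketch(Function<String, R> factory) {
        this.factory = factory;
    }

    R getRegistry(String key) {
        lock.lock();
        try {
            R registry = registries.get(key);
            if (registry != null) {
                return registry; // cache hit: reuse the existing instance
            }
            registry = factory.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock(); // always released, even if the factory throws
        }
    }

    public static void main(String[] args) {
        RegistryCacheSketch<Object> cache = new RegistryCacheSketch<>(key -> new Object());
        Object first = cache.getRegistry("zookeeper://127.0.0.1:2181/RegistryService");
        Object second = cache.getRegistry("zookeeper://127.0.0.1:2181/RegistryService");
        System.out.println("same instance: " + (first == second));
    }
}
```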

2. org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry. It creates a ZookeeperRegistry instance and returns it.

@Override
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

3. org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry. The constructor derives the root path from the group parameter, uses the ZookeeperTransporter to create the ZooKeeper client, and adds a state listener that calls recover() after a reconnect.

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

4. org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister. With the ZooKeeper connection established, the next thing to do is create the node for the service provider, so this step is the actual service registration.

@Override
public void doRegister(URL url) {
    try {
        // toUrlPath(url) = /dubbo/com.grimmjx.edu.HelloService/providers/dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D11917%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559973693109
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
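
The node path shown in the toUrlPath comment has a regular shape: root, service interface, category, then the URL-encoded provider URL. The sketch below reproduces that shape with JDK classes only. ProviderPathSketch and its method signatures are assumptions for illustration and do not mirror the real toUrlPath; the root normalization follows the constructor in step 3.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that rebuilds the ZooKeeper node path from its parts.
final class ProviderPathSketch {
    private static final String PATH_SEPARATOR = "/";

    // Same normalization as the ZookeeperRegistry constructor: the root always starts with "/".
    static String rootOf(String group) {
        return group.startsWith(PATH_SEPARATOR) ? group : PATH_SEPARATOR + group;
    }

    // The provider URL becomes a single path segment, so it has to be URL-encoded.
    static String toUrlPath(String group, String serviceInterface, String category, String providerUrl) {
        try {
            return rootOf(group)
                    + PATH_SEPARATOR + serviceInterface
                    + PATH_SEPARATOR + category
                    + PATH_SEPARATOR + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath(
                "dubbo",
                "com.grimmjx.edu.HelloService",
                "providers",
                "dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true"));
    }
}
```

Encoding matters here because the raw provider URL contains "/" and "?", which would otherwise be read as extra path levels rather than as one node name.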

After this step, we can connect to ZooKeeper with zkCli and look at the node data.
