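Understanding Dubbo Service Export and Service Registration
I. Preface
This article covers service export and service registration. The service provider first exports the service locally and then registers it with the registry, exposing the service implementation class through its service interface so that consumers can look it up in the registry and invoke it.
The source code analysis is based on org.apache.dubbo:dubbo:2.7.2. The server-side code example is the one from the previous article.
If you are not familiar with Dubbo SPI, read up on it first; otherwise you will have no idea how the source code jumps from one class to another.
Dubbo SPI: https://www.cnblogs.com/GrimMjx/p/10970643.html
II. Service Export
Call Sequence
Here is the rough call sequence for service export. The methods marked in blue in the original diagram are not analyzed; they mainly start a Netty server.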
-org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent
-org.apache.dubbo.config.ServiceConfig#export
-org.apache.dubbo.config.ServiceConfig#doExport
-org.apache.dubbo.config.ServiceConfig#doExportUrls
-org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
-org.apache.dubbo.registry.integration.RegistryProtocol#export
-org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
-org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
-org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
-org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
-org.apache.dubbo.remoting.transport.netty.NettyTransporter#bind
...
Source Code Analysis
1. org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent. Not much to add here; the comments explain it in detail.
/**
 * ServiceBean implements the ApplicationListener interface.
 * During IoC container startup, once all beans have been processed, the Spring IoC container publishes a ContextRefreshedEvent.
 * This method handles that event and is where service export begins.
 *
 * @param event
 */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    // not yet exported && not yet unexported
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // export
        export();
    }
}
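2. org.apache.dubbo.config.ServiceConfig#export. checkAndUpdateSubConfigs mainly validates the tag configuration and checks whether the various config objects are null, creating them if so. The method then decides whether the service should be exported at all and whether export must be delayed; if a delay is configured, doExport is scheduled to run later.
public synchronized void export() {
    // check that the interface of <dubbo:service> is valid
    // check whether provider is null
    // check whether the various config objects are null and create them if so
    checkAndUpdateSubConfigs();
    if (!shouldExport()) {
        return;
    }
    // delay?
    if (shouldDelay()) {
        // delayed export
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}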
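The delay branch above can be sketched with plain JDK classes. This is a minimal sketch, not Dubbo's code: DelayedExportSketch, exportCount and awaitExport are hypothetical names, and the rule "delay is non-null and greater than 0" is an assumption about what shouldDelay checks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ServiceConfig; only the delay decision is modeled.
class DelayedExportSketch {
    // Daemon scheduler so that a pending delayed export never keeps the JVM alive.
    private static final ScheduledExecutorService DELAY_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export-sketch");
                t.setDaemon(true);
                return t;
            });

    final AtomicInteger exportCount = new AtomicInteger();
    private final CountDownLatch exportedLatch = new CountDownLatch(1);
    private final Integer delayMillis; // assumption: null or <= 0 means "export immediately"

    DelayedExportSketch(Integer delayMillis) {
        this.delayMillis = delayMillis;
    }

    boolean shouldDelay() {
        return delayMillis != null && delayMillis > 0;
    }

    synchronized void export() {
        if (shouldDelay()) {
            // hand doExport to the scheduler instead of running it on the caller's thread
            DELAY_EXECUTOR.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    void doExport() {
        exportCount.incrementAndGet();
        exportedLatch.countDown();
    }

    // Blocks until doExport has run or the timeout expires.
    boolean awaitExport(long timeoutMillis) {
        try {
            return exportedLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        DelayedExportSketch delayed = new DelayedExportSketch(100);
        delayed.export();
        System.out.println("exported after delay: " + delayed.awaitExport(5000));
    }
}
```

The point of the design is that the caller of export() returns immediately when a delay is configured, and the real work happens later on the scheduler thread.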
3. org.apache.dubbo.config.ServiceConfig#doExport. The comments say it all; we still have not reached the core logic.
protected synchronized void doExport() {
    // has the service already been unexported?
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    // has the service already been exported? note that this field is declared volatile
    if (exported) {
        return;
    }
    exported = true;
    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    doExportUrls();
}
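The two flags make doExport idempotent: a second call is a no-op, and a call after unexport fails fast. A minimal sketch of that guard, assuming nothing beyond what the method above shows (ExportGuardSketch and exportUrlCalls are hypothetical names used only for illustration):

```java
// Hypothetical stand-in for the exported/unexported guard in ServiceConfig#doExport.
class ExportGuardSketch {
    private volatile boolean exported;
    private volatile boolean unexported;
    int exportUrlCalls; // counts how often the "real" export work ran

    synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service has already unexported!");
        }
        if (exported) {
            return; // already exported: do nothing
        }
        exported = true;
        exportUrlCalls++; // stands in for doExportUrls()
    }

    synchronized void unexport() {
        unexported = true;
    }

    public static void main(String[] args) {
        ExportGuardSketch guard = new ExportGuardSketch();
        guard.doExport();
        guard.doExport();
        System.out.println("export work ran " + guard.exportUrlCalls + " time(s)");
    }
}
```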
4. org.apache.dubbo.config.ServiceConfig#doExportUrls. It first loads the registry URLs, then iterates over all configured protocols, exporting and registering the service for each one.
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
    // load the registry URLs
    // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&pid=10516&registry=zookeeper&timestamp=1559889491339
    List<URL> registryURLs = loadRegistries(true);
    // loop over every protocol, export the service and register it with the registry
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
5. org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol. As the name suggests, this method exports the service for one protocol, possibly to several registries. It does a lot, so I split it into four parts. Parts 1, 2 and 3 all fill the parameter map, and the end of part 3 builds the export URL. Part 4 exports the service and decides whether to export and register with a registry or to export only.
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    /** Part 1 begins **/
    String name = protocolConfig.getName();
    // null or empty string: default to the dubbo protocol
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, PROVIDER_SIDE);

    appendRuntimeParameters(map);
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    // remove 'default.' prefix for configs from ProviderConfig
    // appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, provider);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // the code above puts the version, methods and the various configs into the map
    // the map as seen in the debugger:
    // map.toString() = {side=provider, application=dubbo-server, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}
    /** Part 1 ends **/

    /** Part 2 begins **/
    // this if block mainly reads the configuration of the <dubbo:method> tags and fills the map
    if (CollectionUtils.isNotEmpty(methods)) {
        for (MethodConfig method : methods) {
            // add the MethodConfig to the map, key = methodName.attribute, value = attribute value
            // e.g. sayHello.retries:2
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }

            // get the list of ArgumentConfig
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            // add the ArgumentConfig to the map
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }
    /** Part 2 ends **/

    /** Part 3 begins **/
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }

        // generate the wrapper class
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        // add the method names to the map
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }

    // add the token
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }

    // export service
    // at this point map.toString() = {side=provider, application=dubbo-server, methods=hello, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // at this point url=dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.grimmjx.edu.HelloService&methods=hello&pid=11917&side=provider&timeout=100&timestamp=1559973693109
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    /** Part 3 ends **/

    /** Part 4 begins **/
    // start exporting the service
    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                // export the service
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }

                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }

                    // create the Invoker
                    // the Invoker is a very important object; invoke calls are made against it
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    // holds both this and the invoker
                    // at this point invoker.getUrl()=registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D10738%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559895851366&pid=10738&registry=zookeeper&timestamp=1559895851283
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // export and create the Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }

            // no registry: export only
            } else {
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
    /** Part 4 ends **/
}
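The scope handling in part 4 is easy to misread because both conditions are negated. The sketch below restates it as a small decision function. ExportScopeSketch and targets are hypothetical names, and the literal values "none", "local" and "remote" are assumed to be what SCOPE_NONE, SCOPE_LOCAL and SCOPE_REMOTE hold.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of the scope checks in part 4 of doExportUrlsFor1Protocol.
class ExportScopeSketch {
    // Returns which kinds of export would run for a given scope value.
    static List<String> targets(String scope) {
        List<String> result = new ArrayList<>();
        if ("none".equalsIgnoreCase(scope)) {
            return result; // scope=none: export nothing
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add("local"); // anything except "remote" exports in-JVM
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add("remote"); // anything except "local" exports remotely
        }
        return result;
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> " + targets(scope));
        }
    }
}
```

Under these assumptions, an unset scope exports both locally and remotely, which matches the two independent if blocks in the method.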
6. org.apache.dubbo.registry.integration.RegistryProtocol#export. Why RegistryProtocol? As the comments in the previous step show, the URL of the invoker starts with registry://.
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // get the registry address
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    // get the provider url
    URL providerUrl = getProviderUrl(originInvoker);
    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    // the same service. Because the subscribed is cached key with the name of the service, it causes the
    // subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // the url to register
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    // export the service
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    // url to registry
    ....
}
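The provider URL travels inside the registry URL as the URL-encoded export parameter, as the invoker.getUrl() value in the previous step shows, so recovering it is essentially a decode of that parameter. The following is a rough sketch of that round trip using only JDK classes. ExportParamSketch, withExport and providerUrlOf are hypothetical helpers and do not reproduce Dubbo's URL class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical helpers showing how a provider URL can ride inside a registry URL.
class ExportParamSketch {
    // Appends the provider URL as an encoded "export" parameter.
    static String withExport(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encode(providerUrl);
    }

    // Extracts and decodes the "export" parameter, or returns null if it is absent.
    static String providerUrlOf(String registryUrl) {
        int queryStart = registryUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(queryStart + 1).split("&")) {
            if (pair.startsWith("export=")) {
                return decode(pair.substring("export=".length()));
            }
        }
        return null;
    }

    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String provider = "dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true&timeout=100";
        String registry = withExport("registry://127.0.0.1:2181/RegistryService?registry=zookeeper", provider);
        System.out.println(registry);
        System.out.println(providerUrlOf(registry));
    }
}
```

Because the provider URL's own & and = characters are percent-encoded, splitting the registry URL's query string on & stays safe.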
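The code behind the blue part of the call sequence above is not analyzed here. It mainly creates a NettyServer (the default), and you can study it on your own.
III. Service Registration
Call Sequence
Here is the rough call sequence for service registration. We only analyze the part marked in red in the original diagram.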
-org.apache.dubbo.registry.integration.RegistryProtocol#register
-org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
-org.apache.dubbo.registry.support.FailbackRegistry#register
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
Source Code Analysis
1. org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry. It looks up the registry in a cache and creates it on a miss; the actual creation happens in the next step.
@Override
public Registry getRegistry(URL url) {
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // look up the cache
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }

        //create registry by spi/ioc
        // create the Registry through Dubbo SPI
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // write to the cache
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
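The pattern in getRegistry is a lock-guarded cache with a factory callback. A minimal generic sketch of the same idea follows. RegistryCacheSketch and its created counter are hypothetical, and the Function stands in for the createRegistry template method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical cache that creates at most one instance per key, as getRegistry does.
class RegistryCacheSketch<R> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, R> registries = new HashMap<>();
    private final Function<String, R> factory; // stands in for createRegistry(url)
    int created; // how many instances the factory actually produced

    RegistryCacheSketch(Function<String, R> factory) {
        this.factory = factory;
    }

    R getRegistry(String key) {
        lock.lock();
        try {
            R registry = registries.get(key);
            if (registry != null) {
                return registry; // cache hit
            }
            registry = factory.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            created++;
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock(); // always release, even if the factory throws
        }
    }

    public static void main(String[] args) {
        RegistryCacheSketch<Object> cache = new RegistryCacheSketch<>(key -> new Object());
        Object first = cache.getRegistry("zookeeper://127.0.0.1:2181");
        Object second = cache.getRegistry("zookeeper://127.0.0.1:2181");
        System.out.println("same instance: " + (first == second));
    }
}
```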
2. org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry. It creates a ZookeeperRegistry instance and returns it.
@Override
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
3. org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry. The constructor mainly uses the ZookeeperTransporter to create the ZooKeeper client. It also derives the root path from the group parameter and adds a state listener that calls recover() after a reconnect.
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}
4. org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister. Once the connection to ZooKeeper is up, the next thing to do is create the node for the service provider, so this step is the actual service registration.
@Override
public void doRegister(URL url) {
    try {
        // toUrlPath(url) = /dubbo/com.grimmjx.edu.HelloService/providers/dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D11917%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559973693109
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
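Judging from the toUrlPath value in the comment above, the node path has the shape root/interface/providers/encoded-provider-url, where root comes from the group parameter normalized in the constructor. The sketch below rebuilds a path of that shape with JDK classes only. ZkPathSketch, rootOf and this toUrlPath signature are hypothetical and are not ZookeeperRegistry's real private methods.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical reconstruction of the provider node path shown in the doRegister comment.
class ZkPathSketch {
    // Mirrors the constructor's normalization: make sure the group starts with "/".
    static String rootOf(String group) {
        return group.startsWith("/") ? group : "/" + group;
    }

    // root + "/" + service interface + "/providers/" + URL-encoded provider URL
    static String toUrlPath(String group, String serviceInterface, String providerUrl) {
        return rootOf(group) + "/" + serviceInterface + "/providers/" + encode(providerUrl);
    }

    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("dubbo", "com.grimmjx.edu.HelloService",
                "dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true"));
    }
}
```

The second argument of zkClient.create is the value of the dynamic parameter, which defaults to true here; with that default the provider node is created as an ephemeral node, so it disappears when the provider's ZooKeeper session ends.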
After this step, we connect to ZooKeeper with zkCli and take a look at the node data.