Dubbo原始碼-11-服務引用流程
阿新 • • 發佈:2022-12-02
一 入口
public static void main(String[] args) { // 引用遠端服務 此例項很重 封裝了與註冊中心的連線以及與提供者的連線 ReferenceConfig<DemoService> reference = new ReferenceConfig<DemoService>(); reference.setApplication(new ApplicationConfig("native-consumer")); // 配置應用資訊 reference.setRegistry(new RegistryConfig("zookeeper://localhost:2181")); // 配置註冊中心資訊 reference.setInterface(DemoService.class); // 引用的遠端服務的介面抽象 // 和本地bean一樣使用service 此代理物件內部封裝了所有通訊細節 服務端獲得了遠端服務的代理物件 像呼叫本地方法一樣 DemoService demoService = reference.get(); String ret = demoService.sayHello("world"); System.out.println("ret=" + ret); }
二 配置資訊
/**
 * Returns the reference held in {@code ref}, running {@code init()} first
 * when no reference has been created yet.
 *
 * @return the current value of {@code ref}
 * @throws IllegalStateException if this config has already been destroyed
 */
public synchronized T get() {
    if (this.destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (this.ref != null) {
        // Already created: hand back the existing reference.
        return this.ref;
    }
    this.init();
    return this.ref;
}
/**
 * Collects the configuration for the referenced service into a parameter map
 * and uses it to create the service proxy stored in {@code ref}.
 *
 * <p>Steps, in order: validate the interface name, resolve the interface class
 * (or {@code GenericService} for generic invocation), look for a point-to-point
 * override URL (system property named after the interface, or a resolve
 * properties file), inherit missing settings from consumer/module/application,
 * build the parameter map, call {@code createProxy(map)} and register a
 * {@code ConsumerModel}.
 *
 * <p>NOTE(review): {@code initialized} is set to {@code true} before any work is
 * done and is not cleared on failure inside this method. Because {@code get()}
 * only calls {@code init()} while {@code ref} is null and this method returns
 * immediately once the flag is set, an exception thrown below (for example the
 * missing-interface check) appears to leave later {@code get()} calls returning
 * null. {@code createProxy} resets the flag only on its availability-check
 * failure path. Confirm whether this is intended.
 *
 * @throws IllegalStateException if the interface name is empty, the interface
 *         class cannot be loaded, or the resolve file cannot be read
 * @throws IllegalArgumentException if the registry IP supplied through the
 *         {@code Constants.DUBBO_IP_TO_REGISTRY} property is rejected by
 *         {@code isInvalidLocalHost}
 */
private void init() {
    if (this.initialized) return; // nothing to do if initialization has already been started/completed
    this.initialized = true; // mark the remote service as initialized
    if (interfaceName == null || interfaceName.length() == 0)
        throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
    // get consumer's global configuration
    this.checkDefault(); // author's note: tries to load ConsumerConfig settings from VM arguments
    appendProperties(this); // author's note: tries to load ReferenceConfig settings from VM arguments
    // Inherit the generic flag from the consumer config when it is not set on this reference.
    if (super.getGeneric() == null && this.getConsumer() != null) {
        super.setGeneric(this.getConsumer().getGeneric());
    }
    if (ProtocolUtils.isGeneric(super.getGeneric())) { // generic invocation
        this.interfaceClass = GenericService.class;
    } else {
        try {
            this.interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // author's note: when methods is non-empty, verifies them against interfaceClass
        super.checkInterfaceAndMethods(interfaceClass, this.methods);
    }
    // Point-to-point override: first a system property named after the interface ...
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
        // ... otherwise a properties file, given explicitly or found in the user's home directory.
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                resolveFile = userResolveFile.getAbsolutePath();
            }
        }
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                properties.load(fis);
            } catch (IOException e) {
                throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
            } finally {
                // Closing is best-effort: a failure here is logged, not rethrown.
                try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
            resolve = properties.getProperty(interfaceName);
        }
    }
    if (resolve != null && resolve.length() > 0) {
        // The resolved value replaces the configured url of this reference.
        url = resolve;
        if (logger.isWarnEnabled()) {
            if (resolveFile != null) {
                logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
            } else {
                logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
            }
        }
    }
    // Settings missing on this reference are filled in from the consumer config,
    // then from the module config, then from the application config.
    if (this.consumer != null) {
        if (this.application == null) this.application = this.consumer.getApplication();
        if (this.module == null) module = this.consumer.getModule();
        if (this.registries == null) registries = this.consumer.getRegistries();
        if (monitor == null) monitor = consumer.getMonitor();
    }
    if (this.module != null) {
        if (this.registries == null) this.registries = this.module.getRegistries();
        if (this.monitor == null) this.monitor = this.module.getMonitor();
    }
    if (this.application != null) {
        if (this.registries == null) this.registries = this.application.getRegistries();
        if (this.monitor == null) this.monitor = this.application.getMonitor();
    }
    super.checkApplication();
    checkStub(this.interfaceClass);
    checkMock(this.interfaceClass);
    // map: string parameters handed to createProxy; attributes: objects stored in the system context below.
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    if (!super.isGeneric()) {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
        // Method names declared by the remote service interface.
        String[] methods = Wrapper.getWrapper(this.interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            // The HashSet removes duplicate names before joining.
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName);
    appendParameters(map, this.application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    String prefix = StringUtils.getServiceKey(map); // e.g. com.alibaba.dubbo.demo.DemoService
    if (this.methods != null && !this.methods.isEmpty()) {
        // The local String[] above is out of scope here, so this iterates the methods field.
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            // A "<method>.retry" entry is always removed; the value "false" becomes "<method>.retries" = "0".
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            appendAttributes(attributes, method, prefix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    // IP to register with: taken from the system property when set, otherwise the local host address.
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    //attributes are stored by system context.
    StaticContext.getSystemContext().putAll(attributes);
    /*
     * At this point map holds the configuration describing the remote service;
     * the proxy is built from it. Contents observed in the article's demo run:
     *   side        -> consumer
     *   application -> demo-service
     *   register.ip -> 192.168.0.3
     *   methods     -> sayHello
     *   qos.port    -> 33333
     *   dubbo       -> 2.0.2
     *   pid         -> 37888
     *   interface   -> com.alibaba.dubbo.demo.DemoService
     *   timestamp   -> 1652886744792
     */
    this.ref = this.createProxy(map);
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
三 createProxy(...)
/**
 * Creates the invoker for the referenced service and wraps it in a proxy.
 *
 * <p>Outline (from the article author's notes):
 * <ul>
 *   <li>Create the Invoker
 *     <ul>
 *       <li>remote reference through a registry (ZooKeeper in the demo):
 *           RegistryProtocol holds a ZookeeperRegistry that watches the
 *           configurators, routers and providers paths; RegistryProtocol builds
 *           URLs from the children of the providers path; DubboProtocol builds
 *           the Invoker</li>
 *     </ul>
 *   </li>
 *   <li>Create the proxy of the target service from the Invoker</li>
 * </ul>
 *
 * @param map parameters describing the reference, as assembled by {@code init()}
 * @return the proxy obtained from {@code proxyFactory.getProxy(invoker)}
 * @throws IllegalStateException if no registry URL could be assembled, or if
 *         checking is enabled and the invoker reports itself unavailable
 */
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer; // false in the article's demo run
    if (super.isInjvm() == null) {
        // injvm is not set explicitly: decide from the url setting and the injvm protocol.
        if (this.url != null && this.url.length() > 0) {
            // if a url is specified, don't do local reference
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            // by default, reference local service if there is
            // (author's note: this checks whether the configuration calls for a local reference)
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
        }
    } else {
        isJvmRefer = isInjvm().booleanValue();
    }
    if (isJvmRefer) {
        // Local reference: build a local-protocol URL and refer through it
        // (author's note: this yields an InjvmInvoker).
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        // Remote reference.
        if (this.url != null && this.url.length() > 0) {
            // user specified URL, could be peer-to-peer address, or register center's address.
            // Direct connection to the provider(s); here "url" is still the String field.
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    // This local URL shadows the String field "url" inside the loop.
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // Registry address: carry the reference parameters in the encoded refer parameter.
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // Provider address: merge the reference parameters into the URL.
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // assemble URL from register center's configuration
            // Remote invocation through a registry.
            /*
             * Load the registry URLs: the registry configuration given when the consumer
             * starts is wrapped into URLs; there may be several protocols / registries.
             * URL observed in the article's demo run:
             * - registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-consumer&dubbo=2.0.2&pid=82343&qos.port=33333&registry=zookeeper&timestamp=1669709199146
             */
            List<URL> us = super.loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = super.loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // The reference parameters travel as the encoded refer parameter.
                    this.urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }
        if (this.urls.size() == 1) {
            // Direct connection, or a single registry configured.
            /*
             * Author's notes: refprotocol is the adaptive extension of Protocol
             * - default implementation: dubbo
             * - selected by the protocol of the URL
             * URL observed in the article's demo run:
             * - registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-consumer&dubbo=2.0.2&pid=82516&qos.port=33333&refer=application%3Dnative-consumer%26dubbo%3D2.0.2%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D82516%26qos.port%3D33333%26register.ip%3D198.18.0.1%26side%3Dconsumer%26timestamp%3D1669709307576&registry=zookeeper&timestamp=1669709314764
             * so the implementation used is RegistryProtocol.
             */
            this.invoker = refprotocol.refer(interfaceClass, this.urls.get(0));
        } else {
            // Several URLs: refer each one and join the resulting invokers through the cluster.
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }
    // Effective "check" flag: this reference first, then the consumer config, then true.
    Boolean c = super.check;
    if (c == null && this.consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true; // default true
    }
    if (c && !invoker.isAvailable()) {
        // Availability check of the invoker failed.
        // make it possible for consumer to retry later if provider is temporarily unavailable
        initialized = false;
        // e.g. com.alibaba.dubbo.demo.DemoService
        final String serviceKey = (this.group == null ? "" : group + "/") + this.interfaceName + (this.version == null ? "" : ":" + this.version);
        Set<ConsumerInvokerWrapper> consumerInvoker = ProviderConsumerRegTable.getConsumerInvoker(serviceKey);
        // Identity comparison against the shared empty-set instance.
        if (consumerInvoker != Collections.<ConsumerInvokerWrapper>emptySet()) {
            //since create proxy error , so we must be the first consumer. Simply clear ConcurrentHashSet
            consumerInvoker.clear();
        }
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + serviceKey + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // create service proxy
    /*
     * Generate the proxy class from the invoker.
     *
     * Author's notes: proxyFactory is the adaptive extension of ProxyFactory
     * - default implementation: javassist
     * - selected by the "proxy" parameter of the URL
     * URL carried by the invoker in the article's demo run:
     * - zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=native-consumer&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=82897&qos.port=33333&register.ip=198.18.0.1&remote.timestamp=1669707577585&side=consumer&timestamp=1669709866593
     * so the implementation used is JavassistProxyFactory.
     */
    return (T) proxyFactory.getProxy(this.invoker);
}
createProxy(...)的流程可分為兩個大步驟:
- 建立Invoker物件
- 根據Invoker物件建立目標服務的代理物件
1 建立Invoker物件
建立Invoker物件:由Protocol的自適應擴充套件完成,經註冊中心引用時實際走的是RegistryProtocol實現。
2 代理物件
通過Invoker物件建立目標服務的代理物件:由ProxyFactory的自適應擴充套件完成,預設實現是JavassistProxyFactory。