1. 程式人生 > 其它 >Dubbo原始碼-11-服務引用流程

Dubbo原始碼-11-服務引用流程

一 入口

public static void main(String[] args) {
        // 引用遠端服務 此例項很重 封裝了與註冊中心的連線以及與提供者的連線
        ReferenceConfig<DemoService> reference = new ReferenceConfig<DemoService>();
        reference.setApplication(new ApplicationConfig("native-consumer")); // 配置應用資訊
        reference.setRegistry(new RegistryConfig("zookeeper://localhost:2181")); // 配置註冊中心資訊
        reference.setInterface(DemoService.class); // 引用的遠端服務的介面抽象
        // 和本地bean一樣使用service 此代理物件內部封裝了所有通訊細節 服務端獲得了遠端服務的代理物件 像呼叫本地方法一樣
        DemoService demoService = reference.get();
        String ret = demoService.sayHello("world");
        System.out.println("ret=" + ret);
    }

二 配置資訊

public synchronized T get() {
        if (this.destroyed) throw new IllegalStateException("Already destroyed!");
        if (this.ref == null)
            this.init();
        return ref;
    }
private void init() {
        if (this.initialized) return; // 判斷是否已經完成遠端服務的初始化
        this.initialized = true; // 標識完成遠端服務的初始化
        if (interfaceName == null || interfaceName.length() == 0)
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        // get consumer's global configuration
        this.checkDefault(); // 嘗試在VM引數中載入ConsumerConfig配置項
        appendProperties(this); // 嘗試在VM引數中載入ReferenceConfig配置項
        if (super.getGeneric() == null && this.getConsumer() != null) {
            super.setGeneric(this.getConsumer().getGeneric());
        }
        if (ProtocolUtils.isGeneric(super.getGeneric())) { // 泛化呼叫
            this.interfaceClass = GenericService.class;
        } else {
            try {
                this.interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            super.checkInterfaceAndMethods(interfaceClass, this.methods); // 如果methods不為空 就校驗interfaceClass是否存在對應的實現
        }
        String resolve = System.getProperty(interfaceName);
        String resolveFile = null;
        if (resolve == null || resolve.length() == 0) {
            resolveFile = System.getProperty("dubbo.resolve.file");
            if (resolveFile == null || resolveFile.length() == 0) {
                File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
                if (userResolveFile.exists()) {
                    resolveFile = userResolveFile.getAbsolutePath();
                }
            }
            if (resolveFile != null && resolveFile.length() > 0) {
                Properties properties = new Properties();
                FileInputStream fis = null;
                try {
                    fis = new FileInputStream(new File(resolveFile));
                    properties.load(fis);
                } catch (IOException e) {
                    throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
                } finally {
                    try {
                        if (null != fis) fis.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                resolve = properties.getProperty(interfaceName);
            }
        }
        if (resolve != null && resolve.length() > 0) {
            url = resolve;
            if (logger.isWarnEnabled()) {
                if (resolveFile != null) {
                    logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
                } else {
                    logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
                }
            }
        }
        // 當前引用配置部分配置項如果缺失嘗試把consumer的配置項賦值過來
        if (this.consumer != null) {
            if (this.application == null)
                this.application = this.consumer.getApplication();
            if (this.module == null)
                module = this.consumer.getModule();
            if (this.registries == null)
                registries = this.consumer.getRegistries();
            if (monitor == null)
                monitor = consumer.getMonitor();
        }
        if (this.module != null) {
            if (this.registries == null)
                this.registries = this.module.getRegistries();
            if (this.monitor == null)
                this.monitor = this.module.getMonitor();
        }
        if (this.application != null) {
            if (this.registries == null)
                this.registries = this.application.getRegistries();
            if (this.monitor == null)
                this.monitor = this.application.getMonitor();
        }
        super.checkApplication();
        checkStub(this.interfaceClass);
        checkMock(this.interfaceClass);
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        if (!super.isGeneric()) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(this.interfaceClass).getMethodNames(); // 遠端服務介面中的方法定義
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        map.put(Constants.INTERFACE_KEY, interfaceName);
        appendParameters(map, this.application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        String prefix = StringUtils.getServiceKey(map); // com.alibaba.dubbo.demo.DemoService
        if (this.methods != null && !this.methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                appendAttributes(attributes, method, prefix + "." + method.getName());
                checkAndConvertImplicitConfig(method, map, attributes);
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
        /**
         * <p>map就是一個配置項 包含了遠端服務的配置資訊 根據配置資訊構建遠端服務的代理物件<ul>
         *     <li>side -> consumer</li>
         *     <li>application -> demo-service</li>
         *     <li>register.ip -> 192.168.0.3</li>
         *     <li>methods -> sayHello</li>
         *     <li>qos.port -> 33333</li>
         *     <li>dubbo -> 2.0.2</li>
         *     <li>pid -> 37888</li>
         *     <li>interface -> com.alibaba.dubbo.demo.DemoService</li>
         *     <li>timestamp -> 1652886744792</li>
         * </ul></p>
         */
        this.ref = this.createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

三 createProxy(...)

/**
     * - 建立Invoker物件
     *     - 遠端方式
     *         - 註冊中心
     *             - zk
     *                 - RegistryProtocol持有ZookeeperRegistry監聽指定路徑
     *                     - configurators
     *                     - routers
     *                     - providers
     *                 - RegistryProtocol獲取到providers的子路徑構建URL
     *                 - DubboProtocol構建Invoker物件
     * - 根據Invoker建立目標服務的代理物件
     */
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer; // false
        if (super.isInjvm() == null) {
            if (this.url != null && this.url.length() > 0) { // if a url is specified, don't do local reference // 指定了url配置 不做本地引用
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { // 檢測生產者自定義的配置是否需要本地引用
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) { // 本地引用 生成本地引用URL 構建InjvmInvoker例項
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else { // 遠端引用
            if (this.url != null && this.url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. // 遠端直連方式呼叫生產者服務
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration // 遠端呼叫 通過註冊中心
                /**
                 * 載入註冊中心的url
                 * 啟動消費者時指定的遠端註冊中心配置封裝成URL
                 * 可能存在多協議/多註冊中心
                 * url
                 *     - registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-consumer&dubbo=2.0.2&pid=82343&qos.port=33333&registry=zookeeper&timestamp=1669709199146
                 */
                List<URL> us = super.loadRegistries(false); //
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = super.loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // refer引數
                        this.urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (this.urls.size() == 1) { // 服務直連或者配置了單個註冊中心
                /**
                 * Protocol自適應擴充套件
                 *     - 預設實現dubbo
                 *     - URL協議型別
                 * 這裡url
                 *     - registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-consumer&dubbo=2.0.2&pid=82516&qos.port=33333&refer=application%3Dnative-consumer%26dubbo%3D2.0.2%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D82516%26qos.port%3D33333%26register.ip%3D198.18.0.1%26side%3Dconsumer%26timestamp%3D1669709307576&registry=zookeeper&timestamp=1669709314764
                 * 因此實現是RegistryProtocol
                 */
                this.invoker = refprotocol.refer(interfaceClass, this.urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = super.check;
        if (c == null && this.consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) { // invoker可用性檢測
            // make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false; // com.alibaba.dubbo.demo.DemoService
            final String serviceKey = (this.group == null ? "" : group + "/") + this.interfaceName + (this.version == null ? "" : ":" + this.version);
            Set<ConsumerInvokerWrapper> consumerInvoker = ProviderConsumerRegTable.getConsumerInvoker(serviceKey);
            if (consumerInvoker != Collections.<ConsumerInvokerWrapper>emptySet()) {
                //since create proxy error , so we must be the first consumer. Simply clear ConcurrentHashSet
                consumerInvoker.clear();
            }
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + serviceKey + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // create service proxy
        /**
         * 根據invoker生成代理類
         *
         * ProxyFactory自適應擴充套件
         *     - 預設實現javassist
         *     - URL中配置項proxy
         * invoker中的URL
         *     - zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=native-consumer&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=82897&qos.port=33333&register.ip=198.18.0.1&remote.timestamp=1669707577585&side=consumer&timestamp=1669709866593
         * 實現是JavassistProxyFactory
         */
        return (T) proxyFactory.getProxy(this.invoker);
    }

兩個大步驟

  • 建立Invoker物件
  • 根據Invoker物件建立目標服務的代理物件

1 建立Invoker物件

建立Invoker物件(Protocol的RegistryProtocol實現中)

2 代理物件

通過Invoker物件建立目標服務的代理物件(ProxyFactory的JavassistProxyFactory)

四 流程圖