1. 程式人生 > >Dubbo中暴露服務的過程解析

Dubbo中暴露服務的過程解析

dubbo 服務暴露過程

Dubbo會在Spring例項化完bean之後,在重新整理容器最後一步釋出ContextRefreshEvent事件的時候,通知實現了ApplicationListener的ServiceBean類進行回撥onApplicationEvent 事件方法,dubbo會在這個方法中呼叫ServiceBean父類ServiceConfig的export方法,而該方法真正實現了服務的(非同步或者非非同步)釋出。

載入dubbo配置

Spring容器在啟動的時候,會讀取到Spring預設的一些schema以及Dubbo自定義的schema,每個schema都會對應一個自己的NamespaceHandler,NamespaceHandler裡面通過BeanDefinitionParser來解析配置資訊並轉化為需要載入的bean物件。

遇到dubbo名稱空間 ,首先會呼叫DubboNamespaceHandler類的 init方法 進行初始化操作。

根據名稱空間去獲取具體的處理器NamespaceHandler。那具體的處理器是在哪定義的呢,在”META-INF/spring.handlers”檔案中,Spring在會自動載入該檔案中所有內容。

META-INF/spring.handlers

http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

META-INF/spring.schemas

http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

根據不同的XML節點,會委託NamespaceHandlerSupport 類找出合適的BeanDefinitionParser,其中Dubbo所有的標籤都使用 DubboBeanDefinitionParser進行解析,基於一對一屬性對映,將XML標籤解析為Bean物件。

DubboNamespaceHandler.java 類程式碼如下

package com.alibaba.dubbo.config.spring.schema;

import
org.springframework.beans.factory.xml.NamespaceHandlerSupport; public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }

由於DubboBeanDefinitionParser 類中 parse轉換的過程程式碼還是比較複雜,只抽離出來bean的註冊這一塊的程式碼如下

DubboBeanDefinitionParser.java 類程式碼如下

package com.alibaba.dubbo.config.spring.schema;

public class DubboBeanDefinitionParser implements BeanDefinitionParser {

    @SuppressWarnings("unchecked")
    private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {

    RootBeanDefinition beanDefinition = new RootBeanDefinition();
        beanDefinition.setBeanClass(beanClass);
        beanDefinition.setLazyInit(false);
        String id = element.getAttribute("id");
        //省略......
         if(id != null && id.length() > 0) {
            if(parserContext.getRegistry().containsBeanDefinition(id)) {
                throw new IllegalStateException("Duplicate spring bean id " + id);
            }
            //registerBeanDefinition 註冊Bean的定義
            //具體的id如下 applicationProvider.xml解析後的顯示 id,
            //如id="dubbo_provider"  beanDefinition = "ApplicationConfig"
            parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
            beanDefinition.getPropertyValues().addPropertyValue("id", id);
        }
     }  
}

通過DubboBeanDefinitionParser 類的 parse方法會將class資訊封裝成BeanDefinition,然後將BeanDefinition再放進DefaultListableBeanFactory的beanDefinitionMap中。

最後通過Spring bean 的載入機制進行載入。

服務暴露過程

Dubbo會在Spring例項化完bean之後,在重新整理容器最後一步釋出ContextRefreshEvent事件的時候,通知實現了ApplicationListener的ServiceBean類進行回撥onApplicationEvent 事件方法,dubbo會在這個方法中呼叫ServiceBean父類ServiceConfig的export方法,而該方法真正實現了服務的(非同步或者非非同步)釋出。

服務暴露入口

由服務配置類 ServiceConfig 進行初始化工作及服務暴露入口,首先進去執行該類的export()方法。

ServiceConfig.java 類的 export 方法

export的步驟簡介

  1. 首先會檢查各種配置資訊,填充各種屬性,總之就是保證我在開始暴露服務之前,所有的東西都準備好了,並且是正確的。
  2. 載入所有的註冊中心,因為我們暴露服務需要註冊到註冊中心中去。
  3. 根據配置的所有協議和註冊中心url分別進行匯出。
  4. 進行匯出的時候,又是一波屬性的獲取設定檢查等操作。
  5. 如果配置的不是remote,則做本地匯出。
  6. 如果配置的不是local,則暴露為遠端服務。
  7. 不管是本地還是遠端服務暴露,首先都會獲取Invoker。
  8. 獲取完Invoker之後,轉換成對外的Exporter,快取起來。

export方法先判斷是否需要延遲暴露(這裡我們使用的是不延遲暴露),然後執行doExport方法。

doExport方法先執行一系列的檢查方法,然後呼叫doExportUrls方法。檢查方法會檢測dubbo的配置是否在Spring配置檔案中宣告,沒有的話讀取properties檔案初始化。

doExportUrls方法先呼叫loadRegistries獲取所有的註冊中心url,然後遍歷呼叫doExportUrlsFor1Protocol方法。對於在標籤中指定了registry屬性的Bean,會在載入BeanDefinition的時候就載入了註冊中心。

ServiceConfig.java 類的 export 方法

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

public synchronized void export() {
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export) {
        return;
    }

    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}

可以看出釋出釋出是支援延遲暴露釋出服務的,這樣可以用於當我們釋出的服務非常多,影響到應用啟動的問題,前提是應用允許服務釋出的延遲特性。

接下來就進入到 ServiceConfig.java 類的 doExport() 方法。

檢查DUBBO配置的合法性

ServiceConfig.java 類的 doExport方法。檢查DUBBO配置的合法性,並呼叫doExportUrls 方法。

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        checkDefault();
        if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
        if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(stubClass)) {
                throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
            }
        }
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls();
    }
}

我們可以看出該方法的實現的邏輯包含了根據配置的優先順序將ProviderConfig,ModuleConfig,MonitorConfig,ApplicaitonConfig等一些配置資訊進行組裝和合並。還有一些邏輯是檢查配置資訊的合法性。最後又呼叫了doExportUrls方法。

服務多協議暴露過程

ServiceConfig.java 類的 doExportUrls() 方法

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
}

該方法第一步是載入註冊中心列表

loadRegistries(true);載入註冊中心列表響應示例

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.5.6&file=/data/dubbo/cache/dubbo-provider&pid=21448&registry=zookeeper&timestamp=1524134852031

第二部是將服務釋出到多種協議的url上,並且攜帶註冊中心列表的引數,從這裡我們可以看出dubbo是支援同時將一個服務釋出成為多種協議的,這個需求也是很正常的,客戶端也需要支援多協議,根據不同的場景選擇合適的協議。

ServiceConfig.java 類的 doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) 方法。

package com.alibaba.dubbo.config;

public class ServiceConfig<T> extends AbstractServiceConfig {

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            String name = protocolConfig.getName();
            if (name == null || name.length() == 0) {
                name = "dubbo";
            }

            String host = protocolConfig.getHost();
            //host = 10.4.81.95
            if (provider != null && (host == null || host.length() == 0)) {
                host = provider.getHost();
            }
            boolean anyhost = false;
            if (NetUtils.isInvalidLocalHost(host)) {
                anyhost = true;
                try {
                    host = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException e) {
                    logger.warn(e.getMessage(), e);
                }
                if (NetUtils.isInvalidLocalHost(host)) {
                    if (registryURLs != null && registryURLs.size() > 0) {
                        for (URL registryURL : registryURLs) {
                            try {
                                Socket socket = new Socket();
                                try {
                                    SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                                    socket.connect(addr, 1000);
                                    host = socket.getLocalAddress().getHostAddress();
                                    break;
                                } finally {
                                    try {
                                        socket.close();
                                    } catch (Throwable e) {
                                    }
                                }
                            } catch (Exception e) {
                                logger.warn(e.getMessage(), e);
                            }
                        }
                    }
                    if (NetUtils.isInvalidLocalHost(host)) {
                        host = NetUtils.getLocalHost();
                    }
                }
            }

            Integer port = protocolConfig.getPort();
            if (provider != null && (port == null || port == 0)) {
                port = provider.getPort();
            }
            final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
            if (port == null || port == 0) {
                port = defaultPort;
            }
            if (port == null || port <= 0) {
                port = getRandomPort(name);
                if (port == null || port < 0) {
                    port = NetUtils.getAvailablePort(defaultPort);
                    putRandomPort(name, port);
                }
                logger.warn("Use random available port(" + port + ") for protocol " + name);
            }

            Map<String, String> map = new HashMap<String, String>();
            if (anyhost) {
                map.put(Constants.ANYHOST_KEY, "true");
            }
            map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
            map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
            map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
            if (ConfigUtils.getPid() > 0) {
                map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
            }
            appendParameters(map, application);
            appendParameters(map, module);
            appendParameters(map, provider, Constants.DEFAULT_KEY);
            appendParameters(map, protocolConfig);
            appendParameters(map, this);
            if (methods != null && methods.size() > 0) {
                for (MethodConfig method : methods) {
                    appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }
                    List<ArgumentConfig> arguments = method.getArguments();
                    if (arguments != null && arguments.size() > 0) {
                        for (ArgumentConfig argument : arguments) {
                            //型別自動轉換.
                            if (argument.getType() != null && argument.getType().length() > 0) {
                                Method[] methods = interfaceClass.getMethods();
                                //遍歷所有方法
                                if (methods != null && methods.length > 0) {
                                    for (int i = 0; i < methods.length; i++) {
                                        String methodName = methods[i].getName();
                                        //匹配方法名稱,獲取方法簽名.
                                        if (methodName.equals(method.getName())) {
                                            Class<?>[] argtypes = methods[i].getParameterTypes();
                                            //一個方法中單個callback
                                            if (argument.getIndex() != -1) {
                                                if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                                } else {
                                                    throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            } else {
                                                //一個方法中多個callback
                                                for (int j = 0; j < argtypes.length; j++) {
                                                    Class<?> argclazz = argtypes[j];
                                                    if (argclazz.getName().equals(argument.getType())) {
                                                        appendParameters(map, argument, method.getName() + "." + j);
                                                        if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                            throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            } else if (argument.getIndex() != -1) {
                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                            } else {
                                throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                            }

                        }
                    }
                } // end of methods for
            }

            if (ProtocolUtils.isGeneric(generic)) {
                map.put("generic", generic);
                map.put("methods", Constants.ANY_VALUE);
            } else {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }

                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("NO method found in service interface " + interfaceClass.getName());
                    map.put("methods", Constants.ANY_VALUE);
                } else {
                    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put("token", UUID.randomUUID().toString());
                } else {
                    map.put("token", token);
                }
            }
            if ("injvm".equals(protocolConfig.getName())) {
                protocolConfig.setRegister(false);
                map.put("notify", "false");
            }
            // 服務釋出 map 中的一些關鍵配置資訊
            /*
            map = {[email protected]}  size = 17
             0 = {[email protected]} "side" -> "provider"
             1 = {[email protected]} "default.version" -> "1.0"
             2 = {[email protected]} "methods" -> "sayHello"
             3 = {[email protected]} "dubbo" -> "2.5.6"
             4 = {[email protected]} "threads" -> "500"
             5 = {[email protected]} "pid" -> "21448"
             6 = {[email protected]} "interface" -> "io.ymq.dubbo.api.DemoService"
             7 = {[email protected]} "threadpool" -> "fixed"
             8 = {[email protected]} "generic" -> "false"
             9 = {[email protected]} "default.retries" -> "0"
             10 = {[email protected]} "delay" -> "-1"
             11 = {[email protected]} "application" -> "dubbo-provider"
             12 = {[email protected]} "default.connections" -> "5"
             13 = {[email protected]} "default.delay" -> "-1"
             14 = {[email protected]} "default.timeout" -> "10000"
             15 = {[email protected]} "anyhost" -> "true"
             16 = {[email protected]} "timestamp" -> "1524135271940"
            */
            // 匯出服務
            String contextPath = protocolConfig.getContextpath();
            if ((contextPath == null || contextPath.length() == 0) && provider != null) {
                contextPath = provider.getContextpath();
            }
            URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
            // url
            /*
            dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
            anyhost=true&
            application=dubboprovider&
            default.connections=5&
            default.delay=-1&
            default.retries=0&
            default.timeout=10000&
            default.version=1.0&
            delay=-1&
            dubbo=2.5.6&
            generic=false&
            interface=io.ymq.dubbo.api.DemoService&
            methods=sayHello&
            pid=21448&
            side=provider&
            threadpool=fixed&
            threads=500&
            timestamp=1524135271940
            */

            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }

            String scope = url.getParameter(Constants.SCOPE_KEY);
            //配置為none不暴露
            if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

                //配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠端服務)
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
                //如果配置不是local則暴露為遠端服務.(配置為local,則表示只暴露本地服務)
                if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (registryURLs != null && registryURLs.size() > 0
                            && url.getParameter("register", true)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
                        }
                    } else {
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                }
            }
            this.urls.add(url);
        }
}

拼裝dubbo服務URL

該方法的邏輯是先根據服務配置、協議配置、釋出服務的伺服器資訊、方法列表、dubbo版本等等資訊組裝成一個釋出的URL物件。

主要根據之前map裡的資料組裝成URL。

例如

dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
anyhost=true&
application=dubbo-provider&
default.connections=5&
default.delay=-1&
default.retries=0&
default.timeout=10000&
default.version=1.0&
delay=-1&
dubbo=2.5.6&
generic=false&
interface=io.ymq.dubbo.api.DemoService&
methods=sayHello&
pid=21448&
side=provider&
threadpool=fixed&
threads=500&
timestamp=1524135271940

本地暴露和遠端暴露

  1. 如果服務配置的scope是釋出範圍,配置為none不暴露服務,則會停止釋出操作;
  2. 如果配置不是remote的情況下先做本地暴露,則呼叫本地暴露exportLocal方法;
  3. 如果配置不是local則暴露為遠端服務,則註冊服務registryProcotol;
//配置為none不暴露
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

    //配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠端服務)
    if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    //如果配置不是local則暴露為遠端服務.(配置為local,則表示只暴露本地服務)
    if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
        省略更多
    }
}

本地暴露服務

@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
    // 入參 URL =
    /*
    dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
    anyhost=true&
    application=dubbo-provider&
    default.connections=5&
    default.delay=-1&
    default.retries=0&
    default.timeout=10000&
    default.version=1.0&
    delay=-1&
    dubbo=2.5.6&
    generic=false&
    interface=io.ymq.dubbo.api.DemoService&
    methods=sayHello&
    pid=11104&
    scope=local&
    side=provider&
    threadpool=fixed&
    threads=500&
    timestamp=1524193840984
    */
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {

        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(NetUtils.LOCALHOST)
                .setPort(0);

    //這時候轉成本地暴露的協議 url:         
    /* 
    injvm://127.0.0.1/io.ymq.dubbo.api.DemoService?
    anyhost=true&
    application=dubbo-provider&
    default.connections=5&
    default.delay=-1&
    default.retries=0&
    default.timeout=10000&
    default.version=1.0&
    delay=-1&
    dubbo=2.5.6&
    generic=false&
    interface=io.ymq.dubbo.api.DemoService&
    methods=sayHello&
    pid=11104&
    scope=local&
    side=provider&
    threadpool=fixed&
    threads=500&
    timestamp=1524193840984
    */

    //首先還是先獲得Invoker
    //然後匯出成Exporter,並快取
    //這裡的proxyFactory實際是JavassistProxyFactory
    //有關詳細的獲得Invoke以及exporter會在下面的流程解析,在本地暴露這個流程就不再說明。

        Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();

ServiceConfig.exportLocal(URL url)方法中對url進行本地暴露,首先將URL的協議更改為了“injvm”、IP更改為了本地埠,埠變更為0。

主要有代理工廠建立Invoker代理、Protocol暴露Invoker從而生成Exporter兩個處理邏輯。

  1. 獲得代理工廠建立的Invoker代理,呼叫getInvoker方法,根據URL中的proxy引數選擇具體的代理工廠,預設選擇JavassistProxyFactory代理工廠進行處理。
  2. 呼叫protocl.export()方法完成,在protocol物件的export(Invoker)方法中建立的Exporter物件存入ServiceConfig物件的List

暴露為遠端服務

接下來是暴露為遠端服務,跟本地暴露的流程一樣還是先獲取Invoker,然後匯出成Exporter。


private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();


//如果配置不是local則暴露為遠端服務.(配置為local,則表示只暴露本地服務)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
    if (logger.isInfoEnabled()) {
        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
    }
    if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
        for (URL registryURL : registryURLs) {
            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
            URL monitorUrl = loadMonitor(registryURL);
            if (monitorUrl != null) {
                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
            }

            // url 引數
            /**
            dubbo://10.4.81.95:20880/io.ymq.dubbo.api.DemoService?
            anyhost=true&
            application=dubbo-provider&
            default.connections=5&
            default.delay=-1&
            default.retries=0&
            default.timeout=10000&
            default.version=1.0&
            delay=-1&
            dubbo=2.5.6&
            generic=false&
            interface=io.ymq.dubbo.api.DemoService&methods=sayHello&
            monitor=dubbo%3A%2F%2F127.0.0.1%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddubbo-provider%26dubbo%3D2.5.6%26file%3D%2Fdata%2Fdubbo%2Fcache%2Fdubbo-provider%26pid%3D26440%26protocol%3Dregistry%26refer%3Ddubbo%253D2.5.6%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D26440%2526timestamp%253D1524212561737%26registry%3Dzookeeper%26timestamp%3D1524212160841&
            pid=26440&
            side=provider&
            threadpool=fixed&
            threads=500&
            timestamp=1524212161760

            */

            //根據服務具體實現,實現介面,以及registryUrl通過ProxyFactory將DemoService封裝成一個本地執行的Invoker代理
            //invoker是對具體實現的一種代理。
            //這裡proxyFactory是上面列出的生成的程式碼

            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

            //使用Protocol將invoker匯出成一個Exporter
            //暴露封裝服務invoker
            //呼叫Protocol生成的適配類的export方法
            //這裡的protocol是上面列出的生成的程式碼

            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
        }
    } else {
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

        Exporter<?> exporter = protocol.export(invoker);
        exporters.add(exporter);
    }
}

關於Invoker,Exporter等的解釋參見最下面的內容

暴露遠端服務時的獲取Invoker過程

服務實現類轉換成Invoker,大概的步驟是:

  1. 根據上面生成的proxyFactory方法呼叫具體的ProxyFactory實現類的getInvoker方法獲取Invoker。
  2. getInvoker的過程是,首先對實現類做一個包裝,生成一個包裝後的類。
  3. 然後新建立一個Invoker例項,這個Invoker中包含著生成的Wrapper類,Wrapper類中有具體的實現類。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
package com.alibaba.dubbo.rpc.proxy.wrapper;

public class StubProxyFactoryWrapper implements ProxyFactory {

    private final ProxyFactory proxyFactory;

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }
}

這行程式碼中包含服務實現類轉換成Invoker的過程,其中proxyFactory是上面列出的動態生成的程式碼,其中getInvoker的程式碼為(做了精簡)。

JavassistProxyFactory.java 類的 getInvoker(T proxy, Class<T> type, URL url) 方法。

package com.alibaba.dubbo.rpc.proxy.javassist;

public class JavassistProxyFactory extends AbstractProxyFactory {

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {

        // TODO Wrapper類不能正確處理帶$的類名
        //第一步封裝一個包裝類(wrapper)
        //該類是手動生成的
        //如果類是以$開頭,就使用介面型別獲取,其他的使用實現類獲取

        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

        //返回一個Invoker例項,doInvoke方法中直接返回上面wrapper的invokeMethod
        //關於生成的wrapper,請看下面列出的生成的程式碼,其中invokeMethod方法中就有實現類對實際方法的呼叫

        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

Wrapper.java 類的 getWrapper(Class<?> c) 方法。

生成包裝類(wrapper)的過程,首先看getWrapper方法

package com.alibaba.dubbo.common.bytecode;

public abstract class Wrapper {

    //快取包裝類
    private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>(); //class wrapper map

    public static Wrapper getWrapper(Class<?> c) {
        while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
            c = c.getSuperclass();
        //Object型別的
        if( c == Object.class )
            return OBJECT_WRAPPER;
        //先去Wrapper快取中查詢 
        // c 等於 class io.ymq.dubbo.provider.service.DemoServiceImpl
        Wrapper ret = WRAPPER_MAP.get(c);
        if( ret == null ) {
            //快取中不存在,生成Wrapper類,放到快取
            ret = makeWrapper(c);
            WRAPPER_MAP.put(c,ret);
        }
        return ret;
    }
}

生成完Wrapper以後,返回一個AbstractProxyInvoker例項。至此生成Invoker的步驟就完成了。
可以看到Invoker執行方法的時候,會呼叫Wrapper的invokeMethod,這個方法中會有真實的實現類呼叫真實方法的程式碼

JavassistProxyFactory.java 類的 getInvoker(T proxy, Class<T> type, URL url) 方法。

package com.alibaba.dubbo.rpc.proxy.javassist;

public class JavassistProxyFactory extends AbstractProxyFactory {

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {

        // TODO Wrapper類不能正確處理帶$的類名
        //第一步封裝一個包裝類(wrapper)
        //該類是手動生成的
        //如果類是以$開頭,就使用介面型別獲取,其他的使用實現類獲取

        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

        //返回一個Invoker例項,doInvoke方法中直接返回上面wrapper的invokeMethod
        //關於生成的wrapper,請看下面列出的生成的程式碼,其中invokeMethod方法中就有實現類對實際方法的呼叫

        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);