1. 程式人生 > >Dubbo/Dubbox的服務消費(二)- 服務發現

Dubbo/Dubbox的服務消費(二)- 服務發現

上文書整理了dubbo是如何生成服務代理的,並且留了個尾巴,這一文主要介紹dubbo是如何實現服務發現的,繼續前文的腳步,看一下dubbo如何完成傳說中的服務自動發現
開啟com.alibaba.dubbo.config.ReferenceConfig 類,只關注

@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
    private T createProxy(Map<String, String> map) {
        //...省略不相關程式碼
        invoker = refprotocol.refer(interfaceClass, urls.get(0
)); //...省略不相關程式碼 }

其中refprotocol是一個全域性變數

 private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

可見其是一個“自適應擴充套件點”那麼我們根據去看Protocol的refer方法對應的extName是什麼,其方法簽名如下

    /**
     * 引用遠端服務:<br>
     * 1. 當用戶呼叫refer()所返回的Invoker物件的invoke()方法時,協議需相應執行同URL遠端export()傳入的Invoker物件的invoke()方法。<br>
     * 2. refer()返回的Invoker由協議實現,協議通常需要在此Invoker中傳送遠端請求。<br>
     * 3. 當url中有設定check=false時,連線失敗不能丟擲異常,並內部自動恢復。<br>
     *
     * @param
<T> 服務的型別 * @param type 服務的型別 * @param url 遠端服務的URL地址 * @return invoker 服務的本地代理 * @throws RpcException 當連線服務提供方失敗時丟擲 */
@Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

這裡URL引數為

registry://註冊中心地址:2181/com.alibaba.dubbo.registry.RegistryService?application=demo&dubbo=2.8.4&file=快取檔案路徑&logger=log4j&organization=demo&owner=weiythi&pid=16072&
refer=application%3Dprojectname%26default.check%3Dfalse%26default.timeout%3D10000%26dubbo%3D2.8.4%26interface%3Dcom.companyName.projectName.moduleName.protocol.dubbo.ServiceName%26logger%3Dlog4j%26
methods%3DadSync%26organization%3DconpanyName%26owner%3Dweiythi%26pid%3D16072%26protocol%3Ddubbo%26revision%3D1.0.0-SNAPSHOT%26side%3Dconsumer%26timestamp%3D1511402541402&
registry=zookeeper&timestamp=1511402541632

已知針對針對該連線的protocol為registry ,所以會使用如下的擴充套件點

  com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                                                                                                       .getExtension("registry");

返回 META-INF\dubbo\internal\com.alibaba.dubbo.rpc.Protocol 檔案 其內容如下

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol

發現有兩個包裝類

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper

根據擴充套件點的規則
兩個包裝類的套用是無序的,即有可能是以下兩種情況

ProtocolFilterWrapper->ProtocolListenerWrapper->RegistryProtocol
ProtocolListenerWrapper->ProtocolFilterWrapper->RegistryProtocol

最終呼叫物件,都是RegistryProtocol,那麼看一下該類的refer方法實現。

    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //該操作會將url上的registry引數設定成協議,然後移除URL上的registry引數
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //根據對應的協議,取得對應的註冊中心實現,以zookeeper為例,這裡取到的registry為com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry
        Registry registry = registryFactory.getRegistry(url);
        //根據上文我們傳入的引數是服務介面物件,所以該條件不成立
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        //檢視doRefer方法,這裡傳入了一個cluster物件
        return doRefer(cluster, registry, type, url);
    }

cluster是一個全域性變數,其定義如下

  private Cluster cluster;

其使用擴充套件點的com.alibaba.dubbo.common.extension.ExtensionLoader.injectExtension(T instance) 方法,對其完成賦值

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //這個類姑且叫做註冊目錄類
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        //初始化一個訂閱連結 consumer://本機地址(無埠號)/服務介面名?xxxx=yyyy......
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            //會在zk中建立節點,節點parent path為
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        //訂閱服務
        //1.向url匯流排中新增訂閱的主題,一共三個分別為 providers,configurators,routers
        //2.呼叫 registry.subscribe(url, this);
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                + "," + Constants.CONFIGURATORS_CATEGORY
                + "," + Constants.ROUTERS_CATEGORY));

        return cluster.join(directory);
    }

directory.subscribe(url) 呼叫registry.subscribe(url, this); 方法,第二個引數為其本身,他的UML圖如下,可見其本身就是一個com.alibaba.dubbo.registry.NotifyListener(事件監聽器)
這裡寫圖片描述
subscribe 方法最終會呼叫com.alibaba.dubbo.registry.zookeeper.doSubscribe,其內部程式碼如下

    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                //不管,省略.
            } else {
                List<URL> urls = new ArrayList<URL>();
                //toCategoriesPath 會生成一個String 陣列,陣列元素如下
                //  /dubbo/服務介面名/providers
                //  /dubbo/服務介面名/configurators
                //  /dubbo/服務介面名/routers
                for (String path : toCategoriesPath(url)) {
                    // 快取相關處理,即如果為當前consumer建立對應監聽器,則取快取中的,如果沒有則新建
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //建立節點,如果第二個引數值為true則為臨時節點,此處建立了永久節點
                    zkClient.create(path, false);
                    //新增監視點事件監聽器,方法內部會呼叫 org.I0Itec.zkclient.ZkClient.subscribeChildChanges(path, listener); 方法,
                    //並返回該方法的返回值(子節點列表)
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {

                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 該方法繼承自 com.alibaba.dubbo.registry.support.FailbackRegistry
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

這裡注意 toUrlsWithEmpty(url, parentPath, currentChilds) 方法,該方法會根據consumerUrl過濾zk中的providerUrl,拿到相匹配的服務列表,否則會根據consumerUrl生成帶有category引數的empty協議url(這個empty url的作用我沒理解上去)

已知com.alibaba.dubbo.registry.support.FailbackRegistry類中的notify方法會做以下幾個操作
1.判斷url和listener是否為null 如果為null則丟擲IllegalArgumentException 異常
2.引數檢驗沒有問題,則呼叫doNotify(URL url, NotifyListener listener, List urls) 方法
doNotify(URL url, NotifyListener listener, List urls) 方法呼叫 com.alibaba.dubbo.registry.support.AbstractRegistry 類中的
protected void notify(@NotNull URL url,@NotNull NotifyListener listener,List urls) 方法
如果AbstractRegistry類的notify方法中丟擲異常則將失敗的通知請求記錄到失敗列表,定時重試
目前為止的呼叫順序圖:
這裡寫圖片描述
以下是com.alibaba.dubbo.registry.support.AbstractRegistry 類的protected void notify(@NotNull URL url,@NotNull NotifyListener listener,List urls) 方法實現

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        //......省略引數校驗和日誌輸出......//
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            //該方法會判斷服務介面是否一致,group version 等配置是否一致,通過這個方法可以過濾當前consumer不匹配的服務。
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //在快取檔案中儲存資訊 (dubbo.cache.file 配置的檔案)
            saveProperties(url);
            //呼叫實際的監聽器 ,針對服務的發現來說,此處呼叫的為 com.alibaba.dubbo.registry.integration.RegistryDirectory 類的相關實現
            listener.notify(categoryList);
        }
    }

com.alibaba.dubbo.registry.integration.RegistryDirectory的notify實現如下

public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        //將傳入url按照category分類處理
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }

        // 重新整理 configurators
        if (configuratorUrls != null && configuratorUrls.size() >0 ){
            this.configurators = toConfigurators(configuratorUrls);
        }
        // 重新整理 routers
        if (routerUrls != null && routerUrls.size() >0 ){
            List<Router> routers = toRouters(routerUrls);
            if(routers != null){ // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // 合併override引數
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && localConfigurators.size() > 0) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // 重新整理 invokers,如果是空協議(empty://)會關閉所有的invoker
        //根據invokerURL列表轉換為invoker列表。轉換規則如下:
        //1.如果url已經被轉換為invoker,則不在重新引用,直接從快取中獲取,注意如果url中任何一個引數變更也會重新引用
        //2.如果傳入的invoker列表不為空,則表示最新的invoker列表
        //3.如果傳入的invokerUrl列表是空,則表示只是下發的override規則或route規則,需要重新交叉對比,決定是否需要重新引用。
        refreshInvoker(invokerUrls);
    }
至此服務的發現流程已全部瞭解,即com.alibaba.dubbo.registry.integration.RegistryProtocol的 doRefer方法中的程式碼行
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

呼叫流程已全部完成,該方法還剩最後一行
cluster.join(directory); 但該方法與本文無關,整理dubbo服務實現的時候再說。