1. 程式人生 > >dubbo的服務呼叫過程

dubbo的服務呼叫過程

在這裡插入圖片描述服務消費的過程:referenceConfig類的init方法呼叫Protocol的refer方法,生成invoker例項,然後把Invoker轉換為客戶端需要的介面。

2、原始碼解析

dubbo的消費端初始化在ReferenceConfig的get()方法

 public synchronized T get() {
        if (destroyed){
            throw new IllegalStateException("Already destroyed!");
        }
    	if (ref == null) {
    		init();
    	}
    	return ref;
    }

init()方法做了一系列獲取url引數的操作後,進行ref = createProxy(map);獲取代理。
在createProxy方法中,根據url是做本地引用,直接根據url的ip和埠號做直接引用,還是通過註冊中心獲取服務地址,進行引用。這裡我們直接看最後一種(這也是最常用的)

	private T createProxy(Map<String, String> map) {
		URL tmpUrl = new URL("temp", "localhost", 0, map);
		final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                //預設情況下如果本地有服務暴露,則引用本地服務.
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }
		
		if (isJvmRefer) {
			URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
			invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
		} else {
            if (url != null && url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // 通過註冊中心配置拼裝URL
            	List<URL> us = loadRegistries(false);
            	if (us != null && us.size() > 0) {
                	for (URL u : us) {
                	    URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                	    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
            	}
            	if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最後一個registry url
                    }
                }
                if (registryURL != null) { // 有 註冊中心協議的URL
                    // 對有註冊中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // 不是 註冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && ! invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // 建立服務代理
        return (T) proxyFactory.getProxy(invoker);
    }

上面程式碼中
List us = loadRegistries(false);獲取到註冊中心的地址

[registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubboTest-test-web&dubbo=2.5.3&pid=10628&registry=zookeeper&timestamp=1543316713611]

然後進入 invoker = refprotocol.refer(interfaceClass, urls.get(0));獲取invoker。
這裡refprotocol是
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
需要根據url的名稱來載入對應的spi擴充套件,找到真正的類。根據url,是register,所以會定位到com.alibaba.dubbo.registry.integration.RegistryDirectory類。找到類的refer方法:

	public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
	
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //根據url找到註冊中心,這裡用的是zookeeper註冊中心
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
        	return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        return doRefer(cluster, registry, type, url);
    }

上面方法中,找到註冊中心後,呼叫doRefer(cluster, registry, type, url),返回invoker

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }

上面程式碼中,subscribeUrl 的值是:

consumer://192.168.86.1/cn.andy.dubbo.DataService?application=dubboTest-test-web&check=false&connections=5&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&pid=10628&retries=0&revision=0.0.1-SNAPSHOT&side=consumer&timestamp=1543316675743

這個url就是向註冊中心註冊的消費端的地址。其中註冊中心的地址分為永久地址和臨時地址。在這個例子中,永久地址就是cn.andy.dubbo.DataService,其下分為provider和consumer以及monitor等。其中,服務提供端的ip地址和埠號作為臨時地址放在provider下,消費端的ip地址和埠號放在consumer下。同時,消費端會對provider下的地址進行訂閱,當其有變化時,會通知消費端,這樣,消費端和服務端就對應起來了。

上面程式碼中directory可以理解為服務端invoker的list集合。
RegistryDirectory directory = new RegistryDirectory(type, url);
裡面的成員
private volatile Map<String, Invoker> urlInvokerMap,儲存了服務端的invoker集合。
direct通過訂閱註冊中心的相關路徑:
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ “,” + Constants.CONFIGURATORS_CATEGORY
+ “,” + Constants.ROUTERS_CATEGORY));
當服務端的invoker集合發生變化,或者路由,配置等發生變化時,都會通知這個director。以invoker發生變化為例(即服務端的提供者數量發生變化)。

    public void subscribe(URL url) {
       setConsumerUrl(url);
       registry.subscribe(url, this);
   }

裡面的this,就是RegistryDirectory,那麼檢視RegistryDirectory的notify方法:


       List<URL> invokerUrls = new ArrayList<URL>();
       List<URL> routerUrls = new ArrayList<URL>();
       List<URL> configuratorUrls = new ArrayList<URL>();
       for (URL url : urls) {
           String protocol = url.getProtocol();
           String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
           if (Constants.ROUTERS_CATEGORY.equals(category) 
                   || Constants.ROUTE_PROTOCOL.equals(protocol)) {
               routerUrls.add(url);
           } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                   || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
               configuratorUrls.add(url);
           } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
               invokerUrls.add(url);
           } else {
               logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
           }
       }
       // configurators 
       if (configuratorUrls != null && configuratorUrls.size() >0 ){
           this.configurators = toConfigurators(configuratorUrls);
       }
       // routers
       if (routerUrls != null && routerUrls.size() >0 ){
           List<Router> routers = toRouters(routerUrls);
           if(routers != null){ // null - do nothing
               setRouters(routers);
           }
       }
       List<Configurator> localConfigurators = this.configurators; // local reference
       // 合併override引數
       this.overrideDirectoryUrl = directoryUrl;
       if (localConfigurators != null && localConfigurators.size() > 0) {
           for (Configurator configurator : localConfigurators) {
               this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
           }
       }
       // providers,根據url重新整理invoker列表
       refreshInvoker(invokerUrls);
   
   /**
    * 根據invokerURL列表轉換為invoker列表。轉換規則如下:
    * 1.如果url已經被轉換為invoker,則不在重新引用,直接從快取中獲取,注意如果url中任何一個引數變更也會重新引用
    * 2.如果傳入的invoker列表不為空,則表示最新的invoker列表
    * 3.如果傳入的invokerUrl列表是空,則表示只是下發的override規則或route規則,需要重新交叉對比,決定是否需要重新引用。
    * @param invokerUrls 傳入的引數不能為null
    */
   private void refreshInvoker(List<URL> invokerUrls){
       if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
               && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
           this.forbidden = true; // 禁止訪問
           this.methodInvokerMap = null; // 置空列表
           destroyAllInvokers(); // 關閉所有Invoker
       } else {
           this.forbidden = false; // 允許訪問
           Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
           if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
               invokerUrls.addAll(this.cachedInvokerUrls);
           } else {
               this.cachedInvokerUrls = new HashSet<URL>();
               this.cachedInvokerUrls.addAll(invokerUrls);//快取invokerUrls列表,便於交叉對比
           }
           if (invokerUrls.size() ==0 ){
           	return;
           }
           Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表
           Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名對映Invoker列表
           // state change
           //如果計算錯誤,則不進行處理.
           if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
               logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
               return ;
           }
           this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
           this.urlInvokerMap = newUrlInvokerMap;
           try{
               destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關閉未使用的Invoker
           }catch (Exception e) {
               logger.warn("destroyUnusedInvokers error. ", e);
           }
       }
   }

生成directory之後,通過return cluster.join(directory)返回invoker。這裡的cluster可以是failoverCluster、failfastCluster等。在我們後面消費端進行方法呼叫時,cluster會根據規則將invoker集合返回一個invoker給消費端。
在上面程式碼中,檢視 Map<String, Invoker> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表。在這個方法中,消費端開啟netty客戶端。

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if(urls == null || urls.size() == 0){
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
        	//如果reference端配置了protocol,則只選擇匹配的protocol
        	if (queryProtocols != null && queryProtocols.length() >0) {
        		boolean accept = false;
        		String[] acceptProtocols = queryProtocols.split(",");
        		for (String acceptProtocol : acceptProtocols) {
        			if (providerUrl.getProtocol().equals(acceptProtocol)) {
        				accept = true;
        				break;
        			}
        		}
        		if (!accept) {
        			continue;
        		}
        	}
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() 
                        + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);
            
            String key = url.toFullString(); // URL引數是排序的
            if (keys.contains(key)) { // 重複URL
                continue;
            }
            keys.add(key);
            // 快取key為沒有合併消費端引數的URL,不管消費端如何合併引數,如果服務端URL發生變化,則重新refer
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // 快取中沒有,重新refer
                try {
                	boolean enabled = true;
                	if (url.hasParameter(Constants.DISABLED_KEY)) {
                		enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
                	} else {
                		enabled = url.getParameter(Constants.ENABLED_KEY, true);
                	}
                	if (enabled) {
                		invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                	}
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
                }
                if (invoker != null) { // 將新的引用放入快取
                    newUrlInvokerMap.put(key, invoker);
                }
            }else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

上面的 URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL引數是排序的
是將privoderurl和消費端的url合併,相當於有多少個provider,就有多少個key(因為消費端只有一個),其格式如下:

dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubboTest-test-web&check=false&connections=5&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=55972&retries=0&revision=0.0.1-SNAPSHOT&service.filter=andyFilter&side=consumer&timeout=60000&timestamp=1543369485194&token=1234567

前面的是服務提供端的ip和埠號等,但是後面的side=consumer。相當於url是providerurl合併了消費端的引數。
然後根據這個url,生成invoker
invoker = new InvokerDelegete(protocol.refer(serviceType, url), url, providerUrl);
其中,serviceType=interface cn.andy.dubbo.DataService,url就是上面的key。
在protocol.refer(serviceType, url)中完成netty的客戶端開啟。
而requestHandler 作為一個channel,是netty的處理channel。

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要處理高版本呼叫低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }