1. 程式人生 > >Dubbo原始碼分析之四:服務的呼叫

Dubbo原始碼分析之四:服務的呼叫

在呼叫服務之前,先得獲得服務的引用。
ReferenceBean 就是服務的引用。它實現了一個FactoryBean介面,在我們需要一個服務時,FactoryBean介面的getObject() 方法會被呼叫。

public Object getObject() throws Exception {
      return get(); //返回服務的代理。
}
。。。
// get() ->init()->createProxy(map)
// 過程主要是一些引數的檢查,初始化等準備工作
// 我們重點看看createProxy 方法,引數為服務需要的所有配置資訊

private T createProxy
(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { //預設情況下如果本地有服務暴露,則引用本地服務.
isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if
(logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { if (url != null && url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 通過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { // 我們重點看下這裡,invokers 存放所有可用的服務呼叫者 // 然後叢集策略會根據這些呼叫者做一些負載均衡處理。 // 見下一段 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最後一個registry url } } if (registryURL != null) { // 有 註冊中心協議的URL // 對有註冊中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 加入叢集,返回一個叢集呼叫者。它內部會做一些負載均衡的處理。 // 預設的叢集策略為自動恢復,預設的負載均衡為隨機,如果呼叫者只有2個,則為輪詢。 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是 註冊中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } 。。。// 省略 // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }

接下來 我們看看 refprotocol.refer(interfaceClass, url)
refprotocol 其實是一個代理,代理誰不知道,只有在呼叫時才能確定。
這個代理類是動態生成,動態編譯的。後面有機會我們再講。
通過這個動態代理的使用,最終會呼叫到 RegistryProtocol.refer 方法。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //根據URL獲得對應的註冊器。
        // 如果是ZK,則 registry 為ZookeeperRegistry
        Registry registry = registryFactory.getRegistry(url); 
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        // 如果消費者使用了分組
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0 ) {
            if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                    || "*".equals( group ) ) {
                return doRefer( getMergeableCluster(), registry, type, url );
            }
        }
        // 返回呼叫器
        return doRefer(cluster, registry, type, url);
    }

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // RegistryDirectory 可以把它理解成註冊資源,其中包含了消費者,服務,路由等相關資訊。
        // 呼叫者需要這些資訊來進行呼叫
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry); // 註冊器
        directory.setProtocol(protocol); // 協議
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
             // 註冊消費者
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        // 訂閱,
        // 會從註冊中心獲得對應的服務提供者的資訊
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        // 預設的cluster 為FailFastCluster
        // 返回 FailfastClusterInvoker        
        return cluster.join(directory);
    }

接下來,我們看看 FailfastClusterInvoker 。FailfastClusterInvoker 繼承 AbstractClusterInvoker 實現了Invoker 介面。
我們看看AbstractClusterInvoker.invoke

public Result invoke(final Invocation invocation) throws RpcException {

        checkWheatherDestoried();// 檢查此呼叫者是否被銷燬

        LoadBalance loadbalance; // 負載均衡演算法 

        // 獲得對應的Invoker。主要從RegistryDirectory 中獲取。
        List<Invoker<T>> invokers = list(invocation);

        if (invokers != null && invokers.size() > 0) {
             // 確定負載均衡演算法,預設為隨機
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        //如果是非同步操作預設新增invocation id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 呼叫FailfastClusterInvoker.doInvoke
        return doInvoke(invocation, invokers, loadbalance);
    }

我們看看 list(invocation) 是怎麼獲取的。
呼叫關係為
list(invocation) -> AbstractDirectory.list(invocation) ->RegistryDirectory.doList(invocation)

// RegistryDirectory.doList
public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) { // 如果被禁止消費
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +  NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");
        }
        List<Invoker<T>> invokers = null;
        // methodInvokerMap 為方法到呼叫器的對映
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if(args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根據第一個引數列舉路由
            }
            if(invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if(invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if(invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

重點在 methodInvokerMap 中。那麼其中的資料是這麼來的呢?答案在
RegistryProtocol.doRefer 中的 directory.subscribe 。上面也提到過。

// RegistryDirectory
public void subscribe(URL url) {
        setConsumerUrl(url);
        registry.subscribe(url, this); //訂閱,回撥notify方法
    }

// notify() -> refreshInvoker() -> toInvokers()
// methodInvokerMap 的賦值就在 refreshInvoker 中

而 invoker 也會被初始化為 InvokerDelegete。InvokerDelegete的建立在toInvokers方法中。
InvokerDelegete 是 RegistryDirectory 中內部類。也就是一個代理。
最終會根據 protocol 呼叫最終的 Invoker。
例如:dubbo
protocol=>DubboProtocol
invoker=> DubboInvoker

// DubboProtocol
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

        // modified by lishen
        // 序列優化。主要是SerializationOptimizer
        // 將需要序列化的類放入SerializationOptimizer
        optimizeSerialization(url);

        // create rpc invoker.
        // 建立RPC 呼叫者
        // 底層通訊主要看選擇了那種client
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

 // 獲得client,連線
 private ExchangeClient[] getClients(URL url){
        //是否共享連線
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        //如果connections不配置,則共享連線,否則每服務每連線
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){
                // 獲得共享連線,沒有則建立(調initClient(url))  
                // 能不能共享主要看 url.address           
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url); // 建立client
            }
        }
        return clients;
    }

    /**
     * 建立新連線.
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        // 設定DubboCodec 編碼器 (序列化的操作就在裡面)
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        //預設開啟heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO存在嚴重效能問題,暫時不允許使用
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client ;
        try {
            //設定連線應該是lazy的 
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
                client = new LazyConnectExchangeClient(url ,requestHandler);
            } else {
                // 重點看這裡,獲得一個連線
                client = Exchangers.connect(url ,requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url
                    + "): " + e.getMessage(), e);
        }
        return client;
    }

建立一個連線,最終定為到了Exchangers.connect(url ,requestHandler);

// Exchangers
    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        // 如果沒有編碼器,則設定一個ExchangeCodec編碼器
        // 但前面已經設定過了
        // 後面會講
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 獲得一個連線
        return getExchanger(url).connect(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); // 預設為header
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        // 最終獲得 HeaderExchanger
        // 怎麼獲得前面也講過,主要通過類路徑掃描定義檔案
        // header -> HeaderExchanger
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

我們在看看HeaderExchanger

// HeaderExchanger
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 根據底層client 封裝一個HeaderExchangeClient
    // 重點看Transporters.connect。 
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

Transporters.connect 最終會呼叫
ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
如果沒有特別指定,將最終使用NettyTransporter。
netty為預設的通訊方式。(見Transporter.class 的註解)
在NettyTransporter中會直接new NettyClient

// NettyClient
// 此方法將由父類的構造方法呼叫,然後再呼叫connect()
@Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true); //長連線
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                // getcodec()獲取到的是DubboCodes編碼器。序列化的操作就在對應的Codes類中

                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder()); //解碼
                pipeline.addLast("encoder", adapter.getEncoder());// 編碼
                pipeline.addLast("handler", nettyHandler); //處理器
                return pipeline;
            }
        });
    }

doOpen() 呼叫完回撥用doConnect,來完成通道的建立。
並儲存在 NettyClient.this.channel = newChannel;
資料的傳送在父類的send方法。
傳送方法的呼叫來源於DubboInvoker.doInvoke 方法。