1. 程式人生 > >dubbo遠端呼叫原始碼分析(一):客戶端傳送請求

dubbo遠端呼叫原始碼分析(一):客戶端傳送請求

dubbo遠端呼叫的原始碼分析,分成了三篇文章地址分別如下:

本文分為三部分,分別是:

消費端註冊部分

消費端動態代理部分

消費端事件處理器部分

消費端註冊部分

在分析dubbo遠端呼叫的時候,要從dubbo消費端(consumer)註冊開始說起

dubbo的consumer在註冊的時候,在配置檔案中是用以下形式註冊的:

<dubbo:reference id="sequenceService" interface="SequenceService"/>

在服務啟動的時候,這個配置項會生成一個ReferenceBean的物件,當我們在程式碼中使用SequenceService時,實際上我們使用的是這個ReferenceBean物件呼叫getObject()方法之後返回的物件,這個物件就是SequenceService的代理物件。

先看一下ReferenceBean類的getObject()方法:

   public Object getObject() throws Exception {

        return get();

   }

get()方法在ReferenceBean的父類ReferenceConfig中:

   public synchronized T get() {

        if (destroyed) {  

            throw new IllegalStateException("Already destroyed!");

        }

        if (ref == null) {

            init();

        }

        return ref;

   }

返回的這個ref就是代理類,下面看一下用來初始化的init()方法:

   private void init() {

        if (initialized) {

            return;

        }

        initialized = true;

        if (interfaceName == null ||interfaceName.length() == 0) {

            throw new IllegalStateException("<dubbo:reference interface=\"\" />interface not allow null!");

        }

        // 獲取消費者全域性配置

        checkDefault();

        appendProperties(this);

        if (getGeneric() == null &&getConsumer() != null) {

           setGeneric(getConsumer().getGeneric());

        }

        if(ProtocolUtils.isGeneric(getGeneric())) {

            interfaceClass =GenericService.class;

        } else {

            try {

                interfaceClass =Class.forName(interfaceName, true, Thread.currentThread()

                       .getContextClassLoader());

            } catch (ClassNotFoundException e){

                throw newIllegalStateException(e.getMessage(), e);

            }

           checkInterfaceAndMethods(interfaceClass, methods);

        }

        String resolve =System.getProperty(interfaceName);

        String resolveFile = null;

        if (resolve == null || resolve.length()== 0) {

            resolveFile =System.getProperty("dubbo.resolve.file");

            if (resolveFile == null ||resolveFile.length() == 0) {

                File userResolveFile = newFile(new File(System.getProperty("user.home")),"dubbo-resolve.properties");

                if (userResolveFile.exists()) {

                    resolveFile =userResolveFile.getAbsolutePath();

                }

            }

            if (resolveFile != null &&resolveFile.length() > 0) {

                Properties properties = newProperties();

                FileInputStream fis = null;

                try {

                    fis = newFileInputStream(new File(resolveFile));

                    properties.load(fis);

                } catch (IOException e) {

                    throw new IllegalStateException("Unload " + resolveFile + ", cause: "+ e.getMessage(), e);

                } finally {

                    try {

                        if (null != fis)fis.close();

                    } catch (IOException e) {

                       logger.warn(e.getMessage(), e);

                    }

                }

                resolve =properties.getProperty(interfaceName);

            }

        }

        if (resolve != null &&resolve.length() > 0) {

           url = resolve;

            if (logger.isWarnEnabled()) {

                if (resolveFile != null&& resolveFile.length() > 0) {

                    logger.warn("Usingdefault dubbo resolve file " + resolveFile + " replace " +interfaceName + "" + resolve + " to p2p invoke remoteservice.");

                } else {

                    logger.warn("Using-D" + interfaceName + "=" + resolve + " to p2p invokeremote service.");

                }

            }

        }

        if (consumer != null) {

           if (application == null){

                application =consumer.getApplication();

            }

            if (module == null) {

                module = consumer.getModule();

            }

            if (registries == null) {

                registries =consumer.getRegistries();

            }

            if (monitor == null) {

                monitor =consumer.getMonitor();

            }

        }

        if (module != null) {

            if (registries == null) {

                registries = module.getRegistries();

            }

            if (monitor == null) {

                monitor = module.getMonitor();

            }

        }

        if (application != null) {

            if (registries == null) {

                registries =application.getRegistries();

            }

            if (monitor == null) {

                monitor =application.getMonitor();

            }

        }

        checkApplication();

        checkStubAndMock(interfaceClass);

        Map<String, String> map = new HashMap<String, String>();

        Map<Object, Object> attributes =new HashMap<Object, Object>();

        map.put(Constants.SIDE_KEY,Constants.CONSUMER_SIDE);

        map.put(Constants.DUBBO_VERSION_KEY,Version.getVersion());

       map.put(Constants.TIMESTAMP_KEY,String.valueOf(System.currentTimeMillis()));

        if (ConfigUtils.getPid() > 0) {

            map.put(Constants.PID_KEY,String.valueOf(ConfigUtils.getPid()));

        }

        if (!isGeneric()) {

            String revision =Version.getVersion(interfaceClass, version);

            if (revision != null &&revision.length() > 0) {

                map.put("revision",revision);

            }

 

            String[] methods =Wrapper.getWrapper(interfaceClass).getMethodNames();

            if (methods.length == 0) {

                logger.warn("NO method found in service interface " + interfaceClass.getName());

                map.put("methods",Constants.ANY_VALUE);

            } else {

                map.put("methods",StringUtils.join(new HashSet<String>(Arrays.asList(methods)),","));

            }

        }

        map.put(Constants.INTERFACE_KEY,interfaceName);

        appendParameters(map, application);

        appendParameters(map, module);

        appendParameters(map, consumer,Constants.DEFAULT_KEY);

        appendParameters(map, this);

        String prifix =StringUtils.getServiceKey(map);

        if (methods != null &&methods.size() > 0) {

            for (MethodConfig method : methods){

                appendParameters(map, method,method.getName());

                String retryKey =method.getName() + ".retry";

                if (map.containsKey(retryKey)){

                    String retryValue =map.remove(retryKey);

                    if ("false".equals(retryValue)) {

                       map.put(method.getName() + ".retries", "0");

                    }

                }

                appendAttributes(attributes,method, prifix + "." + method.getName());

                checkAndConvertImplicitConfig(method,map, attributes);

            }

        }

        //attributes通過系統context進行儲存.

       StaticContext.getSystemContext().putAll(attributes);

        ref = createProxy(map);

   }

基本意思就是整合各種配置,比如類名、方法、版本等等,組成一個map,最終通過createProxy()方法生成代理類,createProxy()方法的程式碼如下:

   @SuppressWarnings({"unchecked", "rawtypes","deprecation"})

   private T createProxy(Map<String, String> map) {

        URL tmpUrl = new URL("temp","localhost", 0, map);

        final boolean isJvmRefer;

        if (isInjvm() == null) {

            if (url != null &&url.length() > 0) { //指定URL的情況下,不做本地引用

                isJvmRefer = false;

            } else if(InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {

                //預設情況下如果本地有服務暴露,則引用本地服務.

                isJvmRefer = true;

            } else {

                isJvmRefer = false;

            }

        } else {

            isJvmRefer =isInjvm().booleanValue();

        }

 

        if (isJvmRefer) {

            URL url = new URL(Constants.LOCAL_PROTOCOL,NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);

            invoker =refprotocol.refer(interfaceClass, url);

            if (logger.isInfoEnabled()) {

                logger.info("Using injvmservice " + interfaceClass.getName());

            }

        } else {

            if (url != null &&url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL

                String[] us =Constants.SEMICOLON_SPLIT_PATTERN.split(url);

                if (us != null &&us.length > 0) {

                    for (String u : us) {

                        URL url =URL.valueOf(u);

                        if (url.getPath() ==null || url.getPath().length() == 0) {

                            url =url.setPath(interfaceName);

                        }

                        if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {

                           urls.add(url.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map)));

                        } else {

                           urls.add(ClusterUtils.mergeUrl(url, map));

                        }

                    }

                }

            } else { // 通過註冊中心配置拼裝URL

                List<URL> us =loadRegistries(false);

                if (us != null &&us.size() > 0) {

                    for (URL u : us) {

                        URL monitorUrl =loadMonitor(u);

                        if (monitorUrl != null){

                           map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));

                        }

                       urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map)));

                    }

                }

                if (urls == null || urls.size()== 0) {

                    throw newIllegalStateException("No such any registry to reference " +interfaceName + " on the consumer " + NetUtils.getLocalHost() +" use dubbo version " + Version.getVersion() + ", please config<dubbo:registry address=\"...\" /> to your springconfig.");

                }

            }

 

            if (urls.size() == 1) {

                invoker =refprotocol.refer(interfaceClass, urls.get(0));

            } else {

                List<Invoker<?>>invokers = new ArrayList<Invoker<?>>();

               URL registryURL =null;

                for (URL url : urls) {

                   invokers.add(refprotocol.refer(interfaceClass, url));

                    if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {

                        registryURL = url; // 用了最後一個registryurl

                    }

                }

                if (registryURL != null) { // 有 註冊中心協議的URL

                    // 對有註冊中心的Cluster只用 AvailableCluster

                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY,AvailableCluster.NAME);

                    invoker = cluster.join(new StaticDirectory(u, invokers));

                } else { // 不是 註冊中心的URL

                    invoker = cluster.join(new StaticDirectory(invokers));

                }

            }

        }

 

        Boolean c = check;

        if (c == null && consumer !=null) {

            c = consumer.isCheck();

        }

        if (c == null) {

            c = true; // default true

        }

        if (c && !invoker.isAvailable()){

            throw new IllegalStateException("Failed to check the status of the service " +interfaceName + ". No provider available for the service " + (group== null ? "" : group + "/") + interfaceName + (version ==null ? "" : ":" + version) + " from the url " +invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() +" use dubbo version " + Version.getVersion());

        }

        if (logger.isInfoEnabled()) {

            logger.info("Refer dubboservice " + interfaceClass.getName() + " from url " +invoker.getUrl());

        }

        // 建立服務代理

        return (T)proxyFactory.getProxy(invoker);

   }

一開始呼叫isInjvm()方法判斷目標介面是否在本地就有,如果本地就有,直接呼叫本地的介面。

如果本地沒有,就在配置中找有沒有使用者指定的url,如果指定了就使用使用者指定的url提供的介面。

如果沒有指定url,則從註冊中心中獲得目標url列表。

如果urls.size()==1,則直接用這個url獲得invoker,這個invoker就是最後用來建立動態代理用的。

當urls.size()>1時,有registryURL屬性,如果配置了註冊中心協議Protocol,則只用AvailableCluster得到invoker。

cluster.join()方法是用來獲得invoker的,cluster屬性的定義:

private transient volatile Invoker<?> invoker;

Invoker是個介面,根據配置的不同會使用不同的實現類,比如上面的AvailableCluster,他的join()方法是這樣的:

   public <T> Invoker<T> join(Directory<T> directory)throws RpcException {

        return new AbstractClusterInvoker<T>(directory) {

            public Result doInvoke(Invocationinvocation, List<Invoker<T>> invokers, LoadBalance loadbalance)throws RpcException {

                for (Invoker<T> invoker :invokers) {

                    if (invoker.isAvailable()){

                        returninvoker.invoke(invocation);

                    }

                }

                throw new RpcException("No provider available in " + invokers);

            }

        };

   }

實際上這個join()方法返回了一個AbstractClusterInvoker物件,並重寫了他的doInvoke()方法,這個方法在動態代理實際被呼叫時會用到。

現在回到createProxy()方法,最後用得到的invoker通過proxyFactory建立動態代理,至此動態代理就建立完了。

消費端動態代理部分

當我們在程式碼中配置好的SequenceService進行遠端呼叫時,實際上呼叫的是對應Invoker的invoke()方法,invoker是一個介面,對於這個介面的實現大概是這樣的:

Invoker

----AbstractInvoker

----AbstractClusterInvoker

----AbstractProxyInvoker

----DelegateInvoker

----MockClusterInvoker

----MergeableClusterInvoker

----InvokerListenerAdapter

----InvokerListenerAdapter

……

還有很多

AbstractInvoker就是用來遠端通訊的Invoker

AbstractClusterInvoker是provider是叢集時使用Invoker,比AbstractInvoker多了負載均衡,選擇provider的過程,最終確定了呼叫的provider之後還是會呼叫AbstractInvoker中的invoke()方法。

我們先看AbstractClusterInvoker的invoke()方法:

    public Result invoke(final Invocation invocation) throws RpcException {


       checkWhetherDestroyed();


        LoadBalanceloadbalance;
 

       List<Invoker<T>> invokers = list(invocation);

        if (invokers !=null && invokers.size() > 0) {

            loadbalance =ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()

                   .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));

        } else {

            loadbalance=ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);

        }

       RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        return doInvoke(invocation, invokers, loadbalance);

    }

首先判斷當前consumer是否已經destory了,然後用list(invocation)方法獲得所有的provider資訊,獲得負載均衡演算法LoadBalance,設定同步屬性,最後呼叫doInvoke方法。

AbstractClusterInvoker的doInvoke()方法是個抽象方法:

protected abstract Result doInvoke(Invocation invocation,List<Invoker<T>> invokers,

                                      LoadBalance loadbalance) throws RpcException;

他的子類有很多,比如:

AvailableClusterInvoker 選擇第一個可用的provider。 

FailBackClusterInvoker失敗自動恢復,後臺記錄失敗請求,定時重發,通常用於訊息通知操作。

FailfastClusterInvoker快速失敗,只發起一次呼叫,失敗立即報錯,通常用於非冪等性的寫操作。

FailoverClusterInvoker失敗轉移,當出現失敗,重試其它伺服器,通常用於讀操作,但重試會帶來更長延遲。

FailsafeClusterInvoker失敗安全,出現異常時,直接忽略,通常用於寫入審計日誌等操作。

ForkingClusterInvoker並行呼叫,只要一個成功即返回,通常用於實時性要求較高的操作,但需要浪費更多服務資源。

具體使用哪個得看配置

我們以之前提到的AvailableClusterInvoker為例,看一下doInvoke()方法:

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,LoadBalance loadbalance) throws RpcException {

        for(Invoker<T> invoker : invokers) {

            if(invoker.isAvailable()) {

                return invoker.invoke(invocation);

            }

        }

        throw new RpcException("No provider available in " + invokers);

    }

就是判斷invoker是否可用,可用就直接呼叫invoker的invoke()方法,實際上呼叫的還是AbstractInvoker的invoke()方法,如果不是叢集就直接調這個方法了,該方法程式碼如下:

    public Result invoke(Invocation inv) throws RpcException {

        if(destroyed.get()) {

            throw new RpcException("Rpc invoker for service " + this + " on consumer" + NetUtils.getLocalHost()

                    +" use dubbo version " + Version.getVersion()

                    +" is DESTROYED, can not be invoked any more!");

        }

        RpcInvocation invocation = (RpcInvocation) inv;

       invocation.setInvoker(this);

        if (attachment!= null && attachment.size() > 0) {

            invocation.addAttachmentsIfAbsent(attachment);

        }

        Map<String,String> context = RpcContext.getContext().getAttachments();

        if (context !=null) {

           invocation.addAttachmentsIfAbsent(context);

        }

        if(getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY,false)) {

           invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

        }

       RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


        try {

            return doInvoke(invocation);

        } catch(InvocationTargetException e) { // biz exception

            Throwablete = e.getTargetException();

            if (te ==null) {

                return new RpcResult(e);

            } else {

                if (te instanceof RpcException) {

                   ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);

                }

                return new RpcResult(te);

            }

        } catch(RpcException e) {

            if(e.isBiz()) {

                return new RpcResult(e);

            } else {

                throw e;

            }

        } catch(Throwable e) {

            return new RpcResult(e);

        }

    }

還是先判斷consumer是否是destory的,其實destroyed是destory的過去分詞,不是人家拼錯了。

然後經歷一堆和AbstractClusterInvoker的invoke一樣的引數設定,最後呼叫doInvoke()方法,而且這個方法在這個Invoker裡面也是抽象的。

AbstractInvoker的doInvoke()方法在DubboInvoker類裡面有具體實現,這個DubboInvoker是AbstractInvoker的子類,doInvoke()方法如下:

    @Override

    protected Result doInvoke(final Invocation invocation) throws Throwable {

        RpcInvocation inv = (RpcInvocation) invocation;

        final String methodName = RpcUtils.getMethodName(invocation);

       inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());

       inv.setAttachment(Constants.VERSION_KEY, version);


        ExchangeClient currentClient;

        if(clients.length == 1) {

           currentClient = clients[0];

        } else {

           currentClient = clients[index.getAndIncrement() % clients.length];

        }

        try {

            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);

            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            int timeout= getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);

            if(isOneway) {

                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);

                currentClient.send(inv, isSent);

               RpcContext.getContext().setFuture(null);

                return new RpcResult();

            } else if(isAsync) {

               ResponseFuture future = currentClient.request(inv, timeout);

                RpcContext.getContext().setFuture(newFutureAdapter<Object>(future));

                return new RpcResult();

            } else {

               RpcContext.getContext().setFuture(null);

                return(Result) currentClient.request(inv, timeout).get();

            }

        } catch(TimeoutException e) {

            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: "+ getUrl() + ", cause: " + e.getMessage(), e);

        } catch(RemotingException e) {

            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " +getUrl() + ", cause: " + e.getMessage(), e);

        }

    }

在經過一堆的設定引數(地址、版本)之後,dubbo獲得了兩個引數,isAsync和isOneway,isAsync為true時代表非同步呼叫,isOneway為true時代表沒有返回值。

當isOneway為true時,呼叫send()方法然後返回一個空的RpcResult,ExchangeClient的send()方法就是用來把訊息發給provider的,send()方法的返回值型別是void。

而當isAsync為true時,設定了一個ResponseFuture之後返回一個空的RpcResult

最後的else就是普通的同步呼叫,不需要設定Future,一直等到provider端返回處理結果,currentClient.request方法負責把請求發出。

ExchangeClient是個介面,request()方法的實現類在HeaderExchangeClient類中,HeaderExchangeClient的request()方法只有一行,直接呼叫了HeaderExchangeChannel的request方法,這個request方法如下:

    public ResponseFuture request(Object request, int timeout) throws RemotingException {

        if (closed) {

            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request" + request + ", cause: The channel " + this + " isclosed!");

        }

        // create request.

        Request req =new Request();

       req.setVersion("2.0.0");

       req.setTwoWay(true);

       req.setData(request);

        DefaultFuture future = new DefaultFuture(channel, req, timeout);

        try {

           channel.send(req);

        } catch(RemotingException e) {

           future.cancel();

            throw e;

        }

        return future;

    }

其中的channel就是dubbo整合的Netty的Channel類,負責伺服器間訊息傳輸,這個類在dubbo中和netty中都能找到,這裡呼叫了他的send()方法。

Channel的send()方法來自EndPoint介面

Channel介面實現了EndPoint介面

AbstractChannel抽象類實現了Channel介面,然而他的send()方法的功能只是判斷當前channel是否已關閉

    public void send(Object message, boolean sent) throws RemotingException {

        if (isClosed()){

            throw new RemotingException(this, "Failed to send message "

                    +(message == null ? "" : message.getClass().getName()) + ":"+ message

                    +", cause: Channel closed. channel: " + getLocalAddress() + "-> " + getRemoteAddress());

        }

    }

最後NettyChannel類繼承了AbstractChannel類,重寫了父類的send()方法,程式碼如下:

    public void send(Object message, boolean sent) throws RemotingException {

       super.send(message, sent);


        boolean success= true;

        int timeout =0;

        try {

           ChannelFuture future = channel.write(message);

            if (sent) {

                timeout= getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);

                success= future.await(timeout);

            }

            Throwable cause = future.getCause();

            if (cause!= null) {

                throw cause;

            }

        } catch(Throwable e) {

            throw new RemotingException(this, "Failed to send message " + message + "to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);

        }

 

        if (!success) {

            throw new RemotingException(this, "Failed to send message " + message + "to " + getRemoteAddress()

                    +"in timeout(" + timeout + "ms) limit");

        }

    }

一開始呼叫了父類的send()方法,判斷是否關閉

channel.write()方法就是Channel負責傳送訊息的方法,至此,訊息只要再通過一些事件處理器(主要是編碼),就可以發到provider端了。

消費端事件處理器部分

NettyClient在初始化時添加了三個事件處理器用來處理髮送訊息和接收訊息的事件,分別是NettyCodecAdapter.DeCoder,NettyCodecAdapter.Encoder,NettyHandler,程式碼在NettyClient類的doOpen()方法裡:

    @Override

    protected void doOpen() throws Throwable {

       NettyHelper.setNettyLoggerFactory();

        bootstrap = new ClientBootstrap(channelFactory);

        // config

        // @see org.jboss.netty.channel.socket.SocketChannelConfig

        bootstrap.setOption("keepAlive",true);

       bootstrap.setOption("tcpNoDelay", true);

       bootstrap.setOption("connectTimeoutMillis", getTimeout());

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

       bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() {

               NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),NettyClient.this);

               ChannelPipeline pipeline = Channels.pipeline();

               pipeline.addLast("decoder", adapter.getDecoder());

               pipeline.addLast("encoder", adapter.getEncoder());

               pipeline.addLast("handler", nettyHandler);

                returnpipeline;

            }

        });

    }

幾種事件處理器的在新增時順序是DeCoder,Encoder,NettyHandler。

當執行緒給對方傳送資訊時,叫做下行事件,下行事件會先經過NettyHandler再經過Encoder。

當執行緒接收對方發來的資訊時,叫做上行事件,上行事件會先經過DeCoder再經過NettyHandler。

在呼叫Channel.write()時,會呼叫事件處理器中的NettyHandler和Encoder,反過來當provider給consumer返回資訊時呼叫的是DeCoder和NettyHandler。

第一步,NettyHandler的writeRequested方法會首先被呼叫:

    @Override

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

       super.writeRequested(ctx, e);

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);

        try {

           handler.sent(channel, e.getMessage());

        } finally {

           NettyChannel.removeChannelIfDisconnected(ctx.getChannel());

        }

    }

這裡是進行一些dubbo的回撥功能。

第二步是呼叫NettyCodecAdapter.Encoder,encoder的定義和實現類就在NettyCodecAdapter類中:

    private final ChannelHandler encoder = new InternalEncoder();


    private final Codec2codec;


    @Sharable

    private class InternalEncoder extends OneToOneEncoder {


        @Override

        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {

           com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =

                    com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);

           NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);

            try {

               codec.encode(channel, buffer, msg);

            } finally {

               NettyChannel.removeChannelIfDisconnected(ch);

            }

            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());

        }

    }

Codec2是提供encode和decode的介面,該介面由DubboCodec類實現,而具體的實現程式碼在DubboCodec類的父類ExchangeCodec中:

    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {

        if (msg instanceof Request) {

           encodeRequest(channel, buffer, (Request) msg);

        } else if (msg instanceof Response) {

            encodeResponse(channel,buffer, (Response) msg);

        } else {

           super.encode(channel, buffer, msg);

        }

    }

在發起呼叫時,msg是一個Request,呼叫encodeRequest()方法:

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {

        Serialization serialization = getSerialization(channel);

        // header.

        byte[] header =new byte[HEADER_LENGTH];

        // set magicnumber.

       Bytes.short2bytes(MAGIC, header);

 
        // set requestand serialization flag.

        header[2] =(byte) (FLAG_REQUEST | serialization.getContentTypeId());


        if(req.isTwoWay()) header[2] |= FLAG_TWOWAY;

        if (req.isEvent()) header[2] |= FLAG_EVENT;

 
        // set requestid.

       Bytes.long2bytes(req.getId(), header, 4);

 
        // encoderequest data.

        int savedWriteIndex = buffer.writerIndex();

       buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);

       ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

        if(req.isEvent()) {

           encodeEventData(channel, out, req.getData());

        } else {

           encodeRequestData(channel, out, req.getData());

        }

       out.flushBuffer();

        bos.flush();

        bos.close();

        int len =bos.writtenBytes();

       checkPayload(channel, len);

       Bytes.int2bytes(len, header, 12);

 

        // write

       buffer.writerIndex(savedWriteIndex);

       buffer.writeBytes(header); // write header.

       buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);

    }

這個方法寫的是把請求序列化為二進位制的過程,其中的encodeRequestData()方法,經過了好幾個只有一行的方法呼叫,最終執行的是ExchangeCodec類的如下方法:

    protected void encodeRequestData(ObjectOutput out, Object data) throws IOException {

       out.writeObject(data);

    }

至此資料會被髮送到provider端。