1. 程式人生 > >dubbo服務呼叫

dubbo服務呼叫

一:消費端傳送請求

1.當呼叫dubbo介面方法時,因為獲取的類例項是FactoryBean介面返回的代理類,所以會先經過InvokerInvocationHandler的invoke方法,這個代理是在類初始化時設定的

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
2.在MockClusterInvoker裡先檢查是不是Mock型別,不是的話就會在FailoverClusterInvoker以及他的父類AbstractClusterInvoker進行處理
public Result invoke(final Invocation invocation) throws RpcException {
        checkWheatherDestoried();
        LoadBalance loadbalance;
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
3.先從註冊目錄裡查詢滿足該方法的Invoker列表,然後用MockInvokersSelector進行路由,這個物件是最開始建立註冊目錄是setRouters(routers);設定進去的。然後獲取負載均衡,預設是隨機,判斷需不需要非同步,最後把這些引數傳到Fai,loverClusterInvoker,獲取預設重試次數三次,當不是第一次呼叫時需要重新獲取最新Invoker列表,通過負載均衡進行篩選本次需要是用的Invoker,在ThreadLocal<RpcContext> LOCAL儲存本次的呼叫資訊。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	List<Invoker<T>> copyinvokers = invokers;
    	checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
        	//重試時,進行重新選擇,避免重試時invoker列表已發生變化.
        	//注意:如果列表發生了變化,那麼invoked判斷會失效,因為invoker示例已經改變
        	if (i > 0) {
        		checkWheatherDestoried();
        		copyinvokers = list(invocation);
        		//重新檢查一下
        		checkInvokers(copyinvokers, invocation);
        	}
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                Result result = invoker.invoke(invocation);
  
                return result;
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
    
    }
4.那麼下來的invoker會是哪個物件呢?在消費端註冊時會設定監聽介面,也就是RegistryDirectory自身,當收到通知時會觸發notify方法,方法最後會呼叫  refreshInvoker(invokerUrls);
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名對映Invoker列表
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
 /**
     * 代理類,主要用於儲存註冊中心下發的url地址,用於重新重新refer時能夠根據providerURL queryMap overrideMap重新組裝
     */
    private static class InvokerDelegete<T> extends InvokerWrapper<T>{
        private URL providerUrl;
        public InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl) {
            super(invoker, url);
            this.providerUrl = providerUrl;
        }
        public URL getProviderUrl() {
            return providerUrl;
        }
    }
5.invoke方法的呼叫由InvokerWrapper-》ProtocolFilterWrapper,這塊是通過連線過濾器鏈進行串聯起來的buildInvokerChain-》ListenerInvokerWrapper-》AbstractInvoker,給RpcInvocation 設定Invoker,新增本呼叫類的相關超時等配置項,獲取並且設定本執行緒的上下文 RpcContext.getContext().getAttachments()的配置,判斷是否非同步-》DubboInvoker
public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
        	invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
        	invocation.addAttachmentsIfAbsent(context);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
        	invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        
        
        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
          
    }
6.判斷同步非同步,或者不需要回復,設定上下文RpcContext非同步Future為null,然後開始超時等待get()-》 DefaultFuture類的get(timeout);
protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } 
        try {
           {
            	RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } 
    }
7.獲取交換層,因為這些invoker都是從開始refer服務後更新的快取中取出來的,所以路徑為ReferenceCountExchangeClient-》HeaderExchangeClient-》HeaderExchangeChannel,這是Request 第一次被創建出來,設定超時等待的DefaultFuture,
 public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
        }
        return future;
    }
8.開始通過channel準備往通道里面發請求,這個通道就是[id: 0x, /本地IP:埠=> /服務提供者IP:埠]-》AbstractPeer-》AbstractClient-》AbstractClient從NettyClient中獲取getChannel();最後通過NettyChannel的channel寫入請求ChannelFuture future = channel.write(message);經過netty框架的下行通道ChannelDownstreamHandler處理,到達我們最開始啟動客戶端 時設定的final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);NettyClient的pipeline.addLast("handler", nettyHandler);

這個handle是通過this傳進來的,handler.sent(channel, e.getMessage());-》AbstractPeer-》MultiMessageHandler的父類AbstractChannelHandlerDelegate-》HeartbeatHandler這裡會設定寫入時間-》WrappedChannelHandler-》DecodeHandler的父類AbstractChannelHandlerDelegate-》HeaderExchangeHandler重新設定寫入時間

具體的通訊編解碼工具是依靠DubboCountCodec來完成的。

二:提供端處理請求

9.先用DubboCountCodec解碼,NettyHandler的messageReceived(handler.received(channel, e.getMessage());)處理請求-》NettyServer的父類AbstractPeer-》MultiMessageHandler-》HeartbeatHandler給通道channel設定讀取時間,判斷是心跳,回覆還是請求-》AllChannelHandler獲取之前配置的執行緒池,然後把這個請求放入執行緒池處理,設定channel為接收狀態, cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

10.DecodeHandler判斷請求有沒有進行再次編碼,如果是請求訊息,取出(Request)message).getData()為DecodeableRpcInvocation型別,-》HeaderExchangeHandler獲取之前連結的時候儲存的ExchangeChannel

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
建立回覆物件Response ,並且呼叫DubboProtocol的建立的匿名類requestHandler處理請求,
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }

根據channel和Invocation 獲取之前暴露的DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);這個key是介面全限定名:埠號,然後

exporter.getInvoker()返回具體的方法代理Invoker。給上下文RpcContext設定遠端地址,開始invoke呼叫

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

11.ProtocolFilterWrapper過濾器鏈處理--》ListenerInvokerWrapper-》RegistryProtocol內部類InvokerDelegete的父類InvokerWrapper,這些都是暴露服務是後封裝的-》AbstractProxyInvoker,在最初暴露服務時會用Javassist生成一個代理

public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } 
    }
呼叫相應的方法返回結果 res.setStatus(Response.OK);         res.setResult(result);
new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };

三:提供端傳送處理結果

12.往HeaderExchangeHandler的channel通道里寫入迴應 channel.send(response);在NettyChannel父類AbstractChannel父類AbstractPeer設定傳送標誌sent也就是超時,ChannelFuture future = channel.write(message);

四:消費端接收處理結果

13.在HeaderExchangeHandler的received方法中處理結果handleResponse(channel, (Response) message);但結果不是心跳和空的話DefaultFuture.received(channel, response);future.doReceived(response);之前消費端等待請求結果時呼叫get堵塞在done.await(timeout, TimeUnit.MILLISECONDS);

    public Object get(int timeout) throws RemotingException {
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
而結果訊息到來時response 會有值,並且done.signal();
 private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
最後返回結果即可returnFromResponse();

相關推薦

Dubbo服務呼叫Failed to invoke the method錯誤記錄

Dubbo服務呼叫Failed to invoke the method錯誤記錄 在開發過程中我遇到一個問題: 一個多模組專案,服務與應用之間採用dubbo進行呼叫,啟動服務後用瀏覽器訪問一切都好,但當採用fiddler進行模擬外系統請求時卻死活調不通,報錯如下: [ERR

Dubbo服務呼叫原理

服務呼叫原理 引用服務 最終,建立一個代理物件 InvokerInvocationHandler Invoke,是一層一層封裝的結果 invoker.invoke 執行 MockClusterInvoker invok

使用Zipkin和Brave 實現dubbo服務呼叫跟蹤

git專案地址:https://github.com/blacklau/http-dubbo-zipkin(點選開啟連結),請下載使用。 本工程通過模擬訂單詳情的查詢,演示系統的呼叫鏈跟蹤,跟蹤資訊包括呼叫服務及請求引數。 涉及的各工程作用: louie-webap

dubbo 服務呼叫原始碼分析

下面是呼叫方法棧和核心程式碼分析InvokerInvocationHandler.invoke(Object, Method, Object[]) line: 38com.alibaba.dubbo.rpc.cluster.support.wrapper.MockCluste

dubbo服務呼叫

一:消費端傳送請求 1.當呼叫dubbo介面方法時,因為獲取的類例項是FactoryBean介面返回的代理類,所以會先經過InvokerInvocationHandler的invoke方法,這個代理是在類初始化時設定的 public Object invoke(Object

Dubbo 服務呼叫返回的物件部分屬性返回為null

 最近呼叫dubbo服務時有兩個奇怪現象;controller接收到bean引數後都有值,但是當呼叫服務時傳過去的引數就部分為null了(比如remark在controller裡檢視bean物件是有值的,但是到facade服務層傳遞過去的bean中remark就沒有值了);還

spring 自動注入和 dubbo服務呼叫問題

今天 遇到這樣一個問題: 工程A中我寫寫了一個spring security的自定義認證類,這個類始終不能由容器管理,但是這個時候 我需要通過spring 自動注入某個服務類來獲取使用者相關資訊 來進行認證) 問題出現在: 自定義認證類不是由容器管

Dubbo 服務呼叫原理淺析

dubbo概念dubbo原理dubbo應用場景    Dubbo概念:      Dubbo是一個分散式服務框架,致力於提供高效能和透明化的RPC遠端服務呼叫方案,以及SOA服務治理方案。簡單的說,du

Dubbo服務呼叫,時好時壞,一會兒呼叫正常,一會兒呼叫不正常?

摘要:昨天上線個新版本的dubbo服務到預釋出環境,今天來驗證功能,發現,其中一個介面呼叫,1次正常,下一次就不正常,再重新整理又正常了,這到底是什麼問題呢,我們來分析下: 一:問題排查: 1.首先檢視是不是部署了多臺服務提供者,如果是停掉其中一臺,這麼做以後,發現還是有

Dubbo服務呼叫過程原始碼解析④

[TOC] > [Dubbo SPI原始碼解析①](https://www.cnblogs.com/lbhym/p/14192704.html) > > [Dubbo服務暴露原始碼解析②](https://www.cnblogs.com/lbhym/p/14192711.html) > > [Dubbo服務

dubbo其實很簡單,就是一個遠端服務呼叫的框架(1)

dubbo專題」dubbo其實很簡單,就是一個遠端服務呼叫的框架(1) 一、dubbo是什麼? 1)本質:一個Jar包,一個分散式框架,,一個遠端服務呼叫的分散式框架。 既然是新手教學,肯定很多同學不明白什麼是分散式和遠端服務呼叫,為什麼要分散式,為什麼要遠端呼叫。我簡單畫個對比圖說明(

dubbo服務呼叫過程

服務消費的過程:referenceConfig類的init方法呼叫Protocol的refer方法,生成invoker例項,然後把Invoker轉換為客戶端需要的介面。 2、原始碼解析 dubbo的消費端初始化在ReferenceConfig的get()方法 public

dubbo原始碼分析-服務呼叫流程-筆記

消費端呼叫過程流程圖 消費端的呼叫過程 消費端介面例項: 服務端接收訊息處理過程 NettyHandler. messageReceived 接收訊息的時候,通過NettyHandler.messageReceived作為入口 @Override public vo

dubbo服務的配置與使用,以及怎麼在呼叫本地的dubbo服務

隨著專案的精分,以及小型化,一個大的專案會被拆分為數個小而精簡的專案。會分為前端專案,介面專案以及服務專案等等。那麼前端介面怎麼來呼叫其他的服務專案呢,這時就需要用到dubbo服務來呼叫這些服務。        2.在使用dub

一個電商專案的Web服務化改造7 Dubbo服務呼叫 4個專案

使用dubbo服務的過程,很簡單,和之前學習的WebService完全一樣,和本地介面呼叫也基本一致。    dubbo和WebService的區別:我認為dubbo就是封裝了WebService,然後提供了更多的配套功能。看jar包依賴,dubbo依賴的WebService。(青出於藍,而勝於藍。冰,水為之

dubbo原始碼淺析(五)-遠端服務呼叫流程

消費端呼叫遠端服務介面時,使用上和呼叫普通的java介面是沒有任何區別,但是服務消費者和提供者是跨JVM和主機的,客戶端如何封裝請求讓服務端理解請求並且解析服務端返回的介面呼叫結果,服務端如何解析客戶端的請求並且向客戶端返回呼叫結果,這些框架是如何實現的,下面就

dubbo服務的 遠端呼叫

首先dubbo 和spring 是無縫整合的,先看下配置檔案 提供端的,<!-- 具體的實現bean --> <bean id="testService" class="com.dubbo.provider.impl.TetsServiceImpl" /&g

Dubbo 原始碼分析 - 服務呼叫過程

注: 本系列文章已捐贈給 Dubbo 社群,你也可以在 Dubbo 官方文件中閱讀本系列文章。 1. 簡介 在前面的文章中,我們分析了 Dubbo SPI、服務匯出與引入、以及叢集容錯方面的程式碼。經過前文的鋪墊,本篇文章我們終於可以分析服務呼叫過程了。Dubbo 服務呼叫過程比較複雜,包含眾多步驟。比如

Dubbo分散式遠端服務呼叫框架(告別Web Service模式中的WSdl,以服務者與消費者的方式在dubbo上註冊

1. Dubbo是什麼? Dubbo是一個分散式服務框架,致力於提供高效能和透明化的RPC遠端服務呼叫方案,以及SOA服務治理方案。簡單的說,dubbo就是個服務框架,如果沒有分散式的需求,其實是不需要用的,只有在分散式的時候,才有dubbo這樣的分散式服務框架的需求,

呼叫Dubbo服務報以下錯誤(com.alibaba.dubbo.remoting.RemotingException),問題原因和解決辦法

2017-04-19 23:41:48,333 ERROR [com.alibaba.dubbo.remoting.transport.AbstractClient] -  [DUBBO] Failed to start NettyClient LX-20161101CZV