1. 程式人生 > >dubbo協議下的單一長連線與多執行緒併發如何協同工作

dubbo協議下的單一長連線與多執行緒併發如何協同工作

開發十年,就只剩下這套架構體系了! >>>   

dubbo協議下的單一長連線與多執行緒併發如何協同工作

上班的路上突然就冒出了這麼個問題:既然在dubbo中描述消費者和提供者之間採用的是單一長連線,那麼如果消費者端是高併發多執行緒模型的web應用,單一長連線如何解決多執行緒併發請求問題呢?

其實如果不太瞭解socket或者多執行緒程式設計的相關知識,不太容易理解這個問題。傳統的最簡單的RPC方式,應該是為每次遠端呼叫請求建立一個對應的執行緒,我們先不說這種方式的缺點。至少優點很明顯,就是簡單。簡單體現在哪兒?

通訊雙方一對一(相比NIO來說)。

通俗點來說,socket通訊的雙方傳送和接受資料不會被其它(執行緒)干擾,這種干擾不同於數資料包的“粘包問題”。其實說白了就相當於電話線路的場景:

試想一下如果多個人同時對著同一個話筒大喊,對方接受到的聲音就會是重疊且雜亂的。

對於單一的socket通道來說,如果傳送方多執行緒的話,不加控制就會導致通道中的資料亂七八糟,接收端無法區分資料的單位,也就無法正確的處理請求。

乍一看,似乎dubbo協議所說的單一長連線與客戶端多執行緒併發請求之間,是水火不容的。但其實稍加設計,就可以讓它們和諧相處。

socket中的粘包問題是怎麼解決的?用的最多的其實是定義一個定長的資料包頭,其中包含了完整資料包的長度,以此來完成伺服器端拆包工作。

那麼解決多執行緒使用單一長連線併發請求時包乾擾的方法也有點雷同,就是給包頭中新增一個標識id,伺服器端響應請求時也要攜帶這個id,供客戶端多執行緒領取對應的響應資料提供線索。

其實如果不考慮效能的話,dubbo完全也可以為每個客戶端執行緒建立一個對應的伺服器端執行緒,但這是海量高併發場景所不能接受的~~

那麼腦補一張圖:

下面咱們試圖從程式碼中找到痕跡。

一路追蹤,我們來到這個類:com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.java,先來看看其中的request方法,大概在第101行左右:

 public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);

    //這個future就是前面我們提到的:客戶端併發請求執行緒阻塞的物件
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        channel.send(req);  //非阻塞呼叫
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

注意這個方法返回的ResponseFuture物件,當前處理客戶端請求的執行緒在經過一系列呼叫後,會拿到ResponseFuture物件,最終該執行緒會阻塞在這個物件的下面這個方法呼叫上,如下:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {    //無限連
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

上面我已經看到請求執行緒已經阻塞,那麼又是如何被喚醒的呢?再看一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.java,其實所有實現了ChannelHandler介面的類都被設計為裝飾器模式,所以你可以看到類似這樣的程式碼:

 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(
            new HeartbeatHandler(
                    ExtensionLoader.getExtensionLoader(Dispather.class).getAdaptiveExtension().dispath(handler, url)
            ));
}

現在來仔細看一下HeaderExchangeHandler類的定義,先看一下它定義的received方法,下面是程式碼片段:

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
          .....
        } else if (message instanceof Response) {   
            //這裡就是作為消費者的dubbo客戶端在接收到響應後,觸發通知對應等待執行緒的起點
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
           .....
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

我們主要看中間的那個條件分支,它是用來處理響應訊息的,也就是說當dubbo客戶端接收到來自服務端的響應後會執行到這個分支,它簡單的呼叫了handleResponse方法,我們追過去看看:

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {  //排除心跳型別的響應
        DefaultFuture.received(channel, response);
    }
}

熟悉的身影:DefaultFuture,它是實現了我們上面說的ResponseFuture介面型別,實際上細心的童鞋應該可以看到,上面request方法中其實例項化的就是這個DefaultFutrue物件:

DefaultFuture future = new DefaultFuture(channel, req, timeout);

那麼我們可以繼續來看一下DefaultFuture.received方法的實現細節:

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

留一下我們之前提到的id的作用,這裡可以看到它已經開始發揮作用了。通過idDefaultFuture.FUTURES可以拿到具體的那個DefaultFuture物件,它就是上面我們提到的,阻塞請求執行緒的那個物件。好,找到目標後,呼叫它的doReceived方法,這就是標準的java多執行緒程式設計知識了:

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

這樣我們就可以證實上圖中左邊的綠色箭頭所標註的兩點。


接下來我們再來看看右邊綠色箭頭提到的兩點是如何實現的?其實dubbo在NIO的實現上預設依賴的是netty,也就是說真正在長連線兩端發包和接包的苦力是netty。由於哥們我對netty不是很熟悉,所以暫時我們就直接把netty當做黑箱,只需要知道它可以很好的完