1. 程式人生 > 程式設計 >Dubbo原始碼解析(十)遠端通訊——Exchange層

Dubbo原始碼解析(十)遠端通訊——Exchange層

遠端通訊——Exchange層

目標:介紹Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的原始碼解析。

前言

上一篇文章我講的是dubbo框架設計中Transport層,這篇文章我要講的是它的上一層Exchange層,也就是資訊交換層。官方檔案對這一層的解釋是封裝請求響應模式,同步轉非同步,以 Request,Response為中心,擴充套件介面為 Exchanger,ExchangeChannel,ExchangeClient,ExchangeServer。

這一層的設計意圖是什麼?它應該算是在資訊傳輸層上又做了部分裝飾,為了適應rpc呼叫的一些需求,比如rpc呼叫中一次請求只關心它所對應的響應,這個時候只是一個message訊息傳輸過來,是無法區分這是新的請求還是上一個請求的響應,這種類似於冪等性的問題以及rpc非同步處理返回結果、內建事件等特性都是在Transport層無法解決滿足的,所有在Exchange層講message分成了request和response兩種型別,並且在這兩個模型上增加一些系統欄位來處理問題。具體我會在下面講到。而dubbo把一條訊息分為了協議頭和內容兩部分:協議頭包括系統欄位,例如編號等,內容包括具體請求的引數和響應的結果等。在exchange層中大量邏輯都是基於協議頭的。

現在對這一層的設計意圖大致應該有所瞭解了吧,現在來看看exchange的類圖:

exchange類圖

我講解的順序還是按照類圖從上而下,分塊講解,忽略綠色的test類。

原始碼解析

(一)ExchangeChannel

public interface ExchangeChannel extends Channel {

    ResponseFuture request(Object request) throws RemotingException;

    ResponseFuture request(Object request,int timeout) throws RemotingException
; ExchangeHandler getExchangeHandler(); @Override void close(int timeout); } 複製程式碼

該介面是資訊交換通道介面,有四個方法,前兩個是傳送請求訊息,區別就是第二個傳送請求有超時的引數,getExchangeHandler方法就是返回一個資訊交換處理器,第四個是需要覆寫父類的方法。

(二)HeaderExchangeChannel

該類實現了ExchangeChannel,是基於協議頭的資訊交換通道。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);

/**
 * 通道的key值
 */
private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL"; /** * 通道 */ private final Channel channel; /** * 是否關閉 */ private volatile boolean closed = false; 複製程式碼

上述屬性比較簡單,還是放一下這個類的屬性是因為該類中有channel屬性,也就是說HeaderExchangeChannel是Channel的裝飾器,每個實現方法都會呼叫channel的方法。

2.靜態方法

static HeaderExchangeChannel getOrAddChannel(Channel ch) {
    if (ch == null) {
        return null;
    }
    // 獲得通道中的HeaderExchangeChannel
    HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
    if (ret == null) {
        // 建立一個HeaderExchangeChannel例項
        ret = new HeaderExchangeChannel(ch);
        // 如果通道連線
        if (ch.isConnected()) {
            // 加入屬性值
            ch.setAttribute(CHANNEL_KEY,ret);
        }
    }
    return ret;
}

static void removeChannelIfDisconnected(Channel ch) {
    // 如果通道斷開連線
    if (ch != null && !ch.isConnected()) {
        // 移除屬性值
        ch.removeAttribute(CHANNEL_KEY);
    }
}
複製程式碼

該靜態方法做了HeaderExchangeChannel的建立和銷燬,並且生命週期隨channel銷燬而銷燬。

3.send

@Override
public void send(Object message) throws RemotingException {
    send(message,getUrl().getParameter(Constants.SENT_KEY,false));
}

@Override
public void send(Object message,boolean sent) throws RemotingException {
    // 如果通道關閉,丟擲異常
    if (closed) {
        throw new RemotingException(this.getLocalAddress(),null,"Failed to send message " + message + ",cause: The channel " + this + " is closed!");
    }
    // 判斷訊息的型別
    if (message instanceof Request
            || message instanceof Response
            || message instanceof String) {
        // 傳送訊息
        channel.send(message,sent);
    } else {
        // 新建一個request例項
        Request request = new Request();
        // 設定資訊的版本
        request.setVersion(Version.getProtocolVersion());
        // 該請求不需要響應
        request.setTwoWay(false);
        // 把訊息傳入
        request.setData(message);
        // 傳送訊息
        channel.send(request,sent);
    }
}
複製程式碼

該方法是在channel的send方法上加上了request和response模型,最後再呼叫channel.send,起到了裝飾器的作用。

4.request

@Override
public ResponseFuture request(Object request) throws RemotingException {
    return request(request,channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT));
}

@Override
public ResponseFuture request(Object request,int timeout) throws RemotingException {
    // 如果通道關閉,則丟擲異常
    if (closed) {
        throw new RemotingException(this.getLocalAddress(),"Failed to send request " + request + ",cause: The channel " + this + " is closed!");
    }
    // create request.建立請求
    Request req = new Request();
    // 設定版本號
    req.setVersion(Version.getProtocolVersion());
    // 設定需要響應
    req.setTwoWay(true);
    // 把請求資料傳入
    req.setData(request);
    // 建立DefaultFuture物件,可以從future中主動獲得請求對應的響應資訊
    DefaultFuture future = new DefaultFuture(channel,req,timeout);
    try {
        // 傳送請求訊息
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
複製程式碼

該方法是請求方法,用Request模型把請求內容裝飾起來,然後傳送一個Request型別的訊息,並且返回DefaultFuture例項,DefaultFuture我會在後面講到。

cloes方法也重寫了,我就不再多說,因為比較簡單,沒有重點,其他方法都是直接呼叫channel屬性的方法。

(三)ExchangeClient

該介面繼承了Client和ExchangeChannel,是資訊交換客戶端介面,其中沒有定義多餘的方法。

(四)HeaderExchangeClient

 該類實現了ExchangeClient介面,是基於協議頭的資訊互動客戶端類,同樣它是Client、Channel的介面卡。在該類的原始碼中可以看到所有的實現方法都是呼叫了client和channel屬性的方法。該類主要的作用就是增加了心跳功能,為什麼要增加心跳功能呢,對於長連線,一些拔網線等物理層的斷開,會導致TCP的FIN訊息來不及傳送,對方收不到斷開事件,那麼就需要用到傳送心跳包來檢測連線是否斷開。consumer和provider斷開,處理措施不一樣,會分別做出重連和關閉通道的操作。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);

/**
 * 定時器執行緒池
 */
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2,new NamedThreadFactory("dubbo-remoting-client-heartbeat",true));
/**
 * 客戶端
 */
private final Client client;
/**
 * 資訊交換通道
 */
private final ExchangeChannel channel;
// heartbeat timer
/**
 * 心跳定時器
 */
private ScheduledFuture<?> heartbeatTimer;
// heartbeat(ms),default value is 0,won't execute a heartbeat.
/**
 * 心跳週期,間隔多久傳送心跳訊息檢測一次
 */
private int heartbeat;
/**
 * 心跳超時時間
 */
private int heartbeatTimeout;
複製程式碼

該類的屬性除了需要適配的屬性外,其他都是跟心跳相關屬性。

2.建構函式

public HeaderExchangeClient(Client client,boolean needHeartbeat) {
    if (client == null) {
        throw new IllegalArgumentException("client == null");
    }
    this.client = client;
    // 建立資訊交換通道
    this.channel = new HeaderExchangeChannel(client);
    // 獲得dubbo版本
    String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
    //獲得心跳週期配置,如果沒有配置,並且dubbo是1.0版本的,則這隻為1分鐘,否則設定為0
    this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY,dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
    // 獲得心跳超時配置,預設是心跳週期的三倍
    this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,heartbeat * 3);
    // 如果心跳超時時間小於心跳週期的兩倍,則丟擲異常
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    if (needHeartbeat) {
        // 開啟心跳
        startHeartbeatTimer();
    }
}
複製程式碼

建構函式就是對一些屬性初始化設定,優先從url中獲取。心跳超時時間小於心跳週期的兩倍就丟擲異常,意思就是至少重試兩次心跳檢測。

3.startHeartbeatTimer

private void startHeartbeatTimer() {
    // 停止現有的心跳執行緒
    stopHeartbeatTimer();
    // 如果需要心跳
    if (heartbeat > 0) {
        // 建立心跳定時器
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                // 新建一個心跳執行緒
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    @Override
                    public Collection<Channel> getChannels() {
                        // 返回一個只包含HeaderExchangeClient物件的不可變列表
                        return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                    }
                },heartbeat,heartbeatTimeout),TimeUnit.MILLISECONDS);
    }
}
複製程式碼

該方法就是開啟心跳。利用心跳定時器來做到定時檢測心跳。因為這是資訊交換客戶端類,所有這裡的只是返回包含HeaderExchangeClient物件的不可變列表,因為客戶端跟channel是一一對應的,只有這一個該客戶端本身的channel需要心跳。

4.stopHeartbeatTimer

private void stopHeartbeatTimer() {
    if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
        try {
            // 取消定時器
            heartbeatTimer.cancel(true);
            // 取消大量已排隊任務,用於回收空間
            scheduled.purge();
        } catch (Throwable e) {
            if (logger.isWarnEnabled()) {
                logger.warn(e.getMessage(),e);
            }
        }
    }
    heartbeatTimer = null;
}
複製程式碼

該方法是停止現有心跳,也就是停止定時器,釋放空間。

其他方法都是呼叫channel和client屬性的方法。

(五)HeartBeatTask

該類實現了Runnable介面,實現的是心跳任務,裡麵包含了核心的心跳策略。

1.屬性

/**
 * 通道管理
 */
private ChannelProvider channelProvider;

/**
 * 心跳間隔 單位:ms
 */
private int heartbeat;

/**
 * 心跳超時時間 單位:ms
 */
private int heartbeatTimeout;
複製程式碼

後兩個屬性跟HeaderExchangeClient中的屬性含義一樣,第一個是該類自己內部的一個介面:

interface ChannelProvider {
    // 獲得所有的通道集合,需要心跳的通道陣列
    Collection<Channel> getChannels();
}
複製程式碼

該介面就定義了一個方法,獲得需要心跳的通道集合。可想而知,會對集合內的通道都做心跳檢測。

2.run

@Override
public void run() {
    try {
        long now = System.currentTimeMillis();
        // 遍歷所有通道
        for (Channel channel : channelProvider.getChannels()) {
            // 如果通道關閉了,則跳過
            if (channel.isClosed()) {
                continue;
            }
            try {
                // 最後一次接收到訊息的時間戳
                Long lastRead = (Long) channel.getAttribute(
                        HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                // 最後一次傳送訊息的時間戳
                Long lastWrite = (Long) channel.getAttribute(
                        HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                // 如果最後一次接收或者傳送訊息到時間到現在的時間間隔超過了心跳間隔時間
                if ((lastRead != null && now - lastRead > heartbeat)
                        || (lastWrite != null && now - lastWrite > heartbeat)) {
                    // 建立一個request
                    Request req = new Request();
                    // 設定版本號
                    req.setVersion(Version.getProtocolVersion());
                    // 設定需要得到響應
                    req.setTwoWay(true);
                    // 設定事件型別,為心跳事件
                    req.setEvent(Request.HEARTBEAT_EVENT);
                    // 傳送心跳請求
                    channel.send(req);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                + ",cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                    }
                }
                // 如果最後一次接收訊息的時間到現在已經超過了超時時間
                if (lastRead != null && now - lastRead > heartbeatTimeout) {
                    logger.warn("Close channel " + channel
                            + ",because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                    // 如果該通道是客戶端,也就是請求的伺服器掛掉了,客戶端嘗試重連伺服器
                    if (channel instanceof Client) {
                        try {
                            // 重新連線伺服器
                            ((Client) channel).reconnect();
                        } catch (Exception e) {
                            //do nothing
                        }
                    } else {
                        // 如果不是客戶端,也就是是服務端返回響應給客戶端,但是客戶端掛掉了,則服務端關閉客戶端連線
                        channel.close();
                    }
                }
            } catch (Throwable t) {
                logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(),t);
            }
        }
    } catch (Throwable t) {
        logger.warn("Unhandled exception when heartbeat,cause: " + t.getMessage(),t);
    }
}
複製程式碼

該方法中是心跳機制的核心邏輯。注意以下幾個點:

  1. 如果需要心跳的通道本身如果關閉了,那麼跳過,不新增心跳機制。
  2. 無論是接收訊息還是傳送訊息,只要超過了設定的心跳間隔,就傳送心跳訊息來測試是否斷開
  3. 如果最後一次接收到訊息到到現在已經超過了心跳超時時間,那就認定對方的確斷開,分兩種情況來處理對方斷開的情況。分別是服務端斷開,客戶端重連以及客戶端斷開,服務端斷開這個客戶端的連線。,這裡要好好品味一下誰是傳送方,誰在等誰的響應,苦苦沒有等到。

(六)ResponseFuture

public interface ResponseFuture {

    Object get() throws RemotingException;

    Object get(int timeoutInMillis) throws RemotingException;

    void setCallback(ResponseCallback callback);

    boolean isDone();

}
複製程式碼

該介面是響應future介面,該介面的設計意圖跟java.util.concurrent.Future很類似。傳送出去的訊息,潑出去的水,只有等到對方主動響應才能得到結果,但是請求方需要去主動回去該請求的結果,就顯得有些艱難,所有產生了這樣一個介面,它能夠獲取任務執行結果、可以核對請求訊息是否被響應,還能設定回撥來支援非同步。

(七)DefaultFuture

該類實現了ResponseFuture介面,其中封裝了處理響應的邏輯。你可以把DefaultFuture看成是一箇中介,買房和賣房都通過這個中介進行溝通,中介擁有著買房者的資訊request和賣房者的資訊response,並且促成他們之間的買賣。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);

/**
 * 通道集合
 */
private static final Map<Long,Channel> CHANNELS = new ConcurrentHashMap<Long,Channel>();

/**
 * Future集合,key為請求編號
 */
private static final Map<Long,DefaultFuture> FUTURES = new ConcurrentHashMap<Long,DefaultFuture>();

// invoke id.
/**
 * 請求編號
 */
private final long id;
/**
 * 通道
 */
private final Channel channel;
/**
 * 請求
 */
private final Request request;
/**
 * 超時
 */
private final int timeout;
/**
 * 鎖
 */
private final Lock lock = new ReentrantLock();
/**
 * 完成情況,控制多執行緒的休眠與喚醒
 */
private final Condition done = lock.newCondition();
/**
 * 建立開始時間
 */
private final long start = System.currentTimeMillis();
/**
 * 傳送請求時間
 */
private volatile long sent;
/**
 * 響應
 */
private volatile Response response;
/**
 * 回撥
 */
private volatile ResponseCallback callback;
複製程式碼

可以看到,該類的屬性包含了request、response、channel三個例項,在該類中,把請求和響應通過唯一的id一一對應起來。做到非同步處理返回結果時能給準確的返回給對應的請求。可以看到屬性中有兩個集合,分別是通道集合和future集合,也就是該類本身也是所有 DefaultFuture 的管理容器。

2.建構函式

public DefaultFuture(Channel channel,Request request,int timeout) {
    this.channel = channel;
    this.request = request;
    // 設定請求編號
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    // put into waiting map.,加入到等待集合中
    FUTURES.put(id,this);
    CHANNELS.put(id,channel);
}
複製程式碼

建構函式比較簡單,每一個DefaultFuture例項都跟每一個請求一一對應,被存入到集合中管理起來。

3.closeChannel

public static void closeChannel(Channel channel) {
    // 遍歷通道集合
    for (long id : CHANNELS.keySet()) {
        if (channel.equals(CHANNELS.get(id))) {
            // 通過請求id獲得future
            DefaultFuture future = getFuture(id);
            if (future != null && !future.isDone()) {
                // 建立一個關閉通道的響應
                Response disconnectResponse = new Response(future.getId());
                disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
                disconnectResponse.setErrorMessage("Channel " +
                        channel +
                        " is inactive. Directly return the unFinished request : " +
                        future.getRequest());
                // 接收該關閉通道並且請求未完成的響應
                DefaultFuture.received(channel,disconnectResponse);
            }
        }
    }
}
複製程式碼

該方法是關閉不活躍的通道,並且返回請求未完成。也就是關閉指定channel的請求,返回的是請求未完成。

4.received

public static void received(Channel channel,Response response) {
    try {
        // future集合中移除該請求的future,(響應id和請求id一一對應的)
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // 接收響應結果
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ",response " + response
                    + (channel == null ? "" : ",channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 通道集合移除該請求對應的通道,代表著這一次請求結束
        CHANNELS.remove(response.getId());
    }
}

複製程式碼

該方法是接收響應,也就是某個請求得到了響應,那麼代表這次請求任務完成,所有需要把future從集合中移除。具體的接收響應結果在doReceived方法中實現。

5.doReceived

private void doReceived(Response res) {
    // 獲得鎖
    lock.lock();
    try {
        // 設定響應
        response = res;
        if (done != null) {
            // 喚醒等待
            done.signal();
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
    if (callback != null) {
        // 執行回撥
        invokeCallback(callback);
    }
}

複製程式碼

可以看到,當接收到響應後,會把等待的執行緒喚醒,然後執行回撥來處理該響應結果。

6.invokeCallback

private void invokeCallback(ResponseCallback c) {
    ResponseCallback callbackCopy = c;
    if (callbackCopy == null) {
        throw new NullPointerException("callback cannot be null.");
    }
    c = null;
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
    }

    // 如果響應成功,返回碼是20
    if (res.getStatus() == Response.OK) {
        try {
            // 使用響應結果執行 完成 後的邏輯
            callbackCopy.done(res.getResult());
        } catch (Exception e) {
            logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(),e);
        }
        //超時,回撥處理成超時異常
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        try {
            TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT,channel,res.getErrorMessage());
            // 回撥處理異常
            callbackCopy.caught(te);
        } catch (Exception e) {
            logger.error("callback invoke error,e);
        }
        // 其他情況處理成RemotingException異常
    } else {
        try {
            RuntimeException re = new RuntimeException(res.getErrorMessage());
            callbackCopy.caught(re);
        } catch (Exception e) {
            logger.error("callback invoke error,e);
        }
    }
}

複製程式碼

該方法是執行回撥來處理響應結果。分為了三種情況:

  1. 響應成功,那麼執行完成後的邏輯。
  2. 超時,會按照超時異常來處理
  3. 其他,按照RuntimeException異常來處理

具體的處理都在ResponseCallback介面的實現類裡執行,後面我會講到。

7.get

@Override
public Object get() throws RemotingException {
    return get(timeout);
}

@Override
public Object get(int timeout) throws RemotingException {
    // 超時時間預設為1s
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    // 如果請求沒有完成,也就是還沒有響應返回
    if (!isDone()) {
        long start = System.currentTimeMillis();
        // 獲得鎖
        lock.lock();
        try {
            // 輪詢 等待請求是否完成
            while (!isDone()) {
                // 執行緒阻塞等待
                done.await(timeout,TimeUnit.MILLISECONDS);
                // 如果請求完成或者超時,則結束
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            // 釋放鎖
            lock.unlock();
        }
        // 如果沒有收到響應,則丟擲超時的異常
        if (!isDone()) {
            throw new TimeoutException(sent > 0,getTimeoutMessage(false));
        }
    }
    // 返回響應
    return returnFromResponse();
}

複製程式碼

該方法是實現了ResponseFuture定義的方法,是獲得該future對應的請求對應的響應結果,其實future、請求、響應都是一一對應的。其中如果還沒得到響應,則會執行緒阻塞等待,等到有響應結果或者超時,才返回。返回的邏輯在returnFromResponse中實現。

8.returnFromResponse

private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // 如果正常返回,則返回響應結果
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    // 如果超時,則丟擲超時異常
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT,res.getErrorMessage());
    }
    // 其他 丟擲RemotingException異常
    throw new RemotingException(channel,res.getErrorMessage());
}

複製程式碼

這程式碼跟invokeCallback方法中差不多,都是把響應分了三種情況。

9.cancel

public void cancel() {
    // 建立一個取消請求的響應
    Response errorResult = new Response(id);
    errorResult.setErrorMessage("request future has been canceled.");
    response = errorResult;
    // 從集合中刪除該請求
    FUTURES.remove(id);
    CHANNELS.remove(id);
}

複製程式碼

該方法是取消一個請求,可以直接關閉一個請求,也就是值建立一個響應來回應該請求,把response值設定到該請求對於到future中,做到了中斷請求的作用。該方法跟closeChannel的區別是closeChannel中對response的狀態設定了CHANNEL_INACTIVE,而cancel方法是中途被主動取消的,雖然有response值,但是並沒有一個響應狀態。

10.RemotingInvocationTimeoutScan

private static class RemotingInvocationTimeoutScan implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                for (DefaultFuture future : FUTURES.values()) {
                    // 已經完成,跳過掃描
                    if (future == null || future.isDone()) {
                        continue;
                    }
                    // 超時
                    if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                        // create exception response.,建立一個超時的響應
                        Response timeoutResponse = new Response(future.getId());
                        // set timeout status.,設定超時狀態,是服務端側超時還是客戶端側超時
                        timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                        // 設定錯誤資訊
                        timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                        // handle response.,接收建立的超時響應
                        DefaultFuture.received(future.getChannel(),timeoutResponse);
                    }
                }
                // 睡眠
                Thread.sleep(30);
            } catch (Throwable e) {
                logger.error("Exception when scan the timeout invocation of remoting.",e);
            }
        }
    }
}

複製程式碼

該方法是掃描呼叫超時任務的執行緒,每次都會遍歷future集合,檢測請求是否超時了,如果超時則建立一個超時響應來回應該請求。

static {
    // 開啟一個後臺掃描呼叫超時任務
    Thread th = new Thread(new RemotingInvocationTimeoutScan(),"DubboResponseTimeoutScanTimer");
    th.setDaemon(true);
    th.start();
}


複製程式碼

開啟一個後臺執行緒進行掃描的邏輯寫在了靜態程式碼塊裡面,只開啟一次。

(八)SimpleFuture

該類實現了ResponseFuture,目前沒有用到,很簡單的實現,我就不多說了。

(九)ExchangeHandler

該介面繼承了ChannelHandler,TelnetHandler介面,是資訊交換處理器介面。

public interface ExchangeHandler extends ChannelHandler,TelnetHandler {
    /**
     * reply.
     * 回覆請求結果
     * @param channel
     * @param request
     * @return response
     * @throws RemotingException
     */
    Object reply(ExchangeChannel channel,Object request) throws RemotingException;

}

複製程式碼

該介面只定義了一個回覆請求結果的方法,返回的是請求結果。

(十)ExchangeHandlerDispatcher

該類實現了ExchangeHandler介面, 是資訊交換處理器排程器類,也就是對應不同的事件,選擇不同的處理器去處理。該類中有三個屬性,分別對應了三種事件:

/**
 * 回覆者排程器
 */
private final ReplierDispatcher replierDispatcher;

/**
 * 通道處理器排程器
 */
private final ChannelHandlerDispatcher handlerDispatcher;

/**
 * Telnet 命令處理器
 */
private final TelnetHandler telnetHandler;

複製程式碼

如果事件是跟通道處理器有關的,就呼叫通道處理器來處理,比如:

@Override
@SuppressWarnings({"unchecked","rawtypes"})
public Object reply(ExchangeChannel channel,Object request) throws RemotingException {
    return ((Replier) replierDispatcher).reply(channel,request);
}

@Override
public void connected(Channel channel) {
    handlerDispatcher.connected(channel);
}
@Override
public String telnet(Channel channel,String message) throws RemotingException {
    return telnetHandler.telnet(channel,message);
}

複製程式碼

可以看到以上三種事件,回覆請求結果需要回復者排程器來處理,連線需要通道處理器排程器來處理,telnet訊息需要Telnet命令處理器來處理。

(十一)ExchangeHandlerAdapter

該類繼承了TelnetHandlerAdapter,實現了ExchangeHandler,是資訊交換處理器的介面卡類。

public abstract class ExchangeHandlerAdapter extends TelnetHandlerAdapter implements ExchangeHandler {

    @Override
    public Object reply(ExchangeChannel channel,Object msg) throws RemotingException {
        // 直接返回null
        return null;
    }

}

複製程式碼

該類直接讓ExchangeHandler定義的方法reply返回null,交由它的子類選擇性的去實現具體的回覆請求結果。

(十二)ExchangeServer

該介面繼承了Server介面,定義了兩個方法:

public interface ExchangeServer extends Server {

    /**
     * get channels.
     * 獲得通道集合
     * @return channels
     */
    Collection<ExchangeChannel> getExchangeChannels();

    /**
     * get channel.
     * 根據遠端地址獲得對應的資訊通道
     * @param remoteAddress
     * @return channel
     */
    ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);

}

複製程式碼

該介面比較好理解,並且在Server介面基礎上新定義了兩個方法。直接來看看它的實現類吧。

(十三)HeaderExchangeServer

該類實現了ExchangeServer介面,是基於協議頭的資訊交換伺服器實現類,HeaderExchangeServer是Server的裝飾器,每個實現方法都會呼叫server的方法。

1.屬性

protected final Logger logger = LoggerFactory.getLogger(getClass());

/**
 * 執行緒池
 */
private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,new NamedThreadFactory(
                "dubbo-remoting-server-heartbeat",true));
/**
 * 伺服器
 */
private final Server server;
// heartbeat timer
/**
 * 心跳定時器
 */
private ScheduledFuture<?> heartbeatTimer;
// heartbeat timeout (ms),won't execute a heartbeat.
/**
 * 心跳週期
 */
private int heartbeat;
/**
 * 心跳超時時間
 */
private int heartbeatTimeout;
/**
 * 資訊交換伺服器是否關閉
 */
private AtomicBoolean closed = new AtomicBoolean(false);

複製程式碼

該類裡面的很多實現跟HeaderExchangeClient差不多,包括心跳檢測等邏輯。看得懂上述我講的HeaderExchangeClient的屬性,想必這裡的屬性應該也很簡單了。

2.建構函式

public HeaderExchangeServer(Server server) {
    if (server == null) {
        throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    //獲得心跳週期配置,如果沒有配置,預設設定為0
    this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY,0);
    // 獲得心跳超時配置,預設是心跳週期的三倍
    this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,heartbeat * 3);
    // 如果心跳超時時間小於心跳週期的兩倍,則丟擲異常
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    // 開始心跳
    startHeartbeatTimer();
}

public Server getServer() {
    return server;
}

複製程式碼

建構函式就是對屬性的設定,心跳的機制以及預設值都跟HeaderExchangeClient中的一模一樣。

3.isRunning

private boolean isRunning() {
    Collection<Channel> channels = getChannels();
    // 遍歷所有連線該伺服器的通道
    for (Channel channel : channels) {

        /**
         *  If there are any client connections,*  our server should be running.
         */

        // 只要有任何一個客戶端連線,則伺服器還執行著
        if (channel.isConnected()) {
            return true;
        }
    }
    return false;
}

複製程式碼

該方法是檢測伺服器是否還執行,只要有一個客戶端連線著,就算伺服器執行著。

4.close

@Override
public void close() {
    // 關閉執行緒池和心跳檢測
    doClose();
    // 關閉伺服器
    server.close();
}

@Override
public void close(final int timeout) {
    // 開始關閉
    startClose();
    if (timeout > 0) {
        final long max = (long) timeout;
        final long start = System.currentTimeMillis();
        if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY,true)) {
            // 傳送 READONLY_EVENT事件給所有連線該伺服器的客戶端,表示 Server 不可讀了。
            sendChannelReadOnlyEvent();
        }
        // 當伺服器還在執行,並且沒有超時,睡眠,也就是等待timeout左右時間在進行關閉
        while (HeaderExchangeServer.this.isRunning()
                && System.currentTimeMillis() - start < max) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(),e);
            }
        }
    }
    // 關閉執行緒池和心跳檢測
    doClose();
    // 延遲關閉
    server.close(timeout);
}

複製程式碼

兩個close方法,第二個close方法是優雅的關閉,有一定的延時來讓一些響應或者操作做完。關閉分兩個步驟,第一個就是關閉資訊交換伺服器中的執行緒池和心跳檢測,然後才是關閉伺服器。

5.sendChannelReadOnlyEvent

private void sendChannelReadOnlyEvent() {
    // 建立一個READONLY_EVENT事件的請求
    Request request = new Request();
    request.setEvent(Request.READONLY_EVENT);
    // 不需要響應
    request.setTwoWay(false);
    // 設定版本
    request.setVersion(Version.getProtocolVersion());

    Collection<Channel> channels = getChannels();
    // 遍歷連線的通道,進行通知
    for (Channel channel : channels) {
        try {
            // 通過通道還連線著,則傳送通知
            if (channel.isConnected())
                channel.send(request,getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY,true));
        } catch (RemotingException e) {
            logger.warn("send cannot write message error.",e);
        }
    }
}

複製程式碼

在關閉伺服器中有一個操作就是傳送事件READONLY_EVENT,告訴客戶端該伺服器不可讀了,就是該方法實現的,逐個通知連線的客戶端該事件。

6.doClose

private void doClose() {
    if (!closed.compareAndSet(false,true)) {
        return;
    }
    // 停止心跳檢測
    stopHeartbeatTimer();
    try {
        // 關閉執行緒池
        scheduled.shutdown();
    } catch (Throwable t) {
        logger.warn(t.getMessage(),t);
    }
}

複製程式碼

該方法就是close方法呼叫到的停止心跳檢測和關閉執行緒池。

7.getExchangeChannels

@Override
public Collection<ExchangeChannel> getExchangeChannels() {
    Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
    // 獲得連線該伺服器通道集合
    Collection<Channel> channels = server.getChannels();
    if (channels != null && !channels.isEmpty()) {
        // 遍歷通道集合,為每個通道都建立資訊交換通道,並且加入資訊交換通道集合
        for (Channel channel : channels) {
            exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
        }
    }
    return exchangeChannels;
}

複製程式碼

該方法是返回連線該伺服器資訊交換通道集合。邏輯就是先獲得通道集合,在根據通道來建立資訊交換通道,然後返回資訊通道集合。

8.reset

@Override
public void reset(URL url) {
    // 重置屬性
    server.reset(url);
    try {
        // 重置的邏輯跟建構函式一樣設定
        if (url.hasParameter(Constants.HEARTBEAT_KEY)
                || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
            int h = url.getParameter(Constants.HEARTBEAT_KEY,heartbeat);
            int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,h * 3);
            if (t < h * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }
            if (h != heartbeat || t != heartbeatTimeout) {
                heartbeat = h;
                heartbeatTimeout = t;
                // 重新開始心跳
                startHeartbeatTimer();
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(),t);
    }
}

複製程式碼

該方法就是重置屬性,重置後,重新開始心跳,設定心跳屬性的機制跟建構函式一樣。

9.startHeartbeatTimer

private void startHeartbeatTimer() {
    // 先停止現有的心跳檢測
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        // 建立心跳定時器
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    @Override
                    public Collection<Channel> getChannels() {
                        // 返回一個不可修改的連線該伺服器的資訊交換通道集合
                        return Collections.unmodifiableCollection(
                                HeaderExchangeServer.this.getChannels());
                    }
                },TimeUnit.MILLISECONDS);
    }
}

複製程式碼

該方法是開始心跳,跟HeaderExchangeClient類中的開始心跳方法唯一區別是獲得的通道不一樣,客戶端跟通道是一一對應的,所有隻要對一個通道進行心跳檢測,而服務端跟通道是一對多的關係,所有需要對該伺服器連線的所有通道進行心跳檢測。

10.stopHeartbeatTimer

private void stopHeartbeatTimer() {
    if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
        try {
            // 取消定時器
            heartbeatTimer.cancel(true);
            // 取消大量已排隊任務,用於回收空間
            scheduled.purge();
        } catch (Throwable e) {
            if (logger.isWarnEnabled()) {
                logger.warn(e.getMessage(),e);
            }
        }
    }
    heartbeatTimer = null;
}

複製程式碼

該方法是停止當前的心跳檢測。

(十四)ExchangeServerDelegate

該類實現了ExchangeServer介面,是資訊交換伺服器裝飾者,是ExchangeServer的裝飾器。該類就一個屬性ExchangeServer server,所有實現方法都呼叫了server屬性的方法。目前只有在p2p中被用到,程式碼為就不貼了,很簡單。

(十五)Exchanger

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    /**
     * bind.
     * 繫結一個伺服器
     * @param url 伺服器url
     * @param handler 資料交換處理器
     * @return message server 資料交換伺服器
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url,ExchangeHandler handler) throws RemotingException;

    /**
     * connect.
     * 連線一個伺服器,也就是建立一個客戶端
     * @param url 伺服器url
     * @param handler 資料交換處理器
     * @return message channel 返回資料交換客戶端
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url,ExchangeHandler handler) throws RemotingException;

}

複製程式碼

該介面是資料交換者介面,該介面是一個可擴充套件介面預設實現的是HeaderExchanger類,並且用到了dubbo SPI的Adaptive機制,優先實現url攜帶的配置。如果不瞭解dubbo SPI機制的可以看《dubbo原始碼解析(二)Dubbo擴充套件機制SPI》。那麼回到該介面定義的方法,定義了繫結和連線兩個方法,分別返回資訊互動伺服器和客戶端例項。

(十六)HeaderExchanger

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url,ExchangeHandler handler) throws RemotingException {
        // 用傳輸層連線返回的client 建立對應的資訊交換客戶端,預設開啟心跳檢測
        return new HeaderExchangeClient(Transporters.connect(url,new DecodeHandler(new HeaderExchangeHandler(handler))),true);
    }

    @Override
    public ExchangeServer bind(URL url,ExchangeHandler handler) throws RemotingException {
        // 用傳輸層繫結返回的server 建立對應的資訊交換服務端
        return new HeaderExchangeServer(Transporters.bind(url,new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

複製程式碼

該類繼承了Exchanger介面,是Exchanger介面的預設實現,實現了Exchanger介面定義的兩個方法,分別呼叫的是Transporters的連線和繫結方法,再利用這這兩個方法返回的客戶端和服務端例項來建立資訊交換的客戶端和服務端。

(十七)Replier

我們知道Request對應的是ExchangeHandler介面實現物件來處理,但有些時候我們需要不同資料型別對應不同的處理器,該類就是為了支援這一需求所設計的。

public interface Replier<T> {

    /**
     * reply.
     * 回覆請求結果
     * @param channel
     * @param request
     * @return response
     * @throws RemotingException
     */
    Object reply(ExchangeChannel channel,T request) throws RemotingException;

}

複製程式碼

可以看到該介面跟ExchangeHandler定義的方法也一一,只有請求的型別改為了範型。

(十八)ReplierDispatcher

該類實現了Replier介面,是回覆者排程器實現類。

/**
 * 預設回覆者
 */
private final Replier<?> defaultReplier;

/**
 * 回覆者集合
 */
private final Map<Class<?>,Replier<?>> repliers = new ConcurrentHashMap<Class<?>,Replier<?>>();

複製程式碼

這是該類的兩個屬性,快取了回覆者集合和預設的回覆者。

/**
 * 從回覆者集合中找到該型別的回覆者,並且返回
 * @param type
 * @return
 */
private Replier<?> getReplier(Class<?> type) {
    for (Map.Entry<Class<?>,Replier<?>> entry : repliers.entrySet()) {
        if (entry.getKey().isAssignableFrom(type)) {
            return entry.getValue();
        }
    }
    if (defaultReplier != null) {
        return defaultReplier;
    }
    throw new IllegalStateException("Replier not found,Unsupported message object: " + type);
}

/**
 * 回覆請求
 * @param channel
 * @param request
 * @return
 * @throws RemotingException
 */
@Override
@SuppressWarnings({"unchecked",Object request) throws RemotingException {
    return ((Replier) getReplier(request.getClass())).reply(channel,request);
}

複製程式碼

上述是該類中關鍵的兩個方法,reply還是呼叫實現類的reply。根據請求的資料型別來使用指定的回覆者進行回覆。

(十九)MultiMessage

該類實現了實現 Iterable 介面,是多訊息的封裝,我們直接看它的屬性:

/**
 * 訊息集合
 */
private final List messages = new ArrayList();

複製程式碼

該類要和《dubbo原始碼解析(九)遠端通訊——Transport層》的(八)MultiMessageHandler聯合著看。

(二十)HeartbeatHandler

該類繼承了AbstractChannelHandlerDelegate類,是心跳處理器。是用來處理心跳事件的,也接收訊息上增加了對心跳訊息的處理。該類是

@Override
public void received(Channel channel,Object message) throws RemotingException {
    // 設定接收時間的時間戳屬性值
    setReadTimestamp(channel);
    // 如果是心跳請求
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        // 如果需要響應
        if (req.isTwoWay()) {
            // 建立一個響應
            Response res = new Response(req.getId(),req.getVersion());
            // 設定為心跳事件的響應
            res.setEvent(Response.HEARTBEAT_EVENT);
            // 傳送訊息,也就是返回響應
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY,0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ",cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    // 如果是心跳響應,則直接return
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
        }
        return;
    }
    handler.received(channel,message);
}

複製程式碼

該方法是就是在handler處理訊息上增加了處理心跳訊息的功能,做到了功能增強。

(二十一)Exchangers

該類跟Transporters的設計意圖是一樣的,Transporters我在《dubbo原始碼解析(八)遠端通訊——開篇》的(十)Transporters已經講到了。Exchangers也用到了外觀模式。程式碼為就不貼了,可以對照著Transporters來看,很簡單。

(二十二)Request

請求模型類,最重要的肯定是模型的屬性,我們來看看屬性:

/**
 * 心跳事件
 */
public static final String HEARTBEAT_EVENT = null;

/**
 * 只讀事件
 */
public static final String READONLY_EVENT = "R";

/**
 * 請求編號自增序列
 */
private static final AtomicLong INVOKE_ID = new AtomicLong(0);

/**
 * 請求編號
 */
private final long mId;

/**
 * dubbo版本
 */
private String mVersion;

/**
 * 是否需要響應
 */
private boolean mTwoWay = true;

/**
 * 是否是事件
 */
private boolean mEvent = false;

/**
 * 是否是異常的請求
 */
private boolean mBroken = false;

/**
 * 請求資料
 */
private Object mData;

複製程式碼
  1. 由於心跳事件比較常用,所有設定為null。
  2. 請求編號使用INVOKE_ID生成,是JVM 程式內唯一的。
  3. 其他屬性比較簡單

(二十三)Response

響應模型,來看看它的屬性:

/**
 * 心跳事件
 */
public static final String HEARTBEAT_EVENT = null;

/**
 * 只讀事件
 */
public static final String READONLY_EVENT = "R";

/**
 * ok.
 * 成功狀態碼
 */
public static final byte OK = 20;

/**
 * clien side timeout.
 * 客戶端側的超時狀態碼
 */
public static final byte CLIENT_TIMEOUT = 30;

/**
 * server side timeout.
 * 服務端側超時的狀態碼
 */
public static final byte SERVER_TIMEOUT = 31;

/**
 * channel inactive,directly return the unfinished requests.
 * 通道不活躍,返回未完成請求的狀態碼
 */
public static final byte CHANNEL_INACTIVE = 35;

/**
 * request format error.
 * 請求格式錯誤狀態碼
 */
public static final byte BAD_REQUEST = 40;

/**
 * response format error.
 * 響應格式錯誤狀態碼
 */
public static final byte BAD_RESPONSE = 50;

/**
 * service not found.
 * 服務找不到狀態碼
 */
public static final byte SERVICE_NOT_FOUND = 60;

/**
 * service error.
 * 服務錯誤狀態碼
 */
public static final byte SERVICE_ERROR = 70;

/**
 * internal server error.
 * 內部伺服器錯誤狀態碼
 */
public static final byte SERVER_ERROR = 80;

/**
 * internal server error.
 * 客戶端錯誤狀態碼
 */
public static final byte CLIENT_ERROR = 90;

/**
 * server side threadpool exhausted and quick return.
 * 伺服器端執行緒池耗盡並快速返回狀態碼
 */
public static final byte SERVER_THREADPOOL_EXHAUSTED_ERROR = 100;

/**
 * 響應編號
 */
private long mId = 0;

/**
 * dubbo 版本
 */
private String mVersion;

/**
 * 狀態
 */
private byte mStatus = OK;

/**
 * 是否是事件
 */
private boolean mEvent = false;

/**
 * 錯誤資訊
 */
private String mErrorMsg;

/**
 * 返回結果
 */
private Object mResult;

複製程式碼

很多屬性跟Request模型的屬性一樣,並且含義也一樣,不過該模型多了很多的狀態碼。關鍵的是id跟請求一一對應。

(二十四)ResponseCallback

public interface ResponseCallback {

    /**
     * done.
     * 處理請求
     * @param response
     */
    void done(Object response);

    /**
     * caught exception.
     * 處理異常
     * @param exception
     */
    void caught(Throwable exception);

}

複製程式碼

該介面是回撥的介面,定義了兩個方法,分別是處理正常的響應結果和處理異常。

(二十五)ExchangeCodec

該類繼承了TelnetCodec,是資訊交換編解碼器。在本文的開頭,我就寫到,dubbo將一條訊息分成了協議頭和協議體,用來解決粘包拆包問題,但是頭跟體在編解碼上有區別,我們先來看看dubbo 的協議頭的配置:

dubbo_protocol_header

上圖是官方檔案的圖片,能夠清晰的看出協議中各個資料所佔的位數:

  1. 0-7位和8-15位:Magic High和Magic Low,類似java位元組碼檔案裡的魔數,用來判斷是不是dubbo協議的資料包,就是一個固定的數字
  2. 16位:Req/Res:請求還是響應標識。
  3. 17位:2way:單向還是雙向
  4. 18位:Event:是否是事件
  5. 19-23位:Serialization 編號
  6. 24-31位:status狀態
  7. 32-95位:id編號
  8. 96-127位:body資料
  9. 128-…位:上圖表格內的資料

可以看到一個該協議中前65位是協議頭,後面的都是協議體資料。那麼在編解碼中,協議頭是通過 Codec 編解碼,而body部分是用Serialization序列化和反序列化的。下面我們就來看看該類對協議頭的編解碼。

1.屬性

// header length.
/**
 * 協議頭長度:16位元組 = 128Bits
 */
protected static final int HEADER_LENGTH = 16;
// magic header.
/**
 * MAGIC二進位制:1101101010111011,十進位制:55995
 */
protected static final short MAGIC = (short) 0xdabb;
/**
 * Magic High,也就是0-7位:11011010
 */
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
/**
 * Magic Low  8-15位 :10111011
 */
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// message flag.
/**
 * 128 二進位制:10000000
 */
protected static final byte FLAG_REQUEST = (byte) 0x80;
/**
 * 64 二進位制:1000000
 */
protected static final byte FLAG_TWOWAY = (byte) 0x40;
/**
 * 32 二進位制:100000
 */
protected static final byte FLAG_EVENT = (byte) 0x20;
/**
 * 31 二進位制:11111
 */
protected static final int SERIALIZATION_MASK = 0x1f;

複製程式碼

可以看到 MAGIC是個固定的值,用來判斷是不是dubbo協議的資料包,並且MAGIC_LOW和MAGIC_HIGH分別是MAGIC的低位和高位。其他的屬性用來幹嘛後面會講到。

2.encode

@Override
public void encode(Channel channel,ChannelBuffer buffer,Object msg) throws IOException {
    if (msg instanceof Request) {
        // 如果訊息是Request型別,對請求訊息編碼
        encodeRequest(channel,buffer,(Request) msg);
    } else if (msg instanceof Response) {
        // 如果訊息是Response型別,對響應訊息編碼
        encodeResponse(channel,(Response) msg);
    } else {
        // 直接讓父類( Telnet ) 處理,目前是 Telnet 命令的結果。
        super.encode(channel,msg);
    }
}

複製程式碼

該方法是根據訊息的型別來分別進行編碼,分為三種情況:Request型別、Response型別以及其他

3.encodeRequest

protected void encodeRequest(Channel channel,Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header.
    // 建立16位元組的位元組陣列
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    // 設定前16位資料,也就是設定header[0]和header[1]的資料為Magic High和Magic Low
    Bytes.short2bytes(MAGIC,header);

    // set request and serialization flag.
    // 16-23位為serialization編號,用到或運算10000000|serialization編號,例如serialization編號為11111,則為00011111
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    // 繼續上面的例子,00011111|1000000 = 01011111
    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    // 繼續上面的例子,01011111|100000 = 011 11111 可以看到011代表請求標記、雙向、是事件,這樣就設定了16、17、18位,後面19-23位是Serialization 編號
    if (req.isEvent()) header[2] |= FLAG_EVENT;

    // set request id.
    // 設定32-95位請求id
    Bytes.long2bytes(req.getId(),header,4);

    // encode request data.
    // // 編碼 `Request.data` 到 Body ,並寫入到 Buffer
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    // 對body資料序列化
    ObjectOutput out = serialization.serialize(channel.getUrl(),bos);
    // 如果該請求是事件
    if (req.isEvent()) {
        // 特殊事件編碼
        encodeEventData(channel,out,req.getData());
    } else {
        // 正常請求編碼
        encodeRequestData(channel,req.getData(),req.getVersion());
    }
    // 釋放資源
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    //檢驗訊息長度
    checkPayload(channel,len);
    // 設定96-127位:Body值
    Bytes.int2bytes(len,12);

    // write
    // 把header寫入到buffer
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

複製程式碼

該方法是對Request型別的訊息進行編碼,仔細閱讀上述我寫的註解,結合協議頭各個位數的含義,好好品味我舉的例子。享受二進位制位運算帶來的快樂,也可以看到前半部分邏輯是對協議頭的編碼,後面還有對body值的序列化。

4.encodeResponse

protected void encodeRequest(Channel channel,12);

    // write
    // 把header寫入到buffer
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

protected void encodeResponse(Channel channel,Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        Serialization serialization = getSerialization(channel);
        // header.
        // 建立16位元組大小的位元組陣列
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        // 設定前0-15位為魔數
        Bytes.short2bytes(MAGIC,header);
        // set request and serialization flag.
        // 設定響應標誌和序列化id
        header[2] = serialization.getContentTypeId();
        // 如果是心跳事件,則設定第18位為事件
        if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
        // set response status.
        // 設定24-31位為狀態碼
        byte status = res.getStatus();
        header[3] = status;
        // set request id.
        // 設定32-95位為請求id
        Bytes.long2bytes(res.getId(),4);

        // 寫入資料
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 對body進行序列化
        ObjectOutput out = serialization.serialize(channel.getUrl(),bos);
        // encode response data or error message.
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                // 對心跳事件編碼
                encodeHeartbeatData(channel,res.getResult());
            } else {
                // 對普通響應編碼
                encodeResponseData(channel,res.getResult(),res.getVersion());
            }
        } else out.writeUTF(res.getErrorMessage());
        // 釋放
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();

        int len = bos.writtenBytes();
        checkPayload(channel,len);
        Bytes.int2bytes(len,12);
        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        // clear buffer
        buffer.writerIndex(savedWriteIndex);
        // send error message to Consumer,otherwise,Consumer will wait till timeout.
        //如果在寫入資料失敗,則返回響應格式錯誤的返回碼
        if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
            Response r = new Response(res.getId(),res.getVersion());
            r.setStatus(Response.BAD_RESPONSE);

            if (t instanceof ExceedPayloadLimitException) {
                logger.warn(t.getMessage(),t);
                try {
                    r.setErrorMessage(t.getMessage());
                    // 傳送響應
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + t.getMessage() + ",cause: " + e.getMessage(),e);
                }
            } else {
                // FIXME log error message in Codec and handle in caught() of IoHanndler?
                logger.warn("Fail to encode response: " + res + ",send bad_response info instead,t);
                try {
                    r.setErrorMessage("Failed to send response: " + res + ",cause: " + StringUtils.toString(t));
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + res + ",e);
                }
            }
        }

        // Rethrow exception
        if (t instanceof IOException) {
            throw (IOException) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new RuntimeException(t.getMessage(),t);
        }
    }
}

複製程式碼

該方法是對Response型別的訊息進行編碼,該方法裡面我沒有舉例子演示如何進行編碼,不過過程跟encodeRequest類似。

5.decode

@Override
public Object decode(Channel channel,ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // 讀取前16位元組的協議頭資料,如果資料不滿16位元組,則讀取全部
    byte[] header = new byte[Math.min(readable,HEADER_LENGTH)];
    buffer.readBytes(header);
    // 解碼
    return decode(channel,readable,header);
}

@Override
protected Object decode(Channel channel,int readable,byte[] header) throws IOException {
    // check magic number.
    // 核對魔數(該數字固定)
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        // 將 buffer 完全複製到 `header` 陣列中
        if (header.length < readable) {
            header = Bytes.copyOf(header,readable);
            buffer.readBytes(header,length,readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header,i);
                break;
            }
        }
        return super.decode(channel,header);
    }
    // check length.
    // Header 長度不夠,返回需要更多的輸入,解決拆包現象
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header,12);
    // 檢查資訊頭長度
    checkPayload(channel,len);

    int tt = len + HEADER_LENGTH;
    // 總長度不夠,返回需要更多的輸入,解決拆包現象
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer,len);

    try {
        // 對body反序列化
        return decodeBody(channel,is,header);
    } finally {
        // 如果不可用
        if (is.available() > 0) {
            try {
                // 列印錯誤日誌
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                // 跳過未讀完的流
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(),e);
            }
        }
    }
}

複製程式碼

該方法就是解碼前的一些核對過程,包括檢測是否為dubbo協議,是否有拆包現象等,具體的解碼在decodeBody方法。

6.decodeBody

protected Object decodeBody(Channel channel,InputStream is,byte[] header) throws IOException {
    // 用並運運算元
    byte flag = header[2],proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    // 獲得請求id
    long id = Bytes.bytes2long(header,4);
    // 如果第16位為0,則說明是響應
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        // 如果第18位不是0,則說明是心跳事件
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        try {
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(),proto);
            // 如果響應是成功的
            if (status == Response.OK) {
                Object data;
                if (res.isHeartbeat()) {
                    // 如果是心跳事件,則心跳事件的解碼
                    data = decodeHeartbeatData(channel,in);
                } else if (res.isEvent()) {
                    // 如果是事件,則事件的解碼
                    data = decodeEventData(channel,in);
                } else {
                    // 否則執行普通解碼
                    data = decodeResponseData(channel,in,getRequestData(id));
                }
                // 重新設定響應結果
                res.setResult(data);
            } else {
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request.
        // 對請求型別解碼
        Request req = new Request(id);
        // 設定版本號
        req.setVersion(Version.getProtocolVersion());
        // 如果第17位不為0,則是雙向
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        // 如果18位不為0,則是心跳事件
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            // 反序列化
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(),proto);
            Object data;
            if (req.isHeartbeat()) {
                // 如果請求是心跳事件,則心跳事件解碼
                data = decodeHeartbeatData(channel,in);
            } else if (req.isEvent()) {
                // 如果是事件,則事件解碼
                data = decodeEventData(channel,in);
            } else {
                // 否則,用普通解碼
                data = decodeRequestData(channel,in);
            }
            // 把重新設定請求資料
            req.setData(data);
        } catch (Throwable t) {
            // bad request
            // 設定是異常請求
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}

複製程式碼

該方法就是解碼的過程,並且對協議頭和協議體分開解碼,協議頭編碼是做或運算,而解碼則是做並運算,協議體用反序列化的方式解碼,同樣也是分為了Request型別、Response型別進行解碼。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的原始碼解,其中關鍵的是設計了Request和Response模型,整個資訊交換都圍繞這兩大模型,並且設計了dubbo協議,解決拆包粘包問題,在資訊交換中協議頭攜帶的資訊起到了關鍵作用,也滿足了rpc呼叫的一些需求。下一篇我會講解遠端通訊的buffer部分。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見。