1. 程式人生 > 程式設計 >Dubbo原始碼解析(十五)遠端通訊——Mina

Dubbo原始碼解析(十五)遠端通訊——Mina

遠端通訊——Mina

目標:介紹基於Mina的來實現的遠端通訊、介紹dubbo-remoting-mina內的原始碼解析。

前言

Apache MINA是一個網路應用程式框架,可幫助使用者輕鬆開發高效能和高可擴充套件性的網路應用程式。它通過Java NIO在各種傳輸(如TCP / IP和UDP / IP)上提供抽象的事件驅動非同步API。它通常被稱為NIO框架庫、客戶端伺服器框架庫或者網路套接字型檔。那麼本問就要講解在dubbo專案中,基於mina的API實現服務端和客戶端來完成遠端通訊這件事情。

下面是mina實現的包結構:

mina包結構

原始碼分析

(一)MinaChannel

該類繼承了AbstractChannel,是基於mina實現的通道。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);

/**
 * 通道的key
 */
private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";

/**
 * mina中的一個控制程式碼,表示兩個端點之間的連線,與傳輸型別無關
 */
private final IoSession session;
複製程式碼

該類的屬性除了封裝了一個CHANNEL_KEY以外,還用到了mina中的IoSession,它封裝著一個連線所需要的方法,比如獲得遠端地址等。

2.getOrAddChannel

static MinaChannel getOrAddChannel(IoSession session,URL url,ChannelHandler handler) {
    // 如果連線session為空,則返回空
    if (session == null) {
        return null;
    }
    // 獲得MinaChannel例項
    MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
    // 如果不存在,則建立
    if (ret == null
) { // 建立一個MinaChannel例項 ret = new MinaChannel(session,url,handler); // 如果兩個端點連線 if (session.isConnected()) { // 把新建立的MinaChannel新增到session 中 MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY,ret); // 如果屬性的舊值不為空,則重新設定舊值 if (old != null) { session.setAttribute(CHANNEL_KEY,old); ret = old; } } } return ret; } 複製程式碼

該方法是一個獲得MinaChannel物件的方法,其中每一個MinaChannel都會被放在session的屬性值中。

3.removeChannelIfDisconnected

static void removeChannelIfDisconnected(IoSession session) {
    if (session != null && !session.isConnected()) {
        session.removeAttribute(CHANNEL_KEY);
    }
}
複製程式碼

該方法是當沒有連線時移除該通道,比較簡單。

4.send

@Override
public void send(Object message,boolean sent) throws RemotingException {
    super.send(message,sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 傳送訊息,返回future
        WriteFuture future = session.write(message);
        // 如果已經傳送過了
        if (sent) {
            // 獲得延遲時間
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            // 等待timeout的連線時間後檢視是否傳送成功
            success = future.join(timeout);
        }
    } catch (Throwable e) {
        throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress() + ",cause: " + e.getMessage(),e);
    }

    if (!success) {
        throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
複製程式碼

該方法是最關鍵的傳送訊息,其中呼叫到了session的write方法,就是mina封裝的傳送訊息。並且根據返回的WriteFuture物件來判斷是否傳送成功。

(二)MinaHandler

該類繼承了IoHandlerAdapter,是通道處理器實現類,其中就是mina專案中IoHandler介面的幾個 方法。

/**
 * url物件
 */
private final URL url;

/**
 * 通道處理器物件
 */
private final ChannelHandler handler;
複製程式碼

該類有兩個屬性,上述提到的實現IoHandler介面方法都是呼叫了handler來實現的,我就舉例講一個,其他的都一樣的寫法:

@Override
public void sessionOpened(IoSession session) throws Exception {
    // 獲得MinaChannel物件
    MinaChannel channel = MinaChannel.getOrAddChannel(session,handler);
    try {
        // 呼叫接連該通道
        handler.connected(channel);
    } finally {
        // 如果沒有連線則移除通道
        MinaChannel.removeChannelIfDisconnected(session);
    }
}
複製程式碼

該方法在IoHandler中叫做sessionOpened,其實就是連線方法,所以呼叫的是handler.connected。其他方法也一樣,請自行檢視。

(三)MinaClient

該類繼承了AbstractClient類,是基於mina實現的客戶端類。

1.屬性

/**
 * 套接字連線集合
 */
private static final Map<String,SocketConnector> connectors = new ConcurrentHashMap<String,SocketConnector>();

/**
 * 連線的key
 */
private String connectorKey;
/**
 * 套接字連線者
 */
private SocketConnector connector;

/**
 * 一個控制程式碼
 */
private volatile IoSession session; // volatile,please copy reference to use
複製程式碼

該類中的屬性都跟mina專案中封裝類有關係。

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 用url來作為key
    connectorKey = getUrl().toFullString();
    // 先從集合中取套接字連線
    SocketConnector c = connectors.get(connectorKey);
    if (c != null) {
        connector = c;
        // 如果為空
    } else {
        // set thread pool. 設定執行緒池
        connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker",true)));
        // config 獲得套接字連線配置
        SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
        cfg.setThreadModel(ThreadModel.MANUAL);
        // 啟用TCP_NODELAY
        cfg.getSessionConfig().setTcpNoDelay(true);
        // 啟用SO_KEEPALIVE
        cfg.getSessionConfig().setKeepAlive(true);
        int timeout = getConnectTimeout();
        // 設定連線超時時間
        cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
        // set codec.
        // 設定編解碼器
        connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(),getUrl(),this)));
        // 加入集合
        connectors.put(connectorKey,connector);
    }
}
複製程式碼

該方法是開啟客戶端,在mina中用套接字連線者connector來表示。其中的操作就是新建一個connector,並且設定相應的屬性,然後加入集合。

3.doConnect

@Override
protected void doConnect() throws Throwable {
    // 連線伺服器
    ConnectFuture future = connector.connect(getConnectAddress(),new MinaHandler(getUrl(),this));
    long start = System.currentTimeMillis();
    final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
    // 用於對執行緒的阻塞和喚醒
    final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
    // 加入監聽器
    future.addListener(new IoFutureListener() {
        @Override
        public void operationComplete(IoFuture future) {
            try {
                // 如果已經讀完了
                if (future.isReady()) {
                    // 建立獲得該連線的IoSession例項
                    IoSession newSession = future.getSession();
                    try {
                        // Close old channel 關閉舊的session
                        IoSession oldSession = MinaClient.this.session; // copy reference
                        if (oldSession != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
                                }
                                // 關閉連線
                                oldSession.close();
                            } finally {
                                // 移除通道
                                MinaChannel.removeChannelIfDisconnected(oldSession);
                            }
                        }
                    } finally {
                        // 如果MinaClient關閉了
                        if (MinaClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new mina channel " + newSession + ",because the client closed.");
                                }
                                // 關閉session
                                newSession.close();
                            } finally {
                                MinaClient.this.session = null;
                                MinaChannel.removeChannelIfDisconnected(newSession);
                            }
                        } else {
                            // 設定新的session
                            MinaClient.this.session = newSession;
                        }
                    }
                }
            } catch (Exception e) {
                exception.set(e);
            } finally {
                // 減少數量,釋放所有等待的執行緒
                finish.countDown();
            }
        }
    });
    try {
        // 當前執行緒等待,直到鎖存器倒計數到零,除非執行緒被中斷,或者指定的等待時間過去
        finish.await(getConnectTimeout(),TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        throw new RemotingException(this,"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
                + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
                + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
                + Version.getVersion() + ",e);
    }
    Throwable e = exception.get();
    if (e != null) {
        throw e;
    }
}
複製程式碼

該方法是客戶端連線伺服器的實現方法。其中用到了CountDownLatch來代表完成完成事件,它來做一個執行緒等待,直到1個執行緒完成上述的動作,也就是連線完成結束,才釋放等待的執行緒。保證每次只有一條執行緒去連線,解決future.awaitUninterruptibly()死鎖問題。

其他方法請自行檢視我寫的註釋。

(四)MinaServer

該類繼承了AbstractServer,是基於mina實現的服務端實現類。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);

/**
 * 套接字接收者物件
 */
private SocketAcceptor acceptor;
複製程式碼

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // set thread pool.
    // 建立套接字接收者物件
    acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS),Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",true)));
    // config
    // 設定配置
    SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig();
    cfg.setThreadModel(ThreadModel.MANUAL);
    // set codec. 設定編解碼器
    acceptor.getFilterChain().addLast("codec",this)));

    // 開啟伺服器
    acceptor.bind(getBindAddress(),this));
}
複製程式碼

該方法是建立伺服器,並且開啟伺服器。關鍵就是呼叫了acceptor的方法。

3.doClose

@Override
protected void doClose() throws Throwable {
    try {
        if (acceptor != null) {
            // 取消繫結,也就是關閉伺服器
            acceptor.unbind(getBindAddress());
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(),e);
    }
}
複製程式碼

該方法是關閉伺服器,就是呼叫了acceptor.unbind方法。

4.getChannels

@Override
public Collection<Channel> getChannels() {
    // 獲得連線到該伺服器到所有連線控制程式碼
    Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
    Collection<Channel> channels = new HashSet<Channel>();
    for (IoSession session : sessions) {
        if (session.isConnected()) {
            // 每次都用一個連線控制程式碼建立一個通道
            channels.add(MinaChannel.getOrAddChannel(session,this));
        }
    }
    return channels;
}
複製程式碼

該方法是獲得所有連線該伺服器的通道。

5.getChannel

@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
    // 獲得連線到該伺服器到所有連線控制程式碼
    Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
    // 遍歷所有控制程式碼,找到要找的通道
    for (IoSession session : sessions) {
        if (session.getRemoteAddress().equals(remoteAddress)) {
            return MinaChannel.getOrAddChannel(session,this);
        }
    }
    return null;
}
複製程式碼

該方法是獲得地址對應的單個通道。

(五)MinaTransporter

public class MinaTransporter implements Transporter {

    public static final String NAME = "mina";

    @Override
    public Server bind(URL url,ChannelHandler handler) throws RemotingException {
        // 建立MinaServer例項
        return new MinaServer(url,handler);
    }

    @Override
    public Client connect(URL url,ChannelHandler handler) throws RemotingException {
        // 建立MinaClient例項
        return new MinaClient(url,handler);
    }

}
複製程式碼

該類實現了Transporter介面,是基於mina的傳輸層實現。可以看到,bind和connect方法分別就是建立了MinaServer和MinaClient例項。這裡我建議檢視一下《dubbo原始碼解析(九)遠端通訊——Transport層》。

(六)MinaCodecAdapter

該類是基於mina實現的編解碼類,實現了ProtocolCodecFactory。

1.屬性

/**
 * 編碼物件
 */
private final ProtocolEncoder encoder = new InternalEncoder();

/**
 * 解碼物件
 */
private final ProtocolDecoder decoder = new InternalDecoder();

/**
 * 編解碼器
 */
private final Codec2 codec;

/**
 * url物件
 */
private final URL url;

/**
 * 通道處理器物件
 */
private final ChannelHandler handler;

/**
 * 緩衝區大小
 */
private final int bufferSize;
複製程式碼

屬性比較好理解,該編解碼器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder兩個類是該類的內部類,它們實現了ProtocolEncoder和ProtocolDecoder,關鍵的編解碼邏輯在這兩個類中實現。

2.構造方法

public MinaCodecAdapter(Codec2 codec,ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
    int b = url.getPositiveParameter(Constants.BUFFER_KEY,Constants.DEFAULT_BUFFER_SIZE);
    // 如果快取區大小在16位元組以內,則設定配置大小,如果不是,則設定8位元組的緩衝區大小
    this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}

複製程式碼

3.InternalEncoder

private class InternalEncoder implements ProtocolEncoder {

    @Override
    public void dispose(IoSession session) throws Exception {
    }

    @Override
    public void encode(IoSession session,Object msg,ProtocolEncoderOutput out) throws Exception {
        // 動態分配一個1k的緩衝區
        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
        // 獲得通道
        MinaChannel channel = MinaChannel.getOrAddChannel(session,handler);
        try {
            // 編碼
            codec.encode(channel,buffer,msg);
        } finally {
            // 檢測是否斷開連線,如果斷開,則移除
            MinaChannel.removeChannelIfDisconnected(session);
        }
        // 寫資料到out中
        out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
        out.flush();
    }
}

複製程式碼

該內部類是編碼類,其中的encode方法中寫到了編碼核心呼叫的是codec.encode。

4.InternalDecoder

private class InternalDecoder implements ProtocolDecoder {

    private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;

    @Override
    public void decode(IoSession session,ByteBuffer in,ProtocolDecoderOutput out) throws Exception {
        int readable = in.limit();
        if (readable <= 0) return;

        ChannelBuffer frame;

        // 如果緩衝區還有可讀位元組數
        if (buffer.readable()) {
            // 如果緩衝區是DynamicChannelBuffer型別的
            if (buffer instanceof DynamicChannelBuffer) {
                // 往buffer中寫入資料
                buffer.writeBytes(in.buf());
                frame = buffer;
            } else {
                // 緩衝區大小
                int size = buffer.readableBytes() + in.remaining();
                // 動態分配一個緩衝區
                frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
                // buffer的資料把寫到frame
                frame.writeBytes(buffer,buffer.readableBytes());
                // 把流中的資料寫到frame
                frame.writeBytes(in.buf());
            }
        } else {
            // 否則是基於Java NIO的ByteBuffer生成的緩衝區
            frame = ChannelBuffers.wrappedBuffer(in.buf());
        }

        // 獲得通道
        Channel channel = MinaChannel.getOrAddChannel(session,handler);
        Object msg;
        int savedReadIndex;

        try {
            do {
                // 獲得讀索引
                savedReadIndex = frame.readerIndex();
                try {
                    // 解碼
                    msg = codec.decode(channel,frame);
                } catch (Exception e) {
                    buffer = ChannelBuffers.EMPTY_BUFFER;
                    throw e;
                }
                // 拆包
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    frame.readerIndex(savedReadIndex);
                    break;
                } else {
                    if (savedReadIndex == frame.readerIndex()) {
                        buffer = ChannelBuffers.EMPTY_BUFFER;
                        throw new Exception("Decode without read data.");
                    }
                    if (msg != null) {
                        // 把資料寫到輸出流裡面
                        out.write(msg);
                    }
                }
            } while (frame.readable());
        } finally {
            // 如果frame還有可讀資料
            if (frame.readable()) {
                //丟棄可讀資料
                frame.discardReadBytes();
                buffer = frame;
            } else {
                buffer = ChannelBuffers.EMPTY_BUFFER;
            }
            MinaChannel.removeChannelIfDisconnected(session);
        }
    }

    @Override
    public void dispose(IoSession session) throws Exception {
    }

    @Override
    public void finishDecode(IoSession session,ProtocolDecoderOutput out) throws Exception {
    }
}

複製程式碼

該內部類是解碼類,其中decode方法中關鍵的是呼叫了codec.decode,其餘的操作是利用緩衝區對資料的沖刷流轉。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了基於mina的來實現的遠端通訊、介紹dubbo-remoting-mina內的原始碼解析,關鍵需要對mina有所瞭解。下一篇我會講解基於netty3實現遠端通訊部分。