1. 程式人生 > 程式設計 >Dubbo原始碼解析(十六)遠端通訊——Netty3

Dubbo原始碼解析(十六)遠端通訊——Netty3

遠端通訊——Netty3

目標:介紹基於netty3的來實現的遠端通訊、介紹dubbo-remoting-netty內的原始碼解析。

前言

現在dubbo預設的網路傳輸Transport介面預設實現的還是基於netty3實現的網路傳輸,不過馬上後面預設實現就要改為netty4了。由於netty4對netty3對相容性不是很好,所以保留了兩個版本的實現。

下面是包結構:

netty3包結構

原始碼分析

(一)NettyChannel

該類繼承了AbstractChannel類,是基於netty3實現的通道。

1.屬性

/**
 * 通道集合
 */
private static final ConcurrentMap<org.jboss.netty.channel.Channel,NettyChannel> channelMap = new
ConcurrentHashMap<org.jboss.netty.channel.Channel,NettyChannel>(); /** * 通道 */ private final org.jboss.netty.channel.Channel channel; /** * 屬性集合 */ private final Map<String,Object> attributes = new ConcurrentHashMap<String,Object>(); 複製程式碼

2.getOrAddChannel

static NettyChannel getOrAddChannel
(org.jboss.netty.channel.Channel ch,URL url,ChannelHandler handler)
{ if (ch == null) { return null; } // 首先從集合中取通道 NettyChannel ret = channelMap.get(ch); // 如果為空,則新建 if (ret == null) { NettyChannel nc = new NettyChannel(ch,url,handler); // 如果通道連線著 if
(ch.isConnected()) { // 加入集合 ret = channelMap.putIfAbsent(ch,nc); } if (ret == null) { ret = nc; } } return ret; } 複製程式碼

該方法是獲得通道,當通道在集合中沒有的時候,新建一個通道。

3.removeChannelIfDisconnected

static void removeChannelIfDisconnected(org.jboss.netty.channel.Channel ch) {
    if (ch != null && !ch.isConnected()) {
        channelMap.remove(ch);
    }
}
複製程式碼

該方法是當通道沒有連線的時候,從集合中移除它。

4.send

@Override
public void send(Object message,boolean sent) throws RemotingException {
    super.send(message,sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 寫入資料,傳送訊息
        ChannelFuture future = channel.write(message);
        // 如果已經傳送過
        if (sent) {
            // 獲得超時時間
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            // 等待timeout的連線時間後檢視是否傳送成功
            success = future.await(timeout);
        }
        // 看是否有異常
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress() + ",cause: " + e.getMessage(),e);
    }

    if (!success) {
        throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
複製程式碼

該方法是傳送訊息,其中用到了channe.write方法傳輸訊息,並且通過返回的future來判斷是否傳送成功。

5.close

@Override
public void close() {
    try {
        super.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        // 如果通道斷開,則移除該通道
        removeChannelIfDisconnected(channel);
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        // 清空屬性
        attributes.clear();
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        if (logger.isInfoEnabled()) {
            logger.info("Close netty channel " + channel);
        }
        // 關閉通道
        channel.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
}
複製程式碼

該方法是關閉通道,做了三個操作,分別是從集合中移除、清除屬性、關閉通道。

其他實現方法比較簡單,我就講解了。

(二)NettyHandler

該類繼承了SimpleChannelHandler類,是基於netty3的通道處理器,而該類被加上了@Sharable註解,也就是說該處理器可以從屬於多個ChannelPipeline

1.屬性

/**
 * 通道集合,key是主機地址 ip:port
 */
private final Map<String,Channel> channels = new ConcurrentHashMap<String,Channel>(); // <ip:port,channel>

/**
 * url物件
 */
private final URL url;

/**
 * 通道
 */
private final ChannelHandler handler;
複製程式碼

該類的屬性比較簡單,並且該類中實現的方法都是呼叫了屬性handler的方法,我舉一個例子來講,其他的可以自己檢視原始碼,比較簡單。

@Override
public void channelConnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception {
    // 獲得通道例項
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(),handler);
    try {
        if (channel != null) {
            // 儲存該通道,加入到集合中
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()),channel);
        }
        // 連線
        handler.connected(channel);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
複製程式碼

該方法是通道連線的方法,其中先獲取了通道例項,然後吧該例項加入到集合中,最好帶哦用handler.connected來進行連線。

(三)NettyClient

該類繼承了AbstractClient,是基於netty3實現的客戶端類。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

// ChannelFactory's closure has a DirectMemory leak,using static to avoid
// https://issues.jboss.org/browse/NETTY-424
/**
 * 通道工廠,用static來避免直接快取區的一個OOM問題
 */
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss",true)),Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker",Constants.DEFAULT_IO_THREADS);
/**
 * 客戶端引導物件
 */
private ClientBootstrap bootstrap;

/**
 * 通道
 */
private volatile Channel channel; // volatile,please copy reference to use
複製程式碼

上述屬性中ChannelFactory用了static修飾,為了避免netty3中會有直接緩衝記憶體洩漏的現象,具體的討論可以訪問註釋中的討論。

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 設定日誌工廠
    NettyHelper.setNettyLoggerFactory();
    // 例項化客戶端引導類
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    // 配置選擇項
    bootstrap.setOption("keepAlive",true);
    bootstrap.setOption("tcpNoDelay",true);
    bootstrap.setOption("connectTimeoutMillis",getConnectTimeout());
    // 建立通道處理器
    final NettyHandler nettyHandler = new NettyHandler(getUrl(),this);
    // 設定責任鏈路
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        /**
         * 獲得通道
         * @return
         */
        @Override
        public ChannelPipeline getPipeline() {
            // 新建編解碼
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),getUrl(),NettyClient.this);
            // 獲得管道
            ChannelPipeline pipeline = Channels.pipeline();
            // 設定解碼器
            pipeline.addLast("decoder",adapter.getDecoder());
            // 設定編碼器
            pipeline.addLast("encoder",adapter.getEncoder());
            // 設定通道處理器
            pipeline.addLast("handler",nettyHandler);
            // 返回通道
            return pipeline;
        }
    });
}
複製程式碼

該方法是建立客戶端,並且開啟,其中的邏輯就是用netty3的客戶端引導類來建立一個客戶端,如果對netty不熟悉的朋友可以先補補netty知識。

3.doConnect

@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // 用引導類連線
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        // 在超時時間內是否連線完成
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(),TimeUnit.MILLISECONDS);

        if (ret && future.isSuccess()) {
            // 獲得通道
            Channel newChannel = future.getChannel();
            // 非同步修改此通道
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // Close old channel 關閉舊的通道
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        // 關閉
                        oldChannel.close();
                    } finally {
                        // 移除通道
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // 如果客戶端關閉
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ",because the client closed.");
                        }
                        // 關閉通道
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.getCause() != null) {
            throw new RemotingException(this,"client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ",error message is:" + future.getCause().getMessage(),future.getCause());
        } else {
            throw new RemotingException(this,"client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        // 如果客戶端沒有連線
        if (!isConnected()) {
            // 取消future
            future.cancel();
        }
    }
}
複製程式碼

該方法是客戶端連線伺服器的方法。其中呼叫了bootstrap.connect。後面的邏輯是用來檢測是否連線,最後如果未連線,則會取消該連線任務。

4.doClose

@Override
protected void doClose() throws Throwable {
    /*try {
        bootstrap.releaseExternalResources();
    } catch (Throwable t) {
        logger.warn(t.getMessage());
    }*/
}
複製程式碼

在這裡不能關閉是因為channelFactory 是靜態屬性,被多個 NettyClient 共用。所以不能釋放資源。

(四)NettyServer

該類繼承了AbstractServer,實現了Server,是基於netty3實現的伺服器類。

1.屬性

/**
 * 連線該伺服器的通道集合
 */
private Map<String,Channel> channels; // <ip:port,channel>

/**
 * 伺服器引導類物件
 */
private ServerBootstrap bootstrap;

/**
 * 通道
 */
private org.jboss.netty.channel.Channel channel;
複製程式碼

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 設定日誌工廠
    NettyHelper.setNettyLoggerFactory();
    // 建立執行緒池
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss",true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker",true));
    // 新建通道工廠
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss,worker,getUrl().getPositiveParameter(Constants.IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS));
    // 新建服務引導類物件
    bootstrap = new ServerBootstrap(channelFactory);

    // 新建通道處理器
    final NettyHandler nettyHandler = new NettyHandler(getUrl(),this);
    // 獲得通道集合
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer",true));
    // 禁用nagle演演算法,將資料立即傳送出去。納格演演算法是以減少封包傳送量來增進TCP/IP網路的效能
    bootstrap.setOption("child.tcpNoDelay",true);
    // 設定管道工廠
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        /**
         * 獲得通道
         * @return
         */
        @Override
        public ChannelPipeline getPipeline() {
            // 新建編解碼器
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),NettyServer.this);
            // 獲得通道
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer",new IdleStateHandler(timer,idleTimeout / 1000,0));
            }*/
            // 設定解碼器
            pipeline.addLast("decoder",nettyHandler);
            // 返回通道
            return pipeline;
        }
    });
    // bind 繫結地址,也就是啟用伺服器
    channel = bootstrap.bind(getBindAddress());
}
複製程式碼

該方法是建立伺服器,並且開啟伺服器。同樣建立伺服器的方式跟正常的用netty建立伺服器方式一樣,只是新加了編碼器和解碼器。還有一個注意點就是這裡ServerBootstrap 的可選項。

3.doClose

@Override
protected void doClose() throws Throwable {
    try {
        if (channel != null) {
            // unbind.關閉通道
            channel.close();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        // 獲得所有連線該伺服器的通道集合
        Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
        if (channels != null && !channels.isEmpty()) {
            // 遍歷通道集合
            for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                try {
                    // 關閉通道連線
                    channel.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(),e);
                }
            }
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        if (bootstrap != null) {
            // release external resource. 回收資源
            bootstrap.releaseExternalResources();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        if (channels != null) {
            // 清空集合
            channels.clear();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(),e);
    }
}
複製程式碼

該方法是關閉伺服器,一系列的操作很清晰,我就不多說了。

4.getChannels

@Override
public Collection<Channel> getChannels() {
    Collection<Channel> chs = new HashSet<Channel>();
    for (Channel channel : this.channels.values()) {
        // 如果通道連線,則加入集合,返回
        if (channel.isConnected()) {
            chs.add(channel);
        } else {
            channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
        }
    }
    return chs;
}
複製程式碼

該方法是返回連線該伺服器的通道集合,並且用了HashSet儲存,不會重複。

(五)NettyTransporter

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url,ChannelHandler listener) throws RemotingException {
        // 建立一個NettyServer
        return new NettyServer(url,listener);
    }

    @Override
    public Client connect(URL url,ChannelHandler listener) throws RemotingException {
        // 建立一個NettyClient
        return new NettyClient(url,listener);
    }

}
複製程式碼

該類就是基於netty3的Transporter實現類,同樣兩個方法也是分別建立了NettyServer和NettyClient。

(六)NettyHelper

該類是設定日誌的工具類,其中基於netty3的InternalLoggerFactory實現類一個DubboLoggerFactory。這個我就不講解了,比較好理解,不理解也無傷大雅。

(七)NettyCodecAdapter

該類是基於netty3實現的編解碼類。

1.屬性

/**
 * 編碼者
 */
private final ChannelHandler encoder = new InternalEncoder();

/**
 * 解碼者
 */
private final ChannelHandler decoder = new InternalDecoder();

/**
 * 編解碼器
 */
private final Codec2 codec;

/**
 * url物件
 */
private final URL url;

/**
 * 緩衝區大小
 */
private final int bufferSize;

/**
 * 通道物件
 */
private final com.alibaba.dubbo.remoting.ChannelHandler handler;

複製程式碼

InternalEncoder和InternalDecoder屬性是該類的內部類,分別掌管著編碼和解碼

2.構造方法

public NettyCodecAdapter(Codec2 codec,com.alibaba.dubbo.remoting.ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
    int b = url.getPositiveParameter(Constants.BUFFER_KEY,Constants.DEFAULT_BUFFER_SIZE);
    // 如果快取區大小在16位元組以內,則設定配置大小,如果不是,則設定8位元組的緩衝區大小
    this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}

複製程式碼

你會發現對於快取區大小的規則都是一樣的。

3.InternalEncoder

@Sharable
private class InternalEncoder extends OneToOneEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx,Channel ch,Object msg) throws Exception {
        // 動態分配一個1k的緩衝區
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        // 獲得通道物件
        NettyChannel channel = NettyChannel.getOrAddChannel(ch,handler);
        try {
            // 編碼
            codec.encode(channel,buffer,msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        // 基於buteBuffer建立一個緩衝區,並且寫入資料
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }
}

複製程式碼

該內部類實現類編碼的邏輯,主要呼叫了codec.encode。

4.InternalDecoder

private class InternalDecoder extends SimpleChannelUpstreamHandler {

    private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
            com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

    @Override
    public void messageReceived(ChannelHandlerContext ctx,MessageEvent event) throws Exception {
        Object o = event.getMessage();
        // 如果訊息不是一個ChannelBuffer型別
        if (!(o instanceof ChannelBuffer)) {
            // 轉發事件到與此上下文關聯的處理程式最近的上游
            ctx.sendUpstream(event);
            return;
        }

        ChannelBuffer input = (ChannelBuffer) o;
        // 如果可讀資料不大於0,直接返回
        int readable = input.readableBytes();
        if (readable <= 0) {
            return;
        }

        com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
        if (buffer.readable()) {
            // 判斷buffer是否是動態分配的緩衝區
            if (buffer instanceof DynamicChannelBuffer) {
                // 寫入資料
                buffer.writeBytes(input.toByteBuffer());
                message = buffer;
            } else {
                // 需要的緩衝區大小
                int size = buffer.readableBytes() + input.readableBytes();
                // 動態生成緩衝區
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                        size > bufferSize ? size : bufferSize);
                // 把buffer資料寫入message
                message.writeBytes(buffer,buffer.readableBytes());
                // 把input資料寫入message
                message.writeBytes(input.toByteBuffer());
            }
        } else {
            // 否則 基於ByteBuffer通過buffer來建立一個新的緩衝區
            message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                    input.toByteBuffer());
        }

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(),handler);
        Object msg;
        int saveReaderIndex;

        try {
            // decode object.
            do {
                saveReaderIndex = message.readerIndex();
                try {
                    // 解碼
                    msg = codec.decode(channel,message);
                } catch (IOException e) {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    throw e;
                }
                // 拆包
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    // 如果已經到達讀索引,則沒有資料可解碼
                    if (saveReaderIndex == message.readerIndex()) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw new IOException("Decode without read data.");
                    }
                    //
                    if (msg != null) {
                        // 將訊息傳送到指定關聯的處理程式最近的上游
                        Channels.fireMessageReceived(ctx,msg,event.getRemoteAddress());
                    }
                }
            } while (message.readable());
        } finally {
            // 如果訊息還有可讀資料,則丟棄
            if (message.readable()) {
                message.discardReadBytes();
                buffer = message;
            } else {
                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
            }
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,ExceptionEvent e) throws Exception {
        ctx.sendUpstream(e);
    }
}

複製程式碼

該內部類實現瞭解碼的邏輯,其中大部分邏輯都在對資料做讀寫,關鍵的解碼呼叫了codec.decode。

(八)NettyBackedChannelBufferFactory

該類是建立緩衝區的工廠類。它實現了ChannelBufferFactory介面,也就是實現類它的三種獲得緩衝區的方法。

public class NettyBackedChannelBufferFactory implements ChannelBufferFactory {

    /**
     * 單例
     */
    private static final NettyBackedChannelBufferFactory INSTANCE = new NettyBackedChannelBufferFactory();

    public static ChannelBufferFactory getInstance() {
        return INSTANCE;
    }


    @Override
    public ChannelBuffer getBuffer(int capacity) {
        return new NettyBackedChannelBuffer(ChannelBuffers.dynamicBuffer(capacity));
    }


    @Override
    public ChannelBuffer getBuffer(byte[] array,int offset,int length) {
        org.jboss.netty.buffer.ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(length);
        buffer.writeBytes(array,offset,length);
        return new NettyBackedChannelBuffer(buffer);
    }


    @Override
    public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
        return new NettyBackedChannelBuffer(ChannelBuffers.wrappedBuffer(nioBuffer));
    }
}

複製程式碼

可以看到,都是建立了一個NettyBackedChannelBuffer,下面講解NettyBackedChannelBuffer。

(九)NettyBackedChannelBuffer

該類是基於netty3的buffer重新實現的緩衝區,它實現了ChannelBuffer介面,並且有一個屬性:

private org.jboss.netty.buffer.ChannelBuffer buffer;

複製程式碼

那麼其中的幾乎所有方法都是呼叫了這個buffer的方法,因為我在dubbo原始碼解析(十一)遠端通訊——Buffer中寫到ChannelBuffer介面方法定義跟netty中的緩衝區定義幾乎一樣,連註釋都幾乎一樣。所有知識單純的呼叫了buffer的方法。具體的程式碼可以檢視我的GitHub

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了基於netty3的來實現的遠端通訊、介紹dubbo-remoting-netty內的原始碼解析,關鍵需要對netty有所瞭解。下一篇我會講解基於netty4實現遠端通訊部分。