1. 程式人生 > 程式設計 >Dubbo原始碼解析(十七)遠端通訊——Netty4

Dubbo原始碼解析(十七)遠端通訊——Netty4

遠端通訊——Netty4

目標:介紹基於netty4的來實現的遠端通訊、介紹dubbo-remoting-netty4內的原始碼解析。

前言

netty4對netty3相容性不是很好,並且netty4在很多的術語和api也發生了改變,導致升級netty4會很艱辛,網上應該有很多相關文章,高版本的總有高版本的優勢所在,所以dubbo也需要與時俱進,又新增了基於netty4來實現遠端通訊模組。下面講解的,如果跟上一篇文章有重複的地方我就略過去了。關鍵還是要把遠端通訊的api那幾篇看懂,看這幾篇實現才會很簡單。

下面是包的結構:

dubbo-remoting-netty4包結構

原始碼分析

(一)NettyChannel

該類繼承了AbstractChannel,是基於netty4的通道實現類

1.屬性

/**
 * 通道集合
 */
private static final ConcurrentMap<Channel,NettyChannel> channelMap = new ConcurrentHashMap<Channel,NettyChannel>();

/**
 * 通道
 */
private final Channel channel;

/**
 * 屬性集合
 */
private final Map<String,Object> attributes = new ConcurrentHashMap<String,Object>();
複製程式碼

屬性跟netty3實現的通道類屬性幾乎一樣,我就不講解了。

2.getOrAddChannel

static NettyChannel getOrAddChannel(Channel ch,URL url,ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    // 首先從集合中取通道
    NettyChannel ret = channelMap.get(ch);
    // 如果為空,則新建
    if (ret == null) {
        NettyChannel nettyChannel = new
NettyChannel(ch,url,handler); // 如果通道還活躍著 if (ch.isActive()) { // 加入集合 ret = channelMap.putIfAbsent(ch,nettyChannel); } if (ret == null) { ret = nettyChannel; } } return ret; } 複製程式碼

該方法是獲得通道,如果集合中沒有找到對應通道,則建立一個,然後加入集合。

3.send

@Override
public void send(Object message,boolean sent) throws RemotingException {
    super.send(message,sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 寫入資料,傳送訊息
        ChannelFuture future = channel.writeAndFlush(message);
        // 如果已經傳送過
        if (sent) {
            // 獲得超時時間
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            // 等待timeout的連線時間後檢視是否傳送成功
            success = future.await(timeout);
        }
        // 獲得異常
        Throwable cause = future.cause();
        // 如果異常不為空,則丟擲異常
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress() + ",cause: " + e.getMessage(),e);
    }

    if (!success) {
        throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
複製程式碼

該方法是傳送訊息,呼叫了channel.writeAndFlush方法,與netty3的實現只是呼叫的api不同。

4.close

@Override
public void close() {
    try {
        super.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        // 移除通道
        removeChannelIfDisconnected(channel);
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        // 清理屬性集合
        attributes.clear();
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
    try {
        if (logger.isInfoEnabled()) {
            logger.info("Close netty channel " + channel);
        }
        // 關閉通道
        channel.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(),e);
    }
}
複製程式碼

該方法就是操作了四個步驟,比較清晰。

(二)NettyClientHandler

該類繼承了ChannelDuplexHandler,是基於netty4實現的客戶端通道處理實現類。這裡的設計與netty3實現的通道處理器有所不同,netty3實現的通道處理器是被客戶端和服務端統一使用的,而在這裡服務端和客戶端使用了兩個不同的Handler來處理。並且netty3的NettyHandler是基於netty3的SimpleChannelHandler設計的,而這裡是基於netty4的ChannelDuplexHandler。

/**
 * url物件
 */
private final URL url;

/**
 * 通道
 */
private final ChannelHandler handler;
複製程式碼

該類的屬性只有兩個,下面實現的方法也都是呼叫了handler的方法,我就舉一個例子:

@Override
public void disconnect(ChannelHandlerContext ctx,ChannelPromise future)
        throws Exception {
    // 獲得通道
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),handler);
    try {
        // 斷開連線
        handler.disconnected(channel);
    } finally {
        // 從集合中移除
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
複製程式碼

可以看到分了三部,獲得通道物件,呼叫handler方法,最後檢測一下通道是否活躍。其他方法也是差不多。

(三)NettyServerHandler

該類繼承了ChannelDuplexHandler,是基於netty4實現的服務端通道處理實現類。

/**
 * 連線該伺服器的通道數 key為ip:port
 */
private final Map<String,Channel> channels = new ConcurrentHashMap<String,Channel>(); // <ip:port,channel>

/**
 * url物件
 */
private final URL url;

/**
 * 通道處理器
 */
private final ChannelHandler handler;
複製程式碼

該類有三個屬性,比NettyClientHandler多了一個屬性channels,下面的實現方法也是一樣的,都是呼叫了handler方法,來看一個例子:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 啟用事件
    ctx.fireChannelActive();

    // 獲得通道
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),handler);
    try {
        // 如果通道不為空,則加入集合中
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()),channel);
        }
        // 連線該通道
        handler.connected(channel);
    } finally {
        // 如果通道不活躍,則移除通道
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
複製程式碼

該方法是通道活躍的時候呼叫了handler.connected,差不多也是常規套路,就多了啟用事件和加入到通道中。其他方法也差不多。

(四)NettyClient

該類繼承了AbstractClient,是基於netty4實現的客戶端實現類。

1.屬性

/**
 * NioEventLoopGroup物件
 */
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS,new DefaultThreadFactory("NettyClientWorker",true));

/**
 * 客戶端引導類
 */
private Bootstrap bootstrap;

/**
 * 通道
 */
private volatile Channel channel; // volatile,please copy reference to use
複製程式碼

屬性裡的NioEventLoopGroup物件是netty4中的物件,什麼用處請看netty的解析。

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 建立一個客戶端的通道處理器
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(),this);
    // 建立一個引導類
    bootstrap = new Bootstrap();
    // 設定可選項
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE,true)
            .option(ChannelOption.TCP_NODELAY,true)
            .option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,getTimeout())
            .channel(NioSocketChannel.class);

    // 如果連線超時時間小於3s,則設定為3s,也就是說最低的超時時間為3s
    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,getConnectTimeout());
    }

    // 建立一個客戶端
    bootstrap.handler(new ChannelInitializer() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 編解碼器
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),getUrl(),NettyClient.this);
            // 加入責任鏈
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder",adapter.getDecoder())
                    .addLast("encoder",adapter.getEncoder())
                    .addLast("handler",nettyClientHandler);
        }
    });
}
複製程式碼

該方法還是做了建立客戶端,並且開啟的操作,其中很多的引數設定操作。

其他方法跟 dubbo原始碼解析(十六)遠端通訊——Netty3中寫到的NettyClient實現一樣。

(五)NettyServer

該類繼承了AbstractServer,實現了Server。是基於netty4實現的伺服器類

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);


/**
 * 連線該伺服器的通道集合 key為ip:port
 */
private Map<String,Channel> channels; // <ip:port,channel>

/**
 * 伺服器引導類
 */
private ServerBootstrap bootstrap;

/**
 * 通道
 */
private io.netty.channel.Channel channel;

/**
 * boss執行緒組
 */
private EventLoopGroup bossGroup;
/**
 * worker執行緒組
 */
private EventLoopGroup workerGroup;
複製程式碼

屬性相較netty3而言,新增了兩個執行緒組,同樣也是因為netty3和netty4的設計不同。

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 建立服務引導類
    bootstrap = new ServerBootstrap();

    // 建立boss執行緒組
    bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory("NettyServerBoss",true));
    // 建立worker執行緒組
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker",true));

    // 建立伺服器處理器
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);
    // 獲得通道集合
    channels = nettyServerHandler.getChannels();

    // 設定ventLoopGroup還有可選項
    bootstrap.group(bossGroup,workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY,Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR,Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 編解碼器
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),NettyServer.this);
                    // 增加責任鏈
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder",adapter.getDecoder())
                            .addLast("encoder",adapter.getEncoder())
                            .addLast("handler",nettyServerHandler);
                }
            });
    // bind 繫結
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    // 等待繫結完成
    channelFuture.syncUninterruptibly();
    // 設定通道
    channel = channelFuture.channel();

}
複製程式碼

該方法是建立伺服器,並且開啟。如果熟悉netty4點朋友應該覺得還是很好理解的。其他方法跟《 dubbo原始碼解析(十六)遠端通訊——Netty3》中寫到的NettyClient實現一樣,處理close中要多關閉兩個執行緒組

(六)NettyTransporter

該類跟 《dubbo原始碼解析(十六)遠端通訊——Netty3》中的NettyTransporter一樣的實現。

(七)NettyCodecAdapter

該類是基於netty4的編解碼器。

1.屬性

/**
 * 編碼器
 */
private final ChannelHandler encoder = new InternalEncoder();

/**
 * 解碼器
 */
private final ChannelHandler decoder = new InternalDecoder();

/**
 * 編解碼器
 */
private final Codec2 codec;

/**
 * url物件
 */
private final URL url;

/**
 * 通道處理器
 */
private final com.alibaba.dubbo.remoting.ChannelHandler handler;
複製程式碼

屬性跟基於netty3實現的編解碼一樣。

2.InternalEncoder

private class InternalEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx,Object msg,ByteBuf out) throws Exception {
        // 建立緩衝區
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
        // 獲得通道
        Channel ch = ctx.channel();
        // 獲得netty通道
        NettyChannel channel = NettyChannel.getOrAddChannel(ch,handler);
        try {
            // 編碼
            codec.encode(channel,buffer,msg);
        } finally {
            // 檢測通道是否活躍
            NettyChannel.removeChannelIfDisconnected(ch);
        }
    }
}
複製程式碼

該內部類是編碼器的抽象,主要的編碼還是呼叫了codec.encode。

3.InternalDecoder

private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx,ByteBuf input,List<Object> out) throws Exception {

        // 建立緩衝區
        ChannelBuffer message = new NettyBackedChannelBuffer(input);

        // 獲得通道
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),handler);

        Object msg;

        int saveReaderIndex;

        try {
            // decode object.
            do {
                // 記錄讀索引
                saveReaderIndex = message.readerIndex();
                try {
                    // 解碼
                    msg = codec.decode(channel,message);
                } catch (IOException e) {
                    throw e;
                }
                // 拆包
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //is it possible to go here ?
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    // 讀取資料
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
複製程式碼

該內部類是解碼器的抽象類,其中關鍵的是呼叫了codec.decode。

(八)NettyBackedChannelBuffer

該類是緩衝區類。

/**
 * 緩衝區
 */
private ByteBuf buffer;
複製程式碼

其中的方法幾乎都呼叫了該屬性的方法。而ByteBuf是netty4中的位元組資料的容器。

(九)FormattingTuple和MessageFormatter

這兩個類是用於用於格式化的,是從netty4中複製出來的,其中並且稍微做了一下改動。我就不再講解了。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了基於netty4的來實現的遠端通訊、介紹dubbo-remoting-netty4內的原始碼解析,關鍵需要對netty4有所瞭解。下一篇我會講解基於zookeeper實現遠端通訊部分。