1. 程式人生 > >Netty若干示例程式碼

Netty若干示例程式碼

文章目錄

Netty概述

丟棄伺服器

詳細程式碼:test-netty4-discard

丟棄伺服器,就是將收到的所有資料都丟掉,不做任何處理

DiscardServerHandler

package org.zln.test.netty4.discard;

@Slf4j
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception { //丟棄收到的資料 ((ByteBuf) msg).release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 出現異常時關閉連線 ctx.close(); } }

DiscardServerHandler繼承自ChannelInboundHandlerAdapter,

ChannelInboundHandlerAdapter實現了ChannelInboundHandler介面。

ChannelInboundHandler提供了很多事件處理方法

  • channelRead

當服務端收到新資料的時候,channelRead方法被呼叫

收到的訊息的型別是 ByteBuf,它是一個引用計數物件,必須顯示呼叫release方法來釋放。

一般在channelRead中處理的程式碼形如


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
	try {
	// Do something with msg
	} finally {
       ReferenceCountUtil.release(msg);
	}
}
  • exceptionCaught

當服務端產生Throwable物件後,就會呼叫exceptionCaught

在大部分情況下,捕獲的異常應該被記錄下來並且把關聯的 channel 給關閉掉。

然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現,

比如你可能想在關閉連線之前傳送一個錯誤碼的響應訊息。

DiscardServer

package org.zln.test.netty4.discard;

public class DiscardServer {
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new DiscardServer(port).run();
    }

    private void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel)
                            throws Exception {
                            socketChannel.pipeline()
                                .addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

NioEventLoopGroup:用於處理I/O操作的多執行緒事件迴圈器

bossGroup:接收客戶端連線

workerGroup:處理已經接收到的連線

​ 一旦boss接收到連線,就會把連線資訊註冊到worker中

ServerBootstrap:用於啟動NIO服務的輔助類

測試

telnet localhost 8080

目前在DiscardServerHandler上,是直接丟棄的,沒有打印出來,所以我們現在加一個列印

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(in.toString(CharsetUtil.UTF_8));
    }

在toString中已經做了release動作,所以不需要再次手工釋放資源

詳細程式碼:test-netty4-discard-demo2

應答伺服器

詳細程式碼:test-netty4-ask

將客戶端的請求訊息原樣返回

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);//寫入後,內部已經實現了訊息資源的釋放
        ctx.flush();
    }

或者

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.writeAndFlush(msg);//寫入後,內部已經實現了訊息資源的釋放
    }

時間伺服器

時間伺服器

在與客戶端建立連線時,就傳送時間訊息

TimeServerHandler

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        final ChannelFuture f = ctx.writeAndFlush(time);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
        cause.printStackTrace();
//        出現異常時關閉連線
        ctx.close();
    }
}

final ByteBuf time = ctx.alloc().buffer(4);

分配指定大小的緩衝

因為要寫入32位蒸熟,所以分配4個位元組大小的緩衝區

為什麼不需要flip操作?

傳統NIO緩衝區,因為只有一個位置索引,所以在寫完後,如果想要讀區,需要執行一次flip操作,將位置指標設定到頭部。

Netty提供的ByteBuf緩衝區物件,有讀寫兩個指標,執行寫的時候只是寫的指標索引增加,讀指標位置索引不變,所以不需要flip操作。

ChannelFuture

表示一個還未傳送的I/O事件

對ChannelFuture新增監聽,可以得知當前I/O操作的具體執行情況

addListener

新增一個事件監聽。

如果直接ctx.close();的話,由於writeAndFlush是非同步的,會出現還沒寫完連線就被關閉的情況。

所以要在監聽到寫完成的事件後再執行close操作

ctx.close()

連線關閉也不是立馬生效的,其也是返回一個ChannelFuture物件

另一種簡單的監聽寫法為

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        final ChannelFuture f = ctx.writeAndFlush(time);
        f.addListener(ChannelFutureListener.CLOSE);
    }

內建的ChannelFutureListener.CLOSE其實和我們自己對ChannelFutureListener內容是完全一樣的

TimeClient

public class TimeClient {

    private static final String HOST = "localhost";
    private static final int PORT = 8080;

    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            ChannelFuture f = b.connect(HOST, PORT).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  • 比較客戶端與服務的啟動類的若干不同

1、只有worker沒有boss

2、使用NioSocketChannel,不是NioServerSocketChannel

3、不需要childOption。因為客戶端的SocketChannel沒有父類

4、使用connect,不是bind

TimeClientHandler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
//        出現異常時關閉連線
        ctx.close();
    }
}

客戶端的Handler就比較簡單了,就不細說了

整個流程是這樣子的

1、客戶端發起連線

2、服務端channelActive監聽到連線,傳送時間資料

3、客戶端channelRead接收到服務端請求,列印資料

漏洞

這裡其實是有一個漏洞的,就是我們其實是希望一條完整的訊息大小為4個位元組,可實際情況是不一定的。

也就是會函式TCP/IP協議的粘包與拆包的問題

使用物件傳遞日期

完整程式碼

我們需要處理的資料,往往是有一個結構的,一般會封裝到一個物件中,

而資料在網路中實際傳輸的時候,肯定是以位元組的形式的。

如果我們在程式碼上想要直接處理物件,那麼就需要編寫解碼器和編碼器。

編碼器:將傳送的物件轉化為位元組

解碼器:將收到的位元組轉化為物件

  • UnixTime
public class UnixTime {
    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }

}

我們把時間戳封裝在UnixTime物件中

  • TimeDecoder:解碼器
public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        out.add(new UnixTime(in.readUnsignedInt()));
    }
}
  • TimeEncoder:編碼器
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out)
        throws Exception {
        out.writeInt((int) msg.value());
    }
}
  • TimeServerHandler

使用了編碼器後,我們可以直接傳送物件

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

  • TimeClientHandler

使用瞭解碼器後,直接轉化成物件

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime m = (UnixTime) msg;
        System.out.println("收到:" + m);
        ctx.close();
    }

  • 配置

編碼器和解碼器編寫完後,需要配置到啟動類中

服務端配置編碼器:socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler());

客戶端配置解碼器:ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());

聊天應用

詳細程式碼

ChatServerHandler

@Slf4j
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        }
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
        }
        channels.remove(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + msg + "\n");
            } else {
                channel.writeAndFlush("[響應]" + msg + "\n");
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("ChatClient:" + incoming.remoteAddress() + "線上");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("ChatClient:" + incoming.remoteAddress() + "掉線");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("ChatClient:" + incoming.remoteAddress() + "異常"); // 當出現異常就關閉連線
        cause.printStackTrace();
        ctx.close();
    }


}

ChatServerInitializer

public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast