1. 程式人生 > >netty自定義編碼器和解碼器(粘包處理)

netty自定義編碼器和解碼器(粘包處理)

這裡的實現方式是:將訊息分為兩部分,也就是訊息頭和訊息尾,訊息頭中寫入要傳送資料的總長度,通常是在訊息頭的第一個欄位使用int值來標識傳送資料的長度。 首先我們寫一個Encoder,我們繼承自MessageToByteEncoder ,把物件轉換成byte,繼承這個物件,會要求我們實現一個encode方法:

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    byte[] body = convertToBytes(msg);  //將物件轉換為byte,虛擬碼,具體用什麼進行序列化,你們自行選擇。可以使用我上面說的一些
int dataLength = body.length; //讀取訊息的長度 out.writeInt(dataLength); //先將訊息長度寫入,也就是訊息頭 out.writeBytes(body); //訊息體中包含我們要傳送的資料 }

那麼當我們在Decode的時候,該怎麼處理髮送過來的資料呢?這裡我們繼承ByteToMessageDecoder方法,繼承這個物件,會要求我們實現一個decode方法

public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if
(in.readableBytes() < HEAD_LENGTH) { //這個HEAD_LENGTH是我們用於表示頭長度的位元組數。 由於上面我們傳的是一個int型別的值,所以這裡HEAD_LENGTH的值為4. return; } in.markReaderIndex(); //我們標記一下當前的readIndex的位置 int dataLength = in.readInt(); // 讀取傳送過來的訊息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4 if (dataLength < 0
) { // 我們讀到的訊息體長度為0,這是不應該出現的情況,這裡出現這情況,關閉連線。 ctx.close(); } if (in.readableBytes() < dataLength) { //讀到的訊息體長度如果小於我們傳送過來的訊息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; // 嗯,這時候,我們讀到的長度,滿足我們的要求了,把傳送過來的資料,取出來吧~~ in.readBytes(body); // Object o = convertToObject(body); //將byte資料轉化為我們需要的物件。虛擬碼,用什麼序列化,自行選擇 out.add(o); }

下面來一個示例(例項只做了字串的處理,其他自定義物件的處理參考上面)。 服務端(接收端):

public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss,worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // 新增自定義的解碼器
                socketChannel.pipeline().addLast(new MyCustomMessageDecoder());
                socketChannel.pipeline().addLast(new ServerMessageHandler());
            }
        });
        try {
            ChannelFuture future = bootstrap.bind(9999).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }

    }
}

自定義的訊息解碼器:

public class MyCustomMessageDecoder extends ByteToMessageDecoder {
    // 訊息頭:傳送端寫的是一個int,佔用4位元組。
    private final static int HEAD_LENGTH = 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }

        // 標記一下當前的readIndex的位置
        in.markReaderIndex();

        // 讀取資料長度
        int dataLength = in.readInt();
        // 我們讀到的訊息體長度為0,這是不應該出現的情況,這裡出現這情況,關閉連線。
        if (dataLength < 0) {
            ctx.close();
        }

        //讀到的訊息體長度如果小於我們傳送過來的訊息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }

        // 將緩衝區的資料讀到位元組陣列
        byte[] body = new byte[dataLength];
        in.readBytes(body);
        //將byte資料轉化為我們需要的物件。虛擬碼,用什麼序列化,自行選擇
        Object msg = convertToObj(body);
        out.add(msg);
    }

    private Object convertToObj(byte[] body) {
        return new String(body,0,body.length);
    }
}

Server端的訊息處理器:

public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
    private int messageCount = 0;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String _msg = (String) msg;
        System.out.println("["+(++messageCount)+"]接收到訊息:" + _msg);

        // 注意:業務異常需要處理,不能不管,否則會呼叫exceptionCaught()

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端程式碼:

public class Client
{
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_SNDBUF,10)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 增加自定義編碼器
                        socketChannel.pipeline().addLast(new MyCustomMessageEncoder());
                        socketChannel.pipeline().addLast(new ClientMessageHandler());
                    }
                });

        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }
}

自定義的訊息編碼器:

public class MyCustomMessageEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        // 要傳送的資料
        // 這裡如果是自定義的型別,msg即為自定義的型別,需要轉為byte[]
        byte[] body = ((ByteBuf)msg).array();

        // 資料長度
        int dataLength = body.length;
        // 緩衝區先寫入資料長度
        out.writeInt(dataLength);
        // 再寫入資料
        out.writeBytes(body);
    }
}

客戶端的訊息處理器:

public class ClientMessageHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msg = "hello,world";
        byte[] data;
        ByteBuf buf;
        for (int i=0;i<100;i++) {
            data = (msg+i).getBytes();
            buf = Unpooled.copiedBuffer(data);
            ctx.writeAndFlush(buf);
        }
        System.out.println("100條 訊息傳送完畢");
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

執行效果 客戶端:

服務端: