1. 程式人生 > >使用netty傳送報文的坑

使用netty傳送報文的坑

最近跟銀行除錯一個介面的時候,行方說明是TCP/IP socket同步短連結的方式,開始採用socket和niosocket都不行,最後採用了了netty形式傳送,程式碼很簡單就是建立一個ChannelHandlerAdapter.主要程式碼如下,

測試類:
public static void main(String[] args) {
SockerClient client = new SockerClient();
Channel connect = client.connect(“xxx.xxx.xxx.xx”, xxxx);
client.sendMessage(msg)
}

class SockerClient {
        private ClientHandler clientHandler = new ClientHandler();
        public Channel  connect(String host, int port) throws Exception {
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            Bootstrap b = new Bootstrap();
            b.group(workerGroup
)
; b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline()
.addLast(clientHandler); } }); return b.connect(host, port).sync().channel(); } public String sendMessage(String msg) throws Exception { ChannelPromise promise = clientHandler.sendMessage(msg); promise.await(); return clientHandler.getData(); } }
public static class ClientHandler extends ChannelInboundHandlerAdapter {
        private ChannelHandlerContext ctx;
        private ChannelPromise promise;
        private String data;

        //初始化ctx,用來後面傳送報文
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            System.out.println("---------------------執行active的執行緒"+Thread.currentThread());
            super.channelActive(ctx);

            this.ctx = ctx;
        }

        //傳送報文
        public ChannelPromise sendMessage(String message) throws Exception {
            System.out.println("---------------------執行sendMessage的執行緒"+Thread.currentThread());
            if (ctx == null) {
                throw new IllegalStateException();
            }
            ByteBuf encoded = ctx.alloc().buffer(4 * message.length());
            encoded.writeBytes(message.getBytes("GBK"));
            promise = ctx.writeAndFlush(encoded).channel().newPromise();
            return promise;
        }

        public String getData() {
            return data;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf result = (ByteBuf) msg;
            byte[] result1 = new byte[result.readableBytes()];
            result.readBytes(result1);
            data = new String(result1, "GBK");
            promise.setSuccess();
            result.release();
        }
    }

本來是想單獨寫個sendMessage方法去傳送message的,後來發現sendMessage這個方法的裡面的ctx時而為空,有時又正常,因為channelActive這個繼承的方法會在建立連結時執行,就初始化ctx,感覺沒道理會為空。想了半天,於是把這個兩個方法執行的執行緒打印出來,才發現執行activeThread方法的執行緒是nioEventLoopGroup,而執行sendMessageThread方法的執行緒是主執行緒main.

---------------------執行active的執行緒nioEventLoopGroup-2-1
---------------------執行sendMessage的執行緒main

雖然在main方法中是client.connect先執行,但是建立連線確是另一個執行緒完成的,不在是main執行緒,而sendMessage是main執行緒執行,會出現sendMessage和channelActive並不是按照固定順序執行,說白了就是誰搶的快誰執行。。。所以要想在獲取ctx後再發送message,直接將傳送報文這一步寫在channelActive裡面。

稍微改造下就這樣。

public class NettyClient {
    public static void main(String aa[]){
        String msg = "xxx";
        System.out.println(NettyClient.sendMessage("xxx.xxx.xx.xx", 0000, msg));
    }

    public static String sendMessage(String host, int port, String msg) {
        final ClientHandler clientHandler = new ClientHandler(msg);
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline().addLast(clientHandler);
                        }
                    });
// 等待客戶端連結成功
            ChannelFuture future = bootstrap.connect(host, port).sync();
            System.out.println("客戶端連結成功!");
// 等待客戶端連結關閉
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("請求異常:",e);
            return null;
        } finally {
            group.shutdownGracefully();
        }
        return clientHandler.getData();
    }
}

class ClientHandler extends ChannelInboundHandlerAdapter {
    private String data;
    private String message;

    public ClientHandler(String message) {
        this.message = message;
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf encoded = ctx.alloc().buffer(4 * this.message.length());
        encoded.writeBytes(this.message.getBytes("GBK"));
        ctx.writeAndFlush(encoded);
        encoded.release();
    }
    public String getData() {
        return data;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
        data = new String(result1, "GBK");
        result.release();
    }
}