使用netty傳送報文的坑
阿新 • • 發佈:2019-02-09
最近跟銀行除錯一個介面的時候,行方說明是TCP/IP socket同步短連結的方式,開始採用socket和niosocket都不行,最後採用了了netty形式傳送,程式碼很簡單就是建立一個ChannelHandlerAdapter.主要程式碼如下,
測試類:
public static void main(String[] args) {
SockerClient client = new SockerClient();
Channel connect = client.connect(“xxx.xxx.xxx.xx”, xxxx);
client.sendMessage(msg)
}
class SockerClient {
private ClientHandler clientHandler = new ClientHandler();
public Channel connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workerGroup );
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline() .addLast(clientHandler);
}
});
return b.connect(host, port).sync().channel();
}
public String sendMessage(String msg) throws Exception {
ChannelPromise promise = clientHandler.sendMessage(msg);
promise.await();
return clientHandler.getData();
}
}
public static class ClientHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private String data;
//初始化ctx,用來後面傳送報文
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("---------------------執行active的執行緒"+Thread.currentThread());
super.channelActive(ctx);
this.ctx = ctx;
}
//傳送報文
public ChannelPromise sendMessage(String message) throws Exception {
System.out.println("---------------------執行sendMessage的執行緒"+Thread.currentThread());
if (ctx == null) {
throw new IllegalStateException();
}
ByteBuf encoded = ctx.alloc().buffer(4 * message.length());
encoded.writeBytes(message.getBytes("GBK"));
promise = ctx.writeAndFlush(encoded).channel().newPromise();
return promise;
}
public String getData() {
return data;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
data = new String(result1, "GBK");
promise.setSuccess();
result.release();
}
}
本來是想單獨寫個sendMessage方法去傳送message的,後來發現sendMessage這個方法的裡面的ctx時而為空,有時又正常,因為channelActive這個繼承的方法會在建立連結時執行,就初始化ctx,感覺沒道理會為空。想了半天,於是把這個兩個方法執行的執行緒打印出來,才發現執行activeThread方法的執行緒是nioEventLoopGroup,而執行sendMessageThread方法的執行緒是主執行緒main.
---------------------執行active的執行緒nioEventLoopGroup-2-1
---------------------執行sendMessage的執行緒main
雖然在main方法中是client.connect先執行,但是建立連線確是另一個執行緒完成的,不在是main執行緒,而sendMessage是main執行緒執行,會出現sendMessage和channelActive並不是按照固定順序執行,說白了就是誰搶的快誰執行。。。所以要想在獲取ctx後再發送message,直接將傳送報文這一步寫在channelActive裡面。
稍微改造下就這樣。
public class NettyClient {
public static void main(String aa[]){
String msg = "xxx";
System.out.println(NettyClient.sendMessage("xxx.xxx.xx.xx", 0000, msg));
}
public static String sendMessage(String host, int port, String msg) {
final ClientHandler clientHandler = new ClientHandler(msg);
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer() {
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(clientHandler);
}
});
// 等待客戶端連結成功
ChannelFuture future = bootstrap.connect(host, port).sync();
System.out.println("客戶端連結成功!");
// 等待客戶端連結關閉
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("請求異常:",e);
return null;
} finally {
group.shutdownGracefully();
}
return clientHandler.getData();
}
}
class ClientHandler extends ChannelInboundHandlerAdapter {
private String data;
private String message;
public ClientHandler(String message) {
this.message = message;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf encoded = ctx.alloc().buffer(4 * this.message.length());
encoded.writeBytes(this.message.getBytes("GBK"));
ctx.writeAndFlush(encoded);
encoded.release();
}
public String getData() {
return data;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
data = new String(result1, "GBK");
result.release();
}
}