(NIO-框架)--Netty(三)模擬Echo伺服器案例--DelimiterBasedFrameDecoder(以分隔符做訊息解碼)與FixedLengthFrameDecoder(定長解碼)
阿新 • • 發佈:2019-02-15
package com.taikang.echo.echoclient; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class EchoClient { public void connect(int port, String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoClientHandler()); } }); // 發起非同步連線操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO執行緒組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { } } new EchoClient().connect(port, "127.0.0.1"); } }
package com.taikang.echo.echoclient; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class EchoClientHandler extends SimpleChannelInboundHandler<String> { private int counter; static final String ECHO_REQ = "Hi,LiuYan.Welcome to Netty.$_"; public EchoClientHandler() { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("This is " + ++counter + "times receive server : [" + msg + "]"); } @Override public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 10; i++) { //Unpooled.copiedBuffer(ECHO_REQ.getBytes()) //看了下原始碼,確認了,上邊那個Unpooled...這一堆是把byte陣列變成ByteBuf物件,服務端在接受後才能識別啊 ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { } }
package com.taikang.echo.echoserver; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class EchoServer { public void bind(int port) throws Exception{ //配置服務端的NIO執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //$_作為分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); //1024代表單條訊息最大長度,當達到最大長度還沒有找到分隔符,就拋異常了,後面的引數是分隔符緩衝物件 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoServerHandler()); } }); //繫結埠,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服務端監聽埠關閉 f.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length>0){ try{ port = Integer.valueOf(args[0]); }catch(NumberFormatException e){ } } new EchoServer().bind(port); } }
package com.taikang.echo.echoserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side handler: prints every decoded request and echoes it back with
 * the "$_" delimiter re-appended.
 */
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {

    /** Number of requests received so far on this handler instance. */
    int counter = 0;

    /**
     * Handles one decoded request. Implemented as the typed callback of
     * SimpleChannelInboundHandler instead of overriding channelRead, so no
     * unchecked cast is needed and the superclass keeps doing its type
     * matching and message release.
     *
     * @param ctx channel context used to write the echo
     * @param msg one decoded frame (the delimiter is not part of it, which is
     *            why it is appended again below)
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("服務端channelRead0讀取到訊息");
        System.out.println("This is " + ++counter + "times receive client : [" + msg + "]");
        // Re-append the delimiter so the reply can be framed by the receiver.
        String framed = msg + "$_";
        ByteBuf echo = Unpooled.copiedBuffer(framed.getBytes());
        ctx.writeAndFlush(echo);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx channel context
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
結果如下圖:
至於定長解碼,只要把 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
換成 ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); 即可(需另外匯入 io.netty.handler.codec.FixedLengthFrameDecoder)。此時訊息會固定以每 20 個位元組切成一個訊框,不再依賴分隔符。