JAVA 網路程式設計(6) Netty TCP 示例
阿新 • • 發佈:2018-12-30
maven使用的netty版本如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.4.Final</version>
</dependency>
示例程式碼:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import util.LogCore; public class TcpServer { private static final String IP = "127.0.0.1"; private static final int PORT = 9999; /** 用於分配處理業務執行緒的執行緒組個數 */ protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // 預設 /** 業務出現執行緒大小 */ protected static final int BIZTHREADSIZE = 4; /* * NioEventLoopGroup實際上就是個執行緒池, * NioEventLoopGroup在後臺啟動了n個NioEventLoop來處理Channel事件, * 每一個NioEventLoop負責處理m個Channel, * NioEventLoopGroup從NioEventLoop數組裡挨個取出NioEventLoop來處理Channel */ private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); protected static void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler()); } }); b.bind(IP, PORT).sync(); LogCore.BASE.info("TCP伺服器已啟動"); } protected static void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { LogCore.BASE.info("啟動TCP伺服器..."); TcpServer.run(); // TcpServer.shutdown(); } }
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import util.LogCore; public class TcpServerHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { LogCore.BASE.info("SERVER接收到訊息:" + msg); ctx.channel().writeAndFlush("server accepted msg:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LogCore.BASE.warn("exceptionCaught!", cause); ctx.close(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import util.LogCore; public class TcpClient { public static String HOST = "127.0.0.1"; public static int PORT = 9999; public static Bootstrap bootstrap = getBootstrap(); public static Channel channel = getChannel(HOST, PORT); /** * 初始化Bootstrap */ public static final Bootstrap getBootstrap() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class); b.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("handler", new TcpClientHandler()); } }); b.option(ChannelOption.SO_KEEPALIVE, true); return b; } public static final Channel getChannel(String host, int port) { Channel channel = null; try { channel = bootstrap.connect(host, port).sync().channel(); } catch (Exception e) { LogCore.BASE.error("連線Server(IP{},PORT{})失敗", host, port, e); return null; } return channel; } public static void sendMsg(String msg) throws Exception { if (channel != null) { channel.writeAndFlush(msg).sync(); } else { LogCore.BASE.warn("訊息傳送失敗,連線尚未建立!"); } } public static void main(String[] args) throws Exception { try { long t0 = System.nanoTime(); for (int i = 0; i < 100; i++) { TcpClient.sendMsg(i + "你好1"); } long t1 = System.nanoTime(); LogCore.BASE.info("time used:{}", t1 - t0); } catch (Exception e) { LogCore.BASE.error("main err:", e); } } }
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import util.LogCore;
public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
LogCore.BASE.info("client接收到伺服器返回的訊息:" + msg);
}
}