第一個 Netty 程式——時間服務
阿新 • • 發佈:2021-09-24
服務端
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; import java.util.Date; public class TimeServer { public static void main(String... args) throws InterruptedException { // 只包含一個serverchannel,代表伺服器自身的已繫結到某個本地埠的正在監聽的套接字 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 包含所有已建立的用來處理傳入客戶端連線的channel EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) //可以只使用一個EventLoopGroup .channel(NioServerSocketChannel.class) //指定使用NIO傳輸Channel .localAddress(new InetSocketAddress(8080)) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new TimeChannelHandlerAdapter()); } }); // 非同步操作的結果的佔位符 ChannelFuture f = null; try { f = b.bind().sync(); // 非同步繫結伺服器,呼叫sync()方法阻塞等待直到繫結完成 f.channel().closeFuture().sync(); // 獲取channel的closeFuture,並且阻塞當前執行緒直到完成 } finally { // 釋放所有的資源 bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } } public static class TimeChannelHandlerAdapter extends ChannelInboundHandlerAdapter { /** * 對於每個傳入的訊息都要呼叫 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req); System.out.println(body); ByteBuf resp = Unpooled.copiedBuffer(new Date(System.currentTimeMillis()).toString().getBytes()); ctx.write(resp); // 將訊息寫給傳送者,而不沖刷出站訊息 } /** * 通知ChannelInboundHandler最後一次對channelRead()的呼叫是當前批量讀取中的額最後一條訊息 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 將未決訊息沖刷到遠端節點,並且關閉該Channel 
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } /** * 讀取操作期間有異常丟擲時會呼叫 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } }
客戶端
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; public class TimeClient { public static void main(String...args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("127.0.0.1",8080)) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeClientHandler()); } }); try { // 非同步連線到遠端節點,阻塞等待直到連線完成 ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } @ChannelHandler.Sharable // 標記該類的例項可以被多個Channel共享 private static class TimeClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 當一個新的連線已經被建立時,會被呼叫 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { byte[] req = "query time order".getBytes(); ByteBuf resp = Unpooled.buffer(req.length); resp.writeBytes(req); ctx.writeAndFlush(resp); } /** * 每當接收資料時,都會呼叫這個方法,伺服器傳送的訊息可能被分塊接收 */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 記錄已接收訊息的轉儲 System.out.println("client received: " + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } }