1. 程式人生 > 其它 >第一個netty程式--時間服務

第一個netty程式--時間服務

服務端


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.util.Date;


public class TimeServer {
    public static void main(String... args) throws InterruptedException {
        // 只包含一個serverchannel,代表伺服器自身的已繫結到某個本地埠的正在監聽的套接字
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 包含所有已建立的用來處理傳入客戶端連線的channel
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup) //可以只使用一個EventLoopGroup
                .channel(NioServerSocketChannel.class)  //指定使用NIO傳輸Channel
                .localAddress(new InetSocketAddress(8080))
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        channel.pipeline().addLast(new TimeChannelHandlerAdapter());
                    }
                });

        // 非同步操作的結果的佔位符
        ChannelFuture f = null;
        try {
            f = b.bind().sync(); // 非同步繫結伺服器,呼叫sync()方法阻塞等待直到繫結完成
            f.channel().closeFuture().sync(); // 獲取channel的closeFuture,並且阻塞當前執行緒直到完成
        } finally {
            // 釋放所有的資源
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static class TimeChannelHandlerAdapter extends ChannelInboundHandlerAdapter {
        /**
         * 對於每個傳入的訊息都要呼叫
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req);
            System.out.println(body);
            ByteBuf resp = Unpooled.copiedBuffer(new Date(System.currentTimeMillis()).toString().getBytes());
            ctx.write(resp); // 將訊息寫給傳送者,而不沖刷出站訊息
        }

        /**
         * 通知ChannelInboundHandler最後一次對channelRead()的呼叫是當前批量讀取中的額最後一條訊息
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 將未決訊息沖刷到遠端節點,並且關閉該Channel
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }

        /**
         * 讀取操作期間有異常丟擲時會呼叫
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

客戶端


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;


public class TimeClient {
    public static void main(String...args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress("127.0.0.1",8080))
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new TimeClientHandler());
                    }
                });
        try {
            // 非同步連線到遠端節點,阻塞等待直到連線完成
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }

    }

    @ChannelHandler.Sharable    // 標記該類的例項可以被多個Channel共享
    private static class TimeClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        /**
         * 當一個新的連線已經被建立時,會被呼叫
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            byte[] req = "query time order".getBytes();
            ByteBuf resp = Unpooled.buffer(req.length);
            resp.writeBytes(req);
            ctx.writeAndFlush(resp);
        }


        /**
         * 每當接收資料時,都會呼叫這個方法,伺服器傳送的訊息可能被分塊接收
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            // 記錄已接收訊息的轉儲
            System.out.println("client received: " + byteBuf.toString(CharsetUtil.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}