1. 程式人生 > >Netty例項 - 多個Netty實戰小例項

Netty例項 - 多個Netty實戰小例項

Netty例項(多個實戰小例項)

瘋狂創客圈 Java 分散式聊天室【 億級流量】實戰系列之18 【 部落格園 總入口】 QQ群:104131248


文章目錄

原始碼工程

原始碼IDEA工程獲取連結Java 聊天室 實戰 原始碼

Netty是基於JDK NIO的網路框架

簡化了NIO程式設計, 不用程式自己維護selector, 將網路通訊和資料處理的部分做了分離

多用於做底層的資料通訊, 心跳檢測(keepalived)

1. 資料通訊

1.1 Hello World

public class Server {

    public static void main(String[] args) throws Exception {
        // 1 建立線兩個事件迴圈組
        // 一個是用於處理伺服器端接收客戶端連線的
        // 一個是進行網路通訊的(網路讀寫的)
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        // 2 建立輔助工具類ServerBootstrap,用於伺服器通道的一系列配置
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup) // 繫結倆個執行緒組
                .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel對應TCP, NioDatagramChannel對應UDP
                .option(ChannelOption.SO_BACKLOG, 1024) // 設定TCP緩衝區
                .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 設定傳送緩衝大小
                .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 這是接收緩衝大小
                .option(ChannelOption.SO_KEEPALIVE, true) // 保持連線
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {  //SocketChannel建立連線後的管道
                        // 3 在這裡配置 通訊資料的處理邏輯, 可以addLast多個...
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });

        // 4 繫結埠, bind返回future(非同步), 加上sync阻塞在獲取連線處
        ChannelFuture cf1 = b.bind(8765).sync();
        //ChannelFuture cf2 = b.bind(8764).sync();   //可以繫結多個埠
        // 5 等待關閉, 加上sync阻塞在關閉請求處
        cf1.channel().closeFuture().sync();
        //cf2.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

SO_BACKLOG詳解:
伺服器的TCP核心維護兩個佇列A和B
客戶端向服務端請求connect時, 傳送SYN(第一次握手)
服務端收到SYN後, 向客戶端傳送SYN ACK(第二次握手), TCP核心將連線放入佇列A
客戶端收到後向服務端傳送ACK(第三次握手), TCP核心將連線從A->B, accept返回, 連線完成
A/B佇列的長度和即為BACKLOG, 當accept速度跟不上, A/B佇列使得BACKLOG滿了, 客戶端連線就會被TCP核心拒絕
可以調大backlog緩解這一現象, 經驗值~100

public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Server :" + body );
            String response = "返回給客戶端的響應:" + body ;
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
            // future完成後觸發監聽器, 此處是寫完即關閉(短連線). 因此需要關閉連線時, 要通過server端關閉. 直接關閉用方法ctx[.channel()].close()
            //.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("讀完了");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
            throws Exception {
        ctx.close();
    }
}
public class Client {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception { 
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
        //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();  //可以使用多個埠
        //傳送訊息, Buffer型別. write需要flush才傳送, 可用writeFlush代替
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
        Thread.sleep(2000);
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
        //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));
        
        cf1.channel().closeFuture().sync();
        //cf2.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Client :" + body );
        } finally {
            // 記得釋放xxxHandler裡面的方法的msg引數: 寫(write)資料, msg引用將被自動釋放不用手動處理; 但只讀資料時,!必須手動釋放引用數
             ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}

1.2 拆包粘包問題

TCP/IP確保了包的傳送, 包的順序等, 但程式設計中還需要解決拆包粘包問題

-> 接收的一連串包中的資料, 處理的分隔在哪裡? 基本解決方案:

1)特殊字元作為結束分隔符

2)訊息定長. 固定包的長度, 長度不夠用空格補全. 接收方需要trim, 效率不高不推薦

3)自定義協議. 在訊息頭中包含訊息總長度的欄位. 需要安全性時可以考慮.

特殊字元

public class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                // 使用DelimiterBasedFrameDecoder設定結尾分隔符$_
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                // 設定字串形式的解碼.  經過StringDecoder, Handler回撥方法中接收的msg的具體型別就是String了(不再是ByteBuffer). 但寫時仍需要傳入ByteBuffer
                sc.pipeline().addLast(new StringDecoder());
                // 通訊資料的處理邏輯
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        //4 繫結連線
        ChannelFuture cf = b.bind(8765).sync();
        
        //等待伺服器監聽埠關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server :" + msg);
        String response = "伺服器響應: " + msg + "$_";
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        ctx.close();
    }
}
public class Client {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                sc.pipeline().addLast(new StringDecoder()); 
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
        
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        
    }
}
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String) msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

定長

public class Server {

    public static void main(String[] args) throws Exception{
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //設定定長字串接收, 定長為5, 積累到5個位元組才會把資料發出去
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                //設定字串形式的解碼
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server :" + msg);
        String response =  request ;
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
    }
}
public class Client {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); 
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));
        
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String response = (String) msg;
        System.out.println("Client: " + response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    }
}

1.3 編解碼

即物件序列化技術, 目的是為了實現物件的網路傳輸和本地持久化
如果使用java的序列化, 碼流較大. 因此多用Marshalling, Kyro(基於Protobuf)

下面的例子, 使用編解碼傳輸javabean(Marshalling的javabean需要實現serializable), 並將message進行gzip壓縮

自定義編解碼器

public final class MarshallingCodeCFactory {
    // 解碼
    public static MarshallingDecoder buildMarshallingDecoder() {
        //建立工廠物件, 引數serial指建立的是java物件序列化的工廠物件
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //建立配置物件,版本號為5 
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根據工廠物件和配置物件建立解碼provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //建立解碼器物件. 第一個引數是provider, 第二個引數是單個訊息序列化後的最大長度, 超過後拒絕處理
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    // 編碼
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //建立編碼器物件. 用於將實現Serializable介面的JavaBean序列化為二進位制陣列
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

javabean

public class Request implements Serializable {  // 標記Serializable介面

    private String id ;
    private String name ;
    private String requestMessage ;
    private byte[] attachment;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getRequestMessage() {
        return requestMessage;
    }
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
    public byte[] getAttachment() {
        return attachment;
    }
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
public class Response implements Serializable { // 標記Serializable介面
    
    private String id;
    private String name;
    private String responseMessage;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getResponseMessage() {
        return responseMessage;
    }
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

GZip壓縮的Util

public class GzipUtils {

    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        gzip.write(data);
        gzip.finish();
        gzip.close();
        byte[] ret = bos.toByteArray();
        bos.close();
        return ret;
    }
    
    public static byte[] ungzip(byte[] data) throws Exception{
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        GZIPInputStream gzip = new GZIPInputStream(bis);
        byte[] buf = new byte[1024];
        int num = -1;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while((num = gzip.read(buf)) != -1 ){
            bos.write(buf, 0, num);
        }
        gzip.close();
        bis.close();
        byte[] ret = bos.toByteArray();
        bos.close();
        return ret;
    }
}

服務端與客戶端

public class Server {
    public static void main(String[] args) throws Exception{
        
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //設定日誌
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel sc) throws Exception {
                // 新增編解碼. 傳送自定義的型別, 而Handler的方法接收的msg引數的實際型別也是相應的自定義類了
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request req = (Request)msg;
        System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
        byte[] attachment = GzipUtils.ungzip(req.getAttachment());
        
        String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";
        FileOutputStream fos = new FileOutputStream(path);
        fos.write(attachment);
        fos.close();
        
        Response resp = new Response();
        resp.setId(req.getId());
        resp.setName("resp" + req.getId());
        resp.setResponseMessage("響應內容" + req.getId());
        ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class Client {
    public static void main(String[] args) throws Exception{
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        for(int i = 0; i < 5; i++){
            Request req = new Request();
            req.setId("" + i);
            req.setName("req" + i);
            req.setRequestMessage("資料資訊" + i);    
            String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "001.jpg";
            File file = new File(path);
            FileInputStream in = new FileInputStream(file);  
            byte[] data = new byte[in.available()];  
            in.read(data);  
            in.close(); 
            req.setAttachment(GzipUtils.gzip(data)); //壓縮
            cf.channel().writeAndFlush(req);
        }

        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClientHandler extends ChannelHandlerAdapter{
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

1.4 長連線/短連線

1.長連線, 一致保持著連線不主動中斷, 實時性強
2.短連線. 資料放在快取, 一次性批量提交所有資料, 服務端接收後即關閉連線
以上兩種根據是否給ChannelHandlerContext新增ChannelFutureListener.ClOSE監聽器實現

3.長連線, 一定時間不活躍則關閉連線. 給SocketChannel新增ReadTimeoutHandler實現. 例項如下:

public final class MarshallingCodeCFactory {
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
public class Request implements Serializable{
    private String id ;
    private String name ;
    private String requestMessage ;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getRequestMessage() {
        return requestMessage;
    }
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
}
public class Response implements Serializable{
    private String id;
    private String name;
    private String responseMessage;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getResponseMessage() {
        return responseMessage;
    }
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
public class Server {

    public static void main(String[] args) throws Exception{
        
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //設定日誌
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ReadTimeoutHandler(5));  // 時限, 讀客戶端超時沒資料則斷開
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getId());
        response.setResponseMessage("響應內容" + request.getId());
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class Client {
    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf ;

    // 單例
    private static class SingletonHolder { 
        static final Client instance = new Client();
    }
    public static Client getInstance(){
        return SingletonHolder.instance;
    }
    
    private Client(){
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        //超時handler(當伺服器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉通道)
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));   // 時限5s, 讀服務端超時沒資料則斷開
                        sc.pipeline().addLast(new ClientHandler());
                    }
            });
    }
    
    public void connect(){
        try {
            this.cf = b.connect("127.0.0.1", 8765).sync();
            System.out.println("遠端伺服器已經連線, 可以進行資料交換");                
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public ChannelFuture getChannelFuture(){
        if(this.cf == null) {   //初次連線
            this.connect();
        }
        if(!this.cf.channel().isActive()){  //重連
            this.connect();
        }
        return this.cf;
    }
    
    public static void main(String[] args) throws Exception{
        final Client c = Client.getInstance();
        
        ChannelFuture cf = c.getChannelFuture();
        for(int i = 1; i <= 3; i++ ){
            Request request = new Request();
            request.setId("" + i);
            request.setName("request" + i);
            request.setRequestMessage("資料資訊" + i);
            cf.channel().writeAndFlush(request);
            TimeUnit.SECONDS.sleep(4);  //間隔4s傳送一次資料
        }

        cf.channel().closeFuture().sync(); //阻塞至超時關閉
        
        // 這裡用子執行緒重連併發送資料一次
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("進入子執行緒重連一次");
                    ChannelFuture cf = c.getChannelFuture();
                    assert true == cf.channel().isActive(); //斷言
                    //再次傳送資料
                    Request request = new Request();
                    request.setId("" + 4);
                    request.setName("request" + 4);
                    request.setRequestMessage("資料資訊" + 4);
                    cf.channel().writeAndFlush(request);                    
                    cf.channel().closeFuture().sync();
                    System.out.println("子執行緒完成");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        System.out.println("斷開連線,主執行緒結束..");
    }
    
}
public class ClientHandler extends ChannelHandlerAdapter{
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

1.5 使用UDP (較少使用)

public class Server {
    public void run(int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class)  // UDP: NioDatagramChannel
                .option(ChannelOption.SO_BROADCAST, true) // 廣播
                .handler(new ServerHandler());
            b.bind(port).sync().channel().closeFuture().await();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Server().run(8765);
    }
}
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    // 諺語列表
    private static final String[] DICTIONARY = { 
        "只要功夫深,鐵棒磨成針。",
        "舊時王謝堂前燕,飛入尋常百姓家。", 
        "洛陽親友如相問,一片冰心在玉壺。",
        "一寸光陰一寸金,寸金難買寸光陰。",
        "老驥伏櫪,志在千里。烈士暮年,壯心不已!"
    };

    private String nextQuote() {
        int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
        return DICTIONARY[quoteId];
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        String req = packet.content().toString(CharsetUtil.UTF_8);
        System.out.println(req);
        if ("諺語字典查詢?".equals(req)) {
            ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語查詢結果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
        ctx.close();
        cause.printStackTrace();
    }
}
public class Client {

    public void run(int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ClientHandler());
            Channel ch = b.bind(0).sync().channel();
            // 向網段內的所有機器廣播UDP訊息
            ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語字典查詢?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();
            if (!ch.closeFuture().await(15000)) {
                System.out.println("查詢超時!");
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Client().run(8765);
    }
}
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
        String response = msg.content().toString(CharsetUtil.UTF_8);
        if (response.startsWith("諺語查詢結果: ")) {
            System.out.println(response);
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2. 心跳檢測

叢集中主伺服器需要知道從伺服器的狀態
因此client每隔5~10秒給server傳送心跳包

可通過netty與定時任務來實現

public final class MarshallingCodeCFactory {
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
public class RequestInfo implements Serializable {
    private String ip ;
    private HashMap<String, Object> cpuPercMap ;
    private HashMap<String, Object> memoryMap;
    
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public HashMap<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }
    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }
    public HashMap<String, Object> getMemoryMap() {
        return memoryMap;
    }
    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }
}
public class Server {

    public static void main(String[] args) throws Exception{
        
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //設定日誌
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ServerHeartBeatHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
    
    private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
    private static final String SUCCESS_KEY = "auth_success_key";
    
    static {
        AUTH_IP_MAP.put("127.0.0.1", "1234");
    }
    
    private boolean auth(ChannelHandlerContext ctx, Object msg){
        String [] ret = ((String) msg).split(",");
        String auth = AUTH_IP_MAP.get(ret[0]);
        if(auth != null && auth.equals(ret[1])){
            // 認證成功, 返回確認資訊
            ctx.writeAndFlush(SUCCESS_KEY);
            return true;
        } else {
            ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof String){
            auth(ctx, msg);
        } else if (msg instanceof RequestInfo) {
            RequestInfo info = (RequestInfo) msg;
            System.out.println("--------------------------------------------");
            System.out.println("當前主機ip為: " + info.getIp());
            System.out.println("當前主機cpu情況: ");
            HashMap<String, Object> cpu = info.getCpuPercMap();
            System.out.println("總使用率: " + cpu.get("combined"));
            System.out.println("使用者使用率: " + cpu.get("user"));
            System.out.println("系統使用率: " + cpu.get("sys"));
            System.out.println("等待率: " + cpu.get("wait"));
            System.out.println("空閒率: " + cpu.get("idle"));
            
            System.out.println("當前主機memory情況: ");
            HashMap<String, Object> memory = info.getMemoryMap();
            System.out.println("記憶體總量: " + memory.get("total"));
            System.out.println("當前記憶體使用量: " + memory.get("used"));
            System.out.println("當前記憶體剩餘量: " + memory.get("free"));
            System.out.println("--------------------------------------------");
            
            ctx.writeAndFlush("info received!");
        } else {
            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        }
    }
}
public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ClienHeartBeatHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClienHeartBeatHandler extends ChannelHandlerAdapter {

    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> heartBeat;  //定時任務
    //主動向伺服器傳送認證資訊
    private InetAddress addr ;
    private static final String SUCCESS_KEY = "auth_success_key";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        addr = InetAddress.getLocalHost();
        //String ip = addr.getHostAddress();
        String ip = "127.0.0.1";
        String key = "1234";
        //證書
        String auth = ip + "," + key;
        // 傳送認證
        ctx.writeAndFlush(auth);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if(msg instanceof String){
                String ret = (String) msg;
                if(SUCCESS_KEY.equals(ret)){
                    // 收到認證 確認資訊,設定每隔5秒傳送心跳訊息
                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);
                    System.out.println(msg);                
                } else {  
                    // 收到心跳包 確認資訊
                    System.out.println(msg);
                }
            }
        } finally {
            // 只讀, 需要手動釋放引用計數
            ReferenceCountUtil.release(msg);
        }
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;
        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        @Override
        public void run() {
            try {
                RequestInfo info = new RequestInfo();
                //ip
                info.setIp(addr.getHostAddress());
                Sigar sigar = new Sigar();
                //cpu prec
                CpuPerc cpuPerc = sigar.getCpuPerc();
                HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                cpuPercMap.put("combined", cpuPerc.getCombined());
                cpuPercMap.put("user", cpuPerc.getUser());
                cpuPercMap.put("sys", cpuPerc.getSys());
                cpuPercMap.put("wait", cpuPerc.getWait());
                cpuPercMap.put("idle", cpuPerc.getIdle());
                // memory
                Mem mem = sigar.getMem();
                HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                memoryMap.put("total", mem.getTotal() / 1024L);
                memoryMap.put("used", mem.getUsed() / 1024L);
                memoryMap.put("free", mem.getFree() / 1024L);
                info.setCpuPercMap(cpuPercMap);
                info.setMemoryMap(memoryMap);
                
                ctx.writeAndFlush(info);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            // 取消定時傳送心跳包的任務
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    }
}

3. HTTP

3.1 Hello World

public final class HttpHelloWorldServer {
  
      static final boolean SSL = System.getProperty("ssl") != null;
      static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
  
      public static void main(String[] args) throws Exception {
          final SslContext sslCtx;
          if (SSL) {
              SelfSignedCertificate ssc = new SelfSignedCertificate();
              sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
          } else {
              sslCtx = null;
          }
  
          EventLoopGroup bossGroup = new NioEventLoopGroup();
          EventLoopGroup workerGroup = new NioEventLoopGroup();
          try {
              ServerBootstrap b = new ServerBootstrap();
              b.option(ChannelOption.SO_BACKLOG, 1024);
              b.group(bossGroup, workerGroup)
               .channel(NioServerSocketChannel.class)
               .handler(new LoggingHandler(LogLevel.INFO))
               .childHandler(new HttpHelloWorldServerInitializer(sslCtx));
  
              Channel ch = b.bind(PORT).sync().channel();
  
              System.err.println("Open your web browser and navigate to " +
                      (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
  
              ch.closeFuture().sync();
          } finally {
              bossGroup.shutdownGracefully();
              workerGroup.shutdownGracefully();
          }
      }
}
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        p.addLast(new HttpServerCodec());   // !使用http通訊, HttpRequest和HttpResponse
        p.addLast(new HttpHelloWorldServerHandler());
    }
}
public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter {
    private static final byte[] CONTENT = "HELLO WORLD".getBytes();

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;

            if (HttpHeaderUtil.is100ContinueExpected(req)) {
                ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
            }
            boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);
            // 構造響應
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
            response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8");
            response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());

            if (!keepAlive) {
                // Request短連線, 寫完後直接關閉
                ctx.write(response).addListener(ChannelFutureListener.CLOSE);
            } else {
                // 長連線, response也設定為KEEP_ALIVE
                response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                ctx.write(response);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

3.2 HTTP下載檔案

public class HttpDownloadServer {

    private static final String DEFAULT_URL = "/sources/";

    public void run(final int port, final String url) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // addLast的第一項為key, 自定義的
                    // request解碼器
                    ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                    // response的編碼器
                    ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                    // chunked, 傳輸檔案時分多個response分解地傳輸檔案
                    ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                    // ObjectAggregator, 將多個response合併為一個FullHttpResponse
                    ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                    // 自定義業務邏輯handler
                    ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url));
                }
                });
            ChannelFuture future = b.bind("127.0.0.1", port).sync();
            System.out.println("HTTP檔案目錄伺服器啟動,網址是 : " + "http://localhost:"  + port + url);
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8765;
        new HttpDownloadServer().run(port, DEFAULT_URL);
    }
}
// 注意這裡繼承了SimpleChannelInboundHandler<T>, 含泛型, 即指定了傳入引數msg的型別
public class HttpDownoadServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    
    private final String url;

    public HttpDownoadServerHandler(String url) {
        this.url = url;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        //是否能理解(解碼)請求
        if (!request.decoderResult().isSuccess()) {
            // 400
            sendError(ctx, BAD_REQUEST);
            return;
        }
        //對請求的方法進行判斷:如果不是GET方法則返回異常
        if (request.method() != GET) {
            // 405
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }
        //獲取請求uri路徑
        final String uri = request.uri();
        //對url進行分析,返回本地路徑
        final String path = parseURI(uri);
        //如果 路徑構造不合法,則path為null
        if (path == null) {
            //403
            sendError(ctx, FORBIDDEN);
            return;
        }
        
        // 建立file物件
        File file = new File(path);
        // 檔案隱藏或不存在
        if (file.isHidden() || !file.exists()) {
            // 404 
            sendError(ctx, NOT_FOUND);
            return;
        }
        // 是資料夾
        if (file.isDirectory()) {
            if (uri.endsWith("/")) {
                //如果以正常"/"結束 說明是訪問的一個檔案目錄:則進行展示檔案列表
                sendListing(ctx, file);
            } else {
                //如果非"/"結束 則重定向,讓客戶端補全"/"並再次請求
                sendRedirect(ctx, uri + '/');
            }
            return;
        }
        // 如果所建立的file物件不是檔案型別
        if (!file.isFile()) {
            // 403
            sendError(ctx, FORBIDDEN);
            return;
        }
        
        //隨機檔案讀寫物件
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(file, "r");// 以只讀的方式開啟檔案
        } catch (FileNotFoundException fnfe) {
            // 404
            sendError(ctx, NOT_FOUND);
            return;
        }
        
        //獲取檔案長度
        long fileLength = randomAccessFile.length();
        //建立響應物件
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        //設定響應資訊
        HttpHeaderUtil.setContentLength(response, fileLength);
        //設定Content-Type
        setContentTypeHeader(response, file);
        //設定為KeepAlive
        if (HttpHeaderUtil.isKeepAlive(request)) {
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        //輸出response header, HttpObjectAggregator能將其與下面輸出整合合併
        ctx.write(response);
        
        //寫出ChunkedFile. 建立ChunkedFile需要使用RandomAccessFile並設定分段. 這裡每次傳輸8192個位元組
        ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
        //新增傳輸監聽
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) { 
                    System.err.println("Transfer progress: " + progress);
                } else {
                    System.err.println("Transfer progress: " + progress + " / " + total);
                }
            }
            @Override
            public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                System.out.println("Transfer complete.");
            }
        });
        
        //使用Chunked, 完成時需要傳送標記結束的空訊息體!
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        //如果當前連線請求非Keep-Alive, 最後一包訊息傳送完後, 伺服器主動關閉連線
        if (!HttpHeaderUtil.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (ctx.channel().isActive()) {
           // 500
            sendError(ctx, INTERNAL_SERVER_ERROR);
            ctx.close();
        }
    }

    //判斷非法URI的正則
    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
    private String parseURI(String uri) {
        try {
            //使用UTF-8字符集
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            try {
                //嘗試ISO-8859-1
                uri = URLDecoder.decode(uri, "ISO-8859-1");
            } catch (UnsupportedEncodingException e1) {
                //丟擲預想外異常資訊
                throw new Error();
            }
        }
        // 對uri進行細粒度判斷:4步驗證操作
        // step 1 基礎驗證
        if (!uri.startsWith(url)) {
            return null;
        }
        // step 2 基礎驗證
        if (!uri.startsWith("/")) {
            return null;
        }
        // step 3 將檔案分隔符替換為本地作業系統的檔案路徑分隔符
        uri = uri.replace('/', File.separatorChar);
        // step 4 驗證路徑合法性
        if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || 
                uri.startsWith(".") || uri.endsWith(".") || 
                INSECURE_URI.matcher(uri).matches()) {
            return null;
        }
        //利用當前工程所在目錄 + URI相對路徑 構造絕對路徑 
        return System.getProperty("user.dir") + File.separator + uri;
    }
    
    //用正則表示式過濾檔名
    private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
    //檔案列表, 拼html檔案
    private static void sendListing(ChannelHandlerContext ctx, File dir) {
        // 設定響應物件
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        // 響應頭
        response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
        // 構造文字內容
        StringBuilder ret = new StringBuilder();
        String dirPath = dir.getPath();
        ret.append("<!DOCTYPE html>\r\n");
        ret.append("<html><head><title>");
        ret.append(dirPath);
        ret.append(" 目錄:");
        ret.append("</title></head><body>\r\n");
        ret.append("<h3>");
        ret.append(dirPath).append(" 目錄:");
        ret.append("</h3>\r\n");
        ret.append("<ul>");
        ret.append("<li>連結:<a href=\"../\">..</a></li>\r\n");
        
        // 遍歷檔案, 生成超連結
        for (File f : dir.listFiles()) {
            //step 1: 跳過隱藏檔案和不可讀檔案 
            if (f.isHidden() || !f.canRead()) {
                continue;
            }
            String name = f.getName();
            //step 2: 跳過正則過濾的檔名
            if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
                continue;
            }
            ret.append("<li>連結:<a href=\"");
            ret.append(name);
            ret.append("\">");
            ret.append(name);
            ret.append("</a></li>\r\n");
        }
        ret.append("</ul></body></html>\r\n");
        //構造ByteBuf,寫入緩衝區
        ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8);
        //進行寫出操作
        response.content().writeBytes(buffer);
        //重置ByteBuf
        buffer.release();
        //傳送完成並主動關閉連線
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    //重定向操作
    private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
        response.headers().set(LOCATION, newUri);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    //錯誤資訊
    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private static void setContentTypeHeader(HttpResponse response, File file) {
        //使用mime物件獲取檔案對應的Content-Type
        MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
        response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
    }
}

3.3 HTTP上傳檔案 (較少使用)

實際應用中檔案上傳服務端有成熟的框架fastDFS(小檔案)和HDFS(大檔案)

如要實現斷點續傳, 需要記錄上傳進度. 參考HTTP頭的Range和Content-Range

public final class HttpUploadServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.handler(new LoggingHandler(LogLevel.INFO));
            b.childHandler(new HttpUploadServerInitializer(sslCtx));

            Channel ch = b.bind(PORT