1. 程式人生 > >Netty基礎入門學習

Netty基礎入門學習

【1】Netty是什麼

Netty官網:https://netty.io/
4.x使用者指南:https://netty.io/wiki/user-guide-for-4.x.html
GitHub地址:https://github.com/netty/netty

Netty官網推薦使用4.x版本。這裡使用的是依賴如下:

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.31.Final</version>
</dependency>

① 有這樣幾個描述:

  • Netty是一個非同步事件驅動的網路應用程式框架, 用於快速開發可維護的高效能協議伺服器和客戶端。它極大地簡化了TCP和UDP套接字伺服器等網路程式設計。
  • Netty是基於Java NIO的網路應用框架,client-server框架。
  • Netty是一個高效能、非同步事件驅動的NIO框架,它提供了對TCP、UDP和檔案傳輸的支援,作為一個非同步NIO框架,Netty的所有IO操作都是非同步非阻塞的,通過Future-Listener機制,使用者可以方便的主動獲取或者通過通知機制獲得IO操作結果。
  • 作為當前最流行的NIO框架,Netty在網際網路領域、大資料分散式計算領域、遊戲行業、通訊行業等獲得了廣泛的應用。

② NIO通訊模型

如下圖所示:
在這裡插入圖片描述

Selector 一般稱 為選擇器 ,也可以翻譯為 多路複用器。四種狀態:Connect(連線就緒)、Accept(接受就緒)、Read(讀就緒)、Write(寫就緒)。


③ Netty基本原理

如下圖所示:

在這裡插入圖片描述


【2】丟棄伺服器discard

例項如下:

package com.netty.hanler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by Janus on 2018/11/19.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {// (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        try {
            ((ByteBuf) msg).release(); // (3)
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

}

① DiscardServerHandler extends ChannelInboundHandlerAdapter,這是一個實現ChannelInboundHandler。ChannelInboundHandler提供可以覆蓋的各種事件處理程式方法。目前,只需自己擴充套件ChannelInboundHandlerAdapter而不是實現處理程式介面。

ChannelInboundHandlerAdapter繼承圖如下:
在這裡插入圖片描述

② 我們在channelRead()這裡覆蓋事件處理程式方法。每當從客戶端接收到新資料時,都會使用收到的訊息呼叫此方法。在此示例中,接收訊息的型別是ByteBuf。

③ 要實現該DISCARD協議,處理程式必須忽略收到的訊息。ByteBuf是一個引用計數物件,必須通過該release()方法顯式釋放。請記住,處理程式有責任釋放傳遞給處理程式的任何引用計數物件。通常,channelRead()handler方法實現如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

④ 該exceptionCaught()事件處理方法被Netty在丟擲異常時呼叫,異常可能由於I / O錯誤或處理器在處理事件時丟擲。在大多數情況下,應記錄捕獲的異常並在此處關閉其關聯的通道,儘管此方法的實現可能會有所不同,具體取決於您要處理特殊情況的操作。例如,您可能希望在關閉連線之前傳送帶有錯誤程式碼的響應訊息。


DiscardServer 類和main方法如下:

package com.netty.server;

import com.netty.hanler.DiscardServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by Janus on 2018/11/19.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

① NioEventLoopGroup是一個處理I / O操作的多執行緒事件迴圈。Netty EventLoopGroup為不同型別的傳輸提供各種實現。我們在此示例中實現了伺服器端應用程式,因此NioEventLoopGroup將使用兩個。第一個,通常稱為“boss”,接受傳入連線。第二個,通常稱為“worker”,一旦"Boss"接受連線並將接受的連線註冊到"worker",就處理被接受連線。使用了多少個執行緒以及它們如何對映到建立的Channels取決於EventLoopGroup實現,甚至可以通過建構函式進行配置。

② ServerBootstrap是一個設定伺服器的幫助程式類。可以直接使用一個Channel設定伺服器。但請注意,這是一個繁瑣的過程,在大多數情況下您不需要這樣做。

③ 在這裡,我們指定使用NioServerSocketChannel用於例項化一個新的Channel來接受傳入連線。

④ 此處指定的處理程式將始終由新接受的Channel進行評估。這ChannelInitializer是一個特殊的處理程式,旨在幫助使用者配置新的Channel。您最有可能希望通過新增一些處理程式(例如DiscardServerHandler去實現網路應用)來配置ChannelPipeline of the new Channel。隨著應用程式變得複雜,您可能會向管道新增更多處理程式,並最終將此匿名類提取到頂級類中。

⑤ 您還可以設定特定於Channel實現的引數。我們正在編寫TCP / IP伺服器,因此我們可以設定套接字選項,如tcpNoDelay和keepAlive。請參考ChannelOption 和詳細的ChannelConfig 實現的介面文件以此可以對ChannelOption 的有一個大概的認識。

⑥ 你有沒有注意到option()和childOption()?option()英語於NioServerSocketChannel來接受傳入的連線。 childOption() 是提供給由父管道ServerChannel 接收到的連線,在這個例子中也是 NioServerSocketChannel。

⑦ 剩下的就是繫結到埠並啟動伺服器。在這裡,我們繫結到機器8080中所有NIC(網路介面卡)的埠。現在可以bind()根據需要多次呼叫該方法(使用不同的繫結地址。)

這裡可以使用telnet測試8080埠資料傳送,修改channelRead方法,列印其接受到的內容:

	 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) { // (1)
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg); // (2)
        }
    }

使用telnet測試步驟與效果參考博文:Telnet測試8080埠傳送資料


【3】編寫Echo應答伺服器

到目前為止,我們一直在使用資料而沒有響應。但是,伺服器通常應該響應請求。讓我們學習如何通過實現ECHO協議將響應訊息寫入客戶端,其中任何接收的資料都被髮回。

與我們在前面部分中實現的丟棄伺服器的唯一區別在於它將接收的資料發回,而不是將接收的資料列印到控制檯。因此,再次修改channelRead()方法就足夠了:

	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }

① ChannelHandlerContext 物件提供了許多操作,使你能夠觸發各種各樣的 I/O 事件和操作。這裡我們呼叫了 write(Object) 方法來逐字地把接受到的訊息寫入。請注意不同於 DISCARD 的例子我們並沒有釋放接受到的訊息,這是因為當寫入的時候 Netty 已經幫我們釋放了。

② ctx.write(Object) 方法不會使訊息寫入到通道上,他被緩衝在了內部,你需要呼叫 ctx.flush() 方法來把緩衝區中資料強行輸出。或者你可以用更簡潔的cxt.writeAndFlush(msg) 以達到同樣的目的。

總結:通過【2】【3】我們瞭解了一個基本的請求、應答netty server簡單流程。使用了 EventLoopGroup bossGroup , EventLoopGroup workerGroup = new NioEventLoopGroup();及ServerBootstrap等關鍵基礎元件。並通過繼承ChannelInboundHandlerAdapter覆蓋其channelRead方法來讀取訊息。接下來繼續學習其他事件方法,如channelActive()。


【4】Time Server

在這個部分被實現的協議是TIME 協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的訊息,並且一旦訊息傳送就會立即關閉連線。在這個例子中,你會學習到如何構建和傳送一個訊息,然後在完
成時關閉連線。

因為我們將會忽略任何接收到的資料,而只是在連線被建立傳送一個訊息,所以這次我們不能使用 channelRead() 方法了,代替他的是,我們需要覆蓋 channelActive() 方法,下面的就是實現的內容:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by Janus on 2018/11/19.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

① channelActive() 方法將會在連線被建立並且準備進行通訊時被呼叫。因此讓我們在這個方法裡完成一個代表當前時間的32位整數訊息的構建工作。

② 為了傳送一個新的訊息,我們需要分配一個包含這個訊息的新的緩衝。因為我們需要寫入一個32位的整數,因此我們需要一個至少有4個位元組的ByteBuf。通過 ChannelHandlerContext.alloc() 得到一個當前的ByteBufAllocator,然後分配一個新的緩衝。

③ 和往常一樣我們需要編寫一個構建好的訊息。但是等一等,flip 在哪?難道我們使用 NIO 傳送訊息時不是呼叫 java.nio.ByteBuffer.flip() 嗎?ByteBuf 之所以沒有這個方法因為有兩個指標,一個對應讀操作一個對應寫操作。當你向 ByteBuf 裡寫入資料的時候寫指標的索引就會增加,同時讀指標的索引沒有變化。讀指標索引和寫指標索引分別代表了訊息的開始和結束。

比較起來,NIO 緩衝並沒有提供一種簡潔的方式來計算出訊息內容的開始和結尾,除非你呼叫 flip 方法。當你忘記呼叫 flip 方法而引起沒有資料或者錯誤資料被髮送時,你會陷入困境。這樣的一個錯誤不會發生在 Netty上,因為我們對於不同的操作型別有不同的指標。你會發現這樣的使用方法會讓你過程變得更加的容易,因為你已經習慣一種沒有使用 flip 的方式。

另外一個點需要注意的是 ChannelHandlerContext.write() (和 writeAndFlush() )方法會返回一個ChannelFuture 物件,一個 ChannelFuture 代表了一個還沒有發生的 I/O 操作。這意味著任何一個請求操作都不會馬上被執行,因為在 Netty 裡所有的操作都是非同步的。舉個例子下面的程式碼中在訊息被髮送之前可能會先關閉連線。

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();

因此你需要在 write() 方法返回的 ChannelFuture 完成後呼叫 close() 方法,然後當他的寫操作已經完成他會通知他的監聽者。請注意,close() 方法也可能不會立馬關閉,他也會返回一個ChannelFuture。

④ 當一個寫請求已經完成是如何通知到我們?這個只需要簡單地在返回的 ChannelFuture 上增加一個ChannelFutureListener。這裡我們構建了一個匿名的 ChannelFutureListener 類用來在操作完成時關閉 Channel。

或者,你可以使用簡單的預定義監聽器程式碼:

f.addListener(ChannelFutureListener.CLOSE);

為了測試我們的time服務如我們期望的一樣工作,你可以使用 UNIX 的 rdate 命令

$ rdate -o <port> -p <host>

Port 是你在main()函式中指定的埠,host 使用 locahost 就可以了。


【5】Time Client

不像 DISCARD 和 ECHO 的服務端,對於 TIME 協議我們需要一個客戶端,因為人們不能把一個32位的二進位制資料翻譯成一個日期或者日曆。

在 Netty 中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrap 和Channel的實現。

TimeClient例項如下:

import com.netty.hanler.TimeClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by Janus on 2018/11/19.
 */
public class TimeClient {
    public static void main(String[] args) throws Exception {
//        String host = args[0];
//        int port = Integer.parseInt(args[1]);
        String host="localhost";
        //埠與Server保持一致
        int port=8080;
        // 這裡只有一個workerGroup 
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            //這裡為NioSocketChannel not NioServerSocketChannel
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

① BootStrap 和ServerBootstrap 類似,不過他是對非服務端的 channel 而言,比如客戶端或者無連線傳輸模式的 channel。

② 如果你只指定了一個EventLoopGroup,那他就會即作為一個 boss group ,也會作為一個 workder group,儘管客戶端不需要使用到 boss worker 。

③ 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel 被建立時使用。

④ 不像在使用 ServerBootstrap 時需要用 childOption() 方法,因為客戶端的SocketChannel 沒有父親。

⑤ 我們用 connect() 方法代替了 bind() 方法。


TimeClientHandler

正如你看到的,他和服務端的程式碼是不一樣的。ChannelHandler 是如何實現的?他應該從服務端接受一個32位的整數訊息,把他翻譯成人們能讀懂的格式,並列印翻譯好的時間,最後關閉連線:

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

① 在TCP/IP中,Netty 會把讀到的資料放到 ByteBuf 的資料結構中。

在這裡插入圖片描述

這樣看起來非常簡單,並且和服務端的那個例子的程式碼也相差不多。然而,處理器有時候會因為丟擲 IndexOutOfBoundsException 而拒絕工作。在下個部分我們會討論為什麼會發生這種情況。


【6】處理一個基於流的傳輸

① 關於 Socket Buffer的一個小警告

基於流的傳輸比如 TCP/IP, 接收到資料是存在 socket 接收的 buffer 中。不幸的是,基於流的傳輸並不是一個數據包佇列,而是一個位元組佇列。意味著,即使你傳送了2個獨立的資料包,作業系統也不會作為2個訊息處理而僅僅是作為一連串的位元組而言。因此這是不能保證你遠端寫入的資料就會準確地讀取。

舉個例子,讓我們假設作業系統的 TCP/TP 協議棧已經接收了3個數據包:

在這裡插入圖片描述

由於基於流傳輸的協議的這種普通的性質,在你的應用程式裡讀取資料的時候會有很高的可能性被分成下面的片段:
在這裡插入圖片描述

因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的資料整理成一個或者多個更有意思並且能夠讓程式的業務邏輯更好理解的資料。在上面的例子中,接收到的資料應該被構造成下面的格式:

在這裡插入圖片描述


② The First Solution

回到 TIME 客戶端例子。同樣也有類似的問題。一個32位整型是非常小的資料,他並不見得會被經常拆分到到不同的資料段內。然而,問題是他確實可能會被拆分到不同的資料段內,並且拆分的可能性會隨著通訊量的增加而增加。

最簡單的方案是構造一個內部的可積累的緩衝,直到4個位元組全部接收到了內部緩衝。下面的程式碼修改了 TimeClientHandler 的實現類修復了這個問題:

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

① ChannelHandler 有2個生命週期的監聽方法:handlerAdded()和 handlerRemoved()。你可以完成任意初始化任務只要他不會被阻塞很長的時間。

② 首先,所有接收的資料都應該被累積在 buf 變數裡。

③ 然後,處理器必須檢查 buf 變數是否有足夠的資料,在這個例子中是4個位元組,然後處理實際的業務邏輯。否則,Netty 會重複呼叫channelRead() 當有更多資料到達直到4個位元組的資料被積累。


③ The Second Solution

儘管第一個解決方案已經解決了 TIME 客戶端的問題了,但是修改後的處理器看起來不那麼的簡潔,想象一下如果由多個欄位比如可變長度的欄位組成的更為複雜的協議時,你的ChannelInboundHandler 的實現將很快地變得難以維護。

正如你所知的,你可以增加多個ChannelHandler 到ChannelPipeline ,因此你可以把一整個ChannelHandler拆分成多個模組以減少應用的複雜程度,比如你可以把TimeClientHandler 拆分成2個處理器:

  • TimeDecoder 處理資料拆分的問題
  • TimeClientHandler 原始版本的實現

幸運地是,Netty 提供了一個可擴充套件的類,幫你完成 TimeDecoder 的開發。

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}

① ByteToMessageDecoder 是ChannelInboundHandler 的一個實現類,他可以在處理資料拆分的問題上變得很簡單。

② 每當有新資料接收的時候,ByteToMessageDecoder 都會呼叫 decode() 方法來處理內部的那個累積緩衝。

③ Decode() 方法可以決定當累積緩衝裡沒有足夠資料時可以往 out 物件裡放任意資料。當有更多的資料被接收了 ByteToMessageDecoder 會再一次呼叫 decode() 方法。

④ 如果在 decode() 方法裡增加了一個物件到 out 物件裡,這意味著解碼器解碼訊息成功。ByteToMessageDecoder 將會丟棄在累積緩衝裡已經被讀過的資料。請記得你不需要對多條訊息呼叫 decode(),ByteToMessageDecoder 會持續呼叫 decode() 直到不放任何資料到 out 裡。

現在我們有另外一個處理器插入到ChannelPipeline 裡,我們應該在 TimeClient 裡修改 ChannelInitializer 的實現:

b.handler(new ChannelInitializer<SocketChannel>() {
	  @Override
	  public void initChannel(SocketChannel ch) throws Exception {
	      ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
	  }
});

如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下 API文件來獲取更多的資訊。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

在這裡插入圖片描述
此外,Netty還提供了更多開箱即用的解碼器使你可以更簡單地實現更多的協議,幫助你避免開發一個難以維護的處理器實現。請參考下面的包以獲取更多更詳細的例子:


【7】用POJO代替ByteBuf

我們回顧了迄今為止的所有例子使用ByteBuf 作為協議訊息的主要資料結構。在本節中,我們將改善的 TIME 協議客戶端和伺服器例子,使用 POJO 代替 ByteBuf。

在ChannelHandler 使用 POIO 的好處很明顯:通過從ChannelHandler 中提取出 ByteBuf 的程式碼,將會使 ChannelHandler的實現變得更加可維護和可重用。

在 TIME 客戶端和伺服器的例子中,我們讀取的僅僅是一個32位的整形資料,直接使用 ByteBuf 不會是一個主要的問題。然而,你會發現當你需要實現一個真實的協議,分離程式碼變得非常的必要。

首先,讓我們定義一個新的型別叫做 UnixTime。

UnixTime例項如下:

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

現在我們可以修改下 TimeDecoder 類,返回一個 UnixTime,以替代ByteBuf:

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        System.out.println("進入TimeDecoder.decode方法");
        if (in.readableBytes() < 4) {
            System.out.println("Bytes is not reach 4");
            return; // (3)
        }
//        out.add(in.readBytes(4)); // (4)
        out.add(new UnixTime(in.readUnsignedInt())); // (4)
    }
}

當更新TimeDecoder 後,TimeClientHandler 不再使用任何的 ByteBuf 程式碼了。

	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

是不是變得更加簡單和優雅了?相同的技術可以被運用到服務端。讓我們修改一下 TimeServerHandler 的程式碼。

	 @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

現在,唯一缺少的功能是一個編碼器,是ChannelOutboundHandler的實現,用來將 UnixTime 物件重新轉化為一個 ByteBuf。這比編寫一個解碼器簡單得多,因為編碼訊息時不需要處理資料包拆分和組裝。

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

① 在這幾行程式碼裡還有幾個重要的事情:

  • 第一,通過ChannelPromise,當編碼後的資料被寫到了通道上 Netty
    可以通過這個物件標記是成功還是失敗。
  • 第二, 我們不需要呼叫 cxt.flush()。因為處理器已經單獨分離出了一個
    方法 void flush(ChannelHandlerContext cxt),如果像自己實現 flush() 方法內容可以自行覆蓋這個方法。

進一步簡化操作,你可以使用MessageToByteEncode:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

最後的任務就是在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。 但這是不那麼重要的工作。

b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
	@Override
	public void initChannel(SocketChannel ch) throws Exception {
	ch.pipeline().addLast(new TimeEncoder(),new TimeServerHandler());
	}
})
.option(ChannelOption.SO_BACKLOG, 128)          // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

【8】關閉你的應用

關閉一個 Netty 應用往往只需要簡單地通過 shutdownGracefully() 方法來關閉你構建的所有的EventLoopGroup。

當EventLoopGroup 被完全地終止,並且對應的所有channel 都已經被關閉時,Netty 會返回一個Future物件來通知你。

workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();