1. 程式人生 > >Netty入門與實戰教程總結分享

Netty入門與實戰教程總結分享

  前言:都說Netty是Java程式設計師必須要掌握的一項技能,帶著不止要知其然還要知其所以然的目的,在慕課上找了一個學習Netty原始碼的教程,看了幾章後著實有點懵逼。雖然用過Netty,並且在自己的個人網站上實現了聊天室的功能。但是使用的還是Netty很少一部分功能,很多元件都沒涉及,很多API也似懂非懂。基礎都沒打牢,學習原始碼肯定懵逼。正好在掘金小冊上找到了一個Netty入門的教程,連結放在最後【非廣告】。學習過後,整理出了這麼一篇部落格。寫這篇部落格的目的一個是為了分享,另一個目的就是為了做個筆記,之後自己也可以當資料回顧一下。

  在學習Netty前的必需知識:NIO。如果不瞭解可以看這篇:手動搭建I/O網路通訊框架3:NIO程式設計模型,升級改造聊天室。對於BIO和AIO可以只看文字瞭解一下,但是NIO程式設計模型最好還是動手實踐一下,畢竟NIO目前是使用最廣的。還有一篇Netty實戰SpringBoot+Netty+WebSocket實現實時通訊。這是我實現個人網站的聊天室時寫的一篇,文字內容很少,主要是程式碼,最好粗略看看程式碼,因為下面有幾個地方會和這篇程式碼做一些比較,下面統稱Netty實戰。如果現在看不懂,等你認真看到這篇部落格的pipeline那裡,應該都會看懂。Netty實戰中的客戶端是Web,配合一些前端IM框架,客戶端實現起來非常簡單。但是聊天通訊的功能在APP中多一些,所以下面會說到Netty在客戶端中的使用。下面所有程式碼都確保可以正確執行,如果哪裡有問題,請留言指出,謝謝。

Netty是什麼?

  官方定義:Netty 是一個非同步事件驅動的網路應用框架,用於快速開發可維護的高效能伺服器和客戶端。

  簡單地說Netty封裝了JDK的NIO,不用再寫一大堆複雜的程式碼。既然代替了原生的NIO,肯定有比它好的理由,主要有如下幾點:

  1.Netty底層IO模型可以隨意切換,比如可以從NIO切換到BIO,但一般很少會這麼做。

  2.Netty自帶拆包解包,從NIO各種繁複的細節中脫離出來,讓開發者重點關心業務邏輯。

  3.Netty解決了NIO中Selector空輪詢BUG,這個BUG應該很多人聽說過,雖然官方宣告jdk1.6的update18修復了該問題,只不過是降低了發生的概率。

  4.對Selector做了很多細小的優化,reactor執行緒模型能做到高效的併發處理。

 

服務端啟動類詳解

  精簡的服務端Demo,與上面那篇Netty實戰中的程式碼相比只有一個啟動類,少了業務程式碼和初始化器。

public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                    }
                });

        serverBootstrap.bind(8000);
    }
}
  • 兩個NioEventLoopGroup物件,可以看作兩個執行緒組。bossGroup的作用是監聽客戶端請求。workerGroup的作用是處理每條連線的資料讀寫。

  • ServerBootstrap是一個引導類,其物件的作用是引導伺服器的啟動工作。

  • .group是配置上面兩個執行緒組的角色,也就是誰去監聽誰去處理讀寫。上面只是建立了兩個執行緒組,並沒有實際使用。

  • .channel是配置服務端的IO模型,上面程式碼配置的是NIO模型。也可以配置為BIO,如OioServerSocketChannel.class。

  • .childHandler用於配置每條連線的資料讀寫和業務邏輯等。上面程式碼用的是匿名內部類,並沒有什麼內容。實際使用中為了規範起見,一般會再寫一個單獨的類也就是初始化器,在裡面寫上需要的操作。就如Netty實戰那篇中的程式碼一樣。

  • 最後就是繫結監聽埠了。

  引導類最小化的引數配置就是如上四個:配置執行緒組、IO模型、處理邏輯、繫結埠。

引導類serverBootstrap 的其他方法:

  1.handler()方法:上面的cildHandler是處理連線的讀寫邏輯,這個是用於指定服務端啟動中的邏輯.

serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
    protected void initChannel(NioServerSocketChannel ch) {
        System.out.println("服務端啟動中");
    }
})

  2.attr()方法:給服務端的channel指定一些自定義屬性。然後通過channel.attr()取出這個屬性,其實就是給channel維護一個map。一般也用不上。

  3.childAttr()方法:作用和上面一樣,這個是針對客戶端的channel。

  4.option()方法:給服務端的channel設定屬性,如

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)

  上面程式碼表示系統用於臨時存放已完成三次握手的請求的佇列的最大長度,如果連線建立頻繁,伺服器處理建立新連線較慢,可以適當調大這個引數

  5.childOption()方法:大家肯定已經明白了Netty的命名規律,這個是給每條客戶端連線設定TCP相關的屬性,如

serverBootstrap
        //開啟TCP底層心跳機制
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        //開啟Nagle演算法,如果要求高實時性,有資料傳送時就馬上傳送,就關閉,如果需要減少傳送次數減少網路互動,就開啟。
        .childOption(ChannelOption.TCP_NODELAY, true)

 

客戶端啟動類

  還是說說那篇Netty實戰,裡面的客戶端是Web,所以用到了WebSocket。主要的重點還是在服務端上,客戶端實現起來相對容易,因為它只用傳送訊息和接收訊息。下面依舊寫出一個精簡的客戶端Demo,可以根據自己的專案型別還選擇客戶端的實現。

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // 1.指定執行緒模型
                .group(workerGroup)
                // 2.指定 IO 型別為 NIO
                .channel(NioSocketChannel.class)
                // 3.IO 處理邏輯
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                    }
                });
        // 4.建立連線
        bootstrap.connect("127.0.0.1", 8000).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("連線成功!");
            } else {
                System.err.println("連線失敗!");
                //重新連線
            }
        });
    }
}

  弄懂服務端程式碼後,客戶端就很好理解了,就不再一一說明了。主要作用就在註釋中。

重新連線

  網路環境差的情況下,客戶端第一次連線可能會失敗,所以我們需要嘗試重新連線。可以把連線connect上面的程式碼封裝起來,然後傳入一個Bootstrap型別的物件,通過這個物件迴圈連線。但是一般情況下,連線失敗後不會馬上重連,而是會通過一個指數退避的方式,比如每隔1s、2s、4s、8s....重新連線。

int MAX_RETRY=5;
connect(bootstrap, "127.0.0.1", 8000, MAX_RETRY);
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
    bootstrap.connect(host, port).addListener(future -> {
        if (future.isSuccess()) {
            System.out.println("連線成功!");
        } else if (retry == 0) {
            System.err.println("重試次數已用完,放棄連線!");
        } else {
            // 第幾次重連
            int order = (MAX_RETRY - retry) + 1;
            // 本次重連的間隔,1<<order相當於1乘以2的order次方
            int delay = 1 << order;
            System.err.println(new Date() + ": 連線失敗,第" + order + "次重連……");
            bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                    .SECONDS);
        }
    });
}

  在上面的程式碼中,我們看到,我們定時任務是呼叫 bootstrap.config().group().schedule(), 其中 bootstrap.config() 這個方法返回的是 BootstrapConfig,他是對 Bootstrap 配置引數的抽象,然後 bootstrap.config().group() 返回的就是我們在一開始的時候配置的執行緒模型 workerGroup,調 workerGroup 的 schedule 方法即可實現定時任務邏輯。

  在 schedule 方法塊裡面,前面四個引數我們原封不動地傳遞,最後一個重試次數引數減掉一,就是下一次建立連線時候的上下文資訊。我們可以自行修改程式碼,更改到一個連線不上的服務端 Host 或者 Port,檢視控制檯日誌就可以看到5次重連日誌。

  bootstrap的其他方法,attr()和option()。作用和服務端的方法一樣。attr設定自定義屬性,option設定TCP相關的屬性。

 

資料傳輸的載體:ByteBuf

  NIO中經常使用的ByteBuffer,但它還有一些缺陷:

  1.長度固定。2.讀寫狀態需要通過filp()和rewind()手動轉換。3.功能有限。

  如果看過上面NIO那篇部落格中聊天室Demo,其實都會發現這些問題。長度設定固定的1024個位元組,讀寫也要用filp()轉換。所以Netty為了解決ByteBuffer的這些缺陷,設計了ByteBuf。其結構如下:

  1. ByteBuf 是一個位元組容器,容器裡面的的資料分為三個部分,第一個部分是已經丟棄的位元組,這部分資料是無效的;第二部分是可讀位元組,這部分資料是 ByteBuf 的主體資料, 從 ByteBuf 裡面讀取的資料都來自這一部分;最後一部分的資料是可寫位元組,所有寫到 ByteBuf 的資料都會寫到這一段。最後一部分虛線表示的是該 ByteBuf 最多還能擴容多少容量。

  2. 以上三段內容是被兩個指標給劃分出來的,從左到右,依次是讀指標(readerIndex)、寫指標(writerIndex),然後還有一個變數 capacity,表示 ByteBuf 底層記憶體的總容量。

  3. 從 ByteBuf 中每讀取一個位元組,readerIndex 自增1,ByteBuf 裡面總共有 writerIndex-readerIndex 個位元組可讀, 由此可以推論出當 readerIndex 與 writerIndex 相等的時候,ByteBuf 不可讀。

  4. 寫資料是從 writerIndex 指向的部分開始寫,每寫一個位元組,writerIndex 自增1,直到增到 capacity,這個時候,表示 ByteBuf 已經不可寫了。

  5. ByteBuf 裡面其實還有一個引數 maxCapacity,當向 ByteBuf 寫資料的時候,如果容量不足,那麼這個時候可以進行擴容,直到 capacity 擴容到 maxCapacity,超過 maxCapacity 就會報錯。

ByteBuf的API

  capacity():表示ByteBuf底層佔用了多少位元組,包括丟棄位元組、可讀位元組、可寫位元組。

  maxCapacity():表示ByteBuf最大能佔用多少位元組,也就是包括後面的可擴容的記憶體。

  readableBytes() 與 isReadable():前者表示當前可讀位元組數,也就是寫指標-讀指標。後者表示是否可讀。

  writableBytes()、 isWritable() 與 maxWritableBytes():第一個表示可寫位元組數。第二個表示是否可寫。第三個表示最大可寫位元組數。

  readerIndex() 與 readerIndex(int):前者返回當前的讀指標。後者可以設定讀指標。

  writeIndex() 與 writeIndex(int):和上面一樣,只是讀指標變成了寫指標。

  markReaderIndex() 與 resetReaderIndex():前者表示把當前讀指標儲存起來。後者表示把當前的讀指標恢復到儲存時的值。他們的功能其實readerIndex() 與 readerIndex(int)一樣可以實現,但一般會選擇下面兩句,因為不用定義一個變數。

int readerIndex = buffer.readerIndex();
buffer.readerIndex(readerIndex);

//和上面兩句等價
buffer.markReaderIndex();
buffer.resetReaderIndex();

  writeBytes(byte[] src) 與 buffer.readBytes(byte[] dst):前者表示把src寫到ByteBuf。後者表示把ByteBuf全部資料讀取到dst。

  writeByte(byte b) 與 buffer.readByte():前者表示把位元組b寫道ByteBuf。後者表示從ByteBuf讀取一個位元組。類似的 API 還有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 與 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 這裡就不一一贅述了。

  release() 與 retain():由於 Netty 使用了堆外記憶體,而堆外記憶體是不被 jvm 直接管理的,也就是說申請到的記憶體無法被垃圾回收器直接回收,所以需要我們手動回收。Netty 的 ByteBuf 是通過引用計數的方式管理的,如果一個 ByteBuf 沒有地方被引用到,需要回收底層記憶體。預設情況下,當建立完一個 ByteBuf,它的引用為1,然後每次呼叫 retain() 方法, 它的引用就加一, release() 方法原理是將引用計數減一,減完之後如果發現引用計數為0,則直接回收 ByteBuf 底層的記憶體。

  slice()、duplicate()、copy():這三個都會返回一個新的ByteBuf。第一個是擷取讀指標到寫指標範圍內的一段內容。第二個是擷取整個ByteBuf,包括資料和指標資訊。第三個是拷貝所有資訊,除了第二個API的內容還包括底層資訊,因此拷貝後的新ByteBuf任何操作不會影響原始的ByteBuf。

 

 

實戰:服務端和客戶端雙向通訊

  瞭解客戶端、服務端的啟動類和ByteBuf以後,可以進行一個簡單的實戰了。

  首先看看前面的客戶端程式碼,.handler裡重寫的initChannel方法並沒實際內容。現在加上邏輯處理器,其實就是一個執行邏輯程式碼的類,怎麼叫無所謂,明白它的意思就行。

  客戶端

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new FirstClientHandler());
    }
});

  ch.pipeline().addLast()就是新增一個邏輯處理器。我們在FirstClientHandler裡新增對應的邏輯程式碼就行。

public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客戶端傳送訊息...");
        // 1. 獲取資料
        ByteBuf buffer = getByteBuf(ctx);
        // 2. 寫資料
        ctx.channel().writeAndFlush(buffer);
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 獲取二進位制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        // 2. 準備資料,指定字串的字符集為 utf-8
        byte[] bytes = ("【客戶端】:這是客戶端傳送的訊息:"+new Date()).getBytes(Charset.forName("utf-8"));
        // 3. 填充資料到 ByteBuf
        buffer.writeBytes(bytes);
        return buffer;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        //接收服務端的訊息並列印
        System.out.println(byteBuf.toString(Charset.forName("utf-8")));
    }
}
  • channelActive()方法會在客戶端與伺服器建立連線後呼叫。所以我們可以在這裡面編寫邏輯程式碼

  • .alloc().buffer()的作用是把字串的二進位制資料填充到ByteBuf。

  • .writeBytes()的作用是把資料寫到伺服器。

  • channelRead()在接受到服務端的訊息後呼叫。

  服務端

  同樣的我們需要在initChannel()裡新增一個邏輯處理器。

.childHandler(new ChannelInitializer<NioSocketChannel>() {
    protected void initChannel(NioSocketChannel ch) {
        ch.pipeline().addLast(new FirstServerHandler());
    }
});

  邏輯處理器裡的程式碼

public class FirstServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(new Date() + ": 服務端讀到資料 -> " + byteBuf.toString(Charset.forName("utf-8")));
        //接收到客戶端的訊息後我們再回復客戶端
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "【伺服器】:我是伺服器,我收到你的訊息了!".getBytes(Charset.forName("utf-8"));
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }
}

  channelRead()方法在接收到客戶端發來的資料後呼叫。

  最後我們啟動客戶端和服務端的啟動類,效果如下:

 

 

自定義客戶端與服務端的通訊協議

什麼是通訊協議?

  TCP通訊的資料包格式均為二進位制,協議指的就是客戶端與服務端制定一個規則,這個規則規定了每個二進位制資料包中每一段的位元組代表什麼含義。客戶端與服務端通訊的過程如下。

 

  客戶端首先把一個Java物件轉換成二進位制資料,然後通過網路傳輸給服務端。這裡的傳輸過程屬於TCP/IP協議負責,與我們應用層無關。

通訊協議的設計

  1.第一個欄位是魔數,可以理解為識別這條二進位制資料型別的欄位。在《深入理解Java虛擬機器》中這麼解釋:使用魔數而不是副檔名來進行識別主要是基於安全方面的考慮,因為副檔名可以隨意地改動。檔案格式的制定者可以自由地選擇魔數值,只要這個魔數值還沒有被廣泛採用過同時又不會引起混淆即可。

  2.第二個是版本號,就像IPV4和IPV6一樣。能夠支援協議的升級。

  3.第三個表示如何把Java物件轉換成二進位制資料和把二進位制資料轉回Java物件。

  4.第四個用於區分這條資料是幹嘛的或者說叫資料型別,如:這是傳送的訊息,還是登入的請求等。服務端就可以根據這個指令執行不同的邏輯程式碼。

  5.第五個代表後面的資料長度。

  6.第六個代表傳送的資料,如果指令表明這是個登入資料,裡面儲存的就是賬號密碼。

通訊協議的實現

  以實現登入為例,下面介面和類有點多,建議先把程式碼拷貝到IDE裡,分好包寫好註釋,助於理解它們的關係。

  1.首先建立一個Java物件,這裡以登入時的請求響應為例

@Data
public abstract class Packet {
    //協議版本
    private Byte version = 1;
    //獲取資料型別
    public abstract Byte getCommand();
}

  @Date註解由lombok提供,不瞭解的可以看看這個https://www.cnblogs.com/lbhym/p/12551021.html

public interface Command {
    //定義登入請求指令和響應指令為1和2,其他的指令同理如MESSAGE_REQUEST等
    Byte LOGIN_REQUEST = 1;
    Byte LOGIN_RESPONSE = 2;
}
//這個是登入請求資料包的Java物件,所以呼叫的是上面介面的登入請求指令,其他型別的資料包同理
@Data
public class LoginRequestPacket extends Packet {
    //定義使用者資訊
    private Integer userId;
    private String username;
    private String password;
    @Override
    public Byte getCommand() {
        return LOGIN_REQUEST;
    }
}
@Data
public class LoginResponsePacket extends Packet {
    //是否登入成功
    private boolean success;
    //如果失敗,返回的資訊
    private String reason;
    @Override
    public Byte getCommand() {
        return LOGIN_RESPONSE;
    }
}

  2.Java物件建立完了,再定義Java物件轉換的規則

//序列化介面
public interface Serializer {
    Serializer DEFAULT = new JSONSerializer();
    //序列化演算法
    byte getSerializerAlogrithm();
    //java 物件轉換成二進位制
    byte[] serialize(Object object);
    //二進位制轉換成 java 物件
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}

  介面定義完後開始實現介面。這裡的序列化演算法使用的是fastjson裡面的。需要在pom.xml裡匯入。

public interface SerializerAlgorithm {
    //json序列化標識,如果你有其他的序列化方式可以在這注明標識,類似上面的登入指令
    byte JSON = 1;
}
//實現上面定義的序列化介面
public class JSONSerializer implements Serializer {
    @Override
    public byte getSerializerAlgorithm() {
        //獲取上面的序列化標識
        return SerializerAlgorithm.JSON;
    } 
    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}

  3.建立一個類PacketCodeC,裡面寫上編解碼的方法。這裡再說一點,因為使用了@Data註解,所以有的get方法在語法檢測階段會報錯,可以在IDEA裡面下載Lombok外掛。

public class PacketCodeC {
    //自定義一個魔數
    private static final int MAGIC_NUMBER = 0x12345678;
    //建立一個靜態例項供外部呼叫
    public static final PacketCodeC INSTANCE=new PacketCodeC();
    //建立兩個map,一個儲存資料型別,如:是登入資料還是普通訊息等。第二個是儲存序列化型別。
    //這樣在解碼時就可以把資料轉換為對應的型別。如:這個byte陣列是LOGIN_REQUEST型別,就把它轉換成LoginRequestPacket型別的Java物件
    private  final Map<Byte, Class<? extends Packet>> packetTypeMap;
    private  final Map<Byte, Serializer> serializerMap;

    private PacketCodeC() {
        //初始化map並新增資料型別和序列化型別,如果有其他資料型別,記得在這裡新增
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        serializerMap = new HashMap<>();
        Serializer serializer = new JSONSerializer();
        serializerMap.put(serializer.getSerializerAlogrithm(), serializer);
    }
    //編碼
    public ByteBuf encode(ByteBufAllocator bufAllocator,Packet packet) {
        // 1. 建立 ByteBuf 物件
        ByteBuf byteBuf = bufAllocator.ioBuffer();
        // 2. 序列化 Java 物件
        byte[] bytes = Serializer.DEFAULT.serialize(packet);
        // 3. 實際編碼過程,把通訊協議幾個部分,一一編碼
        byteBuf.writeInt(MAGIC_NUMBER);
        byteBuf.writeByte(packet.getVersion());
        byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlogrithm());
        byteBuf.writeByte(packet.getCommand());
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
        return byteBuf;
    }
    //解碼
    public Packet decode(ByteBuf byteBuf) {
        // 跳過魔數
        byteBuf.skipBytes(4);
        // 跳過版本號
        byteBuf.skipBytes(1);
        // 序列化演算法標識
        byte serializeAlgorithm = byteBuf.readByte();
        // 指令
        byte command = byteBuf.readByte();
        // 資料包長度
        int length = byteBuf.readInt();
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);
        Class<? extends Packet> requestType = getRequestType(command);
        Serializer serializer = getSerializer(serializeAlgorithm);
        if (requestType != null && serializer != null) {
            return serializer.deserialize(requestType, bytes);
        }
        return null;
    }
    //獲取序列化型別
    private Serializer getSerializer(byte serializeAlgorithm) {
        return serializerMap.get(serializeAlgorithm);
    }
    //獲取資料型別
    private Class<? extends Packet> getRequestType(byte command) {
        return packetTypeMap.get(command);
    }
}

使用自定義通訊協議

  最後通過一個登入示例,來使用一下上面自定義的通訊協議。

  基於上面的程式碼,首先更換一下客戶端和服務端的邏輯處理器,直接在原來的邏輯處理器裡面修改邏輯程式碼也行。

.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new ClientHandler());
            }
        });
.childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new ServerHandler());
            }
        }

  客戶端在連線上服務端後立即登入,下面為客戶端登入程式碼

public class ClientHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客戶端開始登入");
        // 建立登入物件
        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
        loginRequestPacket.setUserId(new Random().nextInt(10000));
        loginRequestPacket.setUsername("username");
        loginRequestPacket.setPassword("pwd");
        // 編碼
        ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);
        // 寫資料
        ctx.channel().writeAndFlush(buffer);
    }
    //接收服務端資訊
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
        //如果資料型別是登入,就進行登入判斷
        //同理可以判斷資料是否是普通訊息,還是其他型別的資料
        if (packet instanceof LoginResponsePacket) {
            LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;
            if (loginResponsePacket.isSuccess()) {
                System.out.println(new Date() + ": 客戶端登入成功");
            } else {
                System.out.println(new Date() + ": 客戶端登入失敗,原因:" + loginResponsePacket.getReason());
            }
        }
    }
}

  下面是服務端程式碼

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf requestByteBuf = (ByteBuf) msg;
        // 解碼
        Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
        // 判斷是否是登入請求資料包
        if (packet instanceof LoginRequestPacket) {
            LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;
            LoginResponsePacket loginResponsePacket=new LoginResponsePacket();
            loginResponsePacket.setVersion(packet.getVersion());
            // 登入校驗
            if (valid(loginRequestPacket)) {
                // 校驗成功
                loginResponsePacket.setSuccess(true);
                System.out.println("客戶端登入成功!");
            } else {
                // 校驗失敗
                loginResponsePacket.setReason("賬號或密碼錯誤");
                loginResponsePacket.setSuccess(false);
                System.out.println("客戶端登入失敗!");
            }
            // 編碼,結果傳送給客戶端
            ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
            ctx.channel().writeAndFlush(responseByteBuf);
        }
    }
    private boolean valid(LoginRequestPacket loginRequestPacket) {
        //這裡可以查詢資料庫,驗證使用者的賬號密碼是否正確
        return true;
    }
}

  最後演示效果如圖所示:

自己實現收發訊息

  按照上面的登入功能,可以自己嘗試實現一下收發訊息的功能。首先還是要定義一個收發訊息的Java物件。還需要在Command裡面加上收發訊息的指令,在編解碼類裡面的map新增Java物件。收發訊息前,肯定需要登入。那麼怎麼判斷一個客戶端是否登入呢?可以通過ctx獲取channel,然後通過channel的attr方法設定屬性。如果登入成功這個屬性就設為true。然後在客戶端的啟動類裡連線成功後,新建一個執行緒專門監聽使用者的輸入,新建輸入執行緒前判斷一下登入屬性是否為true就行了。其他的地方跟登入就沒有太大差別了。

 

pipeline與channelHandler

  通過上面的一些實戰,可以發現我們所有的邏輯程式碼都寫在了一個Handler類裡面,幸好現在需要處理的業務不是很多。如果以後功能拓展,這個類會變得非常臃腫。Netty中的pipeline和channelHandler就是解決這個問題的,它們通過責任鏈設計模式來組織程式碼邏輯,並且能夠支援邏輯的新增和刪除,能夠支援各類協議拓展,如HTTP、Websocket等。可以看看Netty實戰部落格中的初始化器類,裡面就是通過pipeline添加了各類協議和一些邏輯程式碼。

pipeline與channelHandler的構成

  

  我們知道一個連線對應一個channel,這個channel的所有處理邏輯在一個ChannelPipeline物件裡,就是上圖中的pipeline,這是它的物件名。然後這個物件裡面是一個雙向連結串列結構,每個節點是一個ChannelHandlerContext物件。這個物件能拿到與channel相關的所有上下文資訊,這個物件還包含一個重要的物件:ChannelHandler,它的分類如下。

  簡單地說,它包含兩個介面和這兩個介面的實現類,圖中左邊的實現類是不是很熟悉,就是我們自己寫的邏輯處理器裡的繼承的類。從名字就可以看出,它們的作用分別是讀資料和寫資料,或理解為入站和出戰。最重要的兩個方法分別為channelRead():訊息入站。和write():訊息出戰。

構建客戶端與服務端的pipeline

  下面的程式碼基於上面的登入示例改造。

  我們先了解一下ByteToMessageDecoder這個類。不論客戶端還是服務端收到資料後,都會先進行解碼,這個類就是Netty提供的專門做這個事情的。使用如下:

public class PacketDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        out.add(PacketCodeC.INSTANCE.decode(in));
    }
}

  這個類有一個好處就是,ByteBuf預設使用的是堆外記憶體,而它會幫你自動釋放記憶體,無需我們關心。上面是解碼,對應的Netty也準備了一個類來專門編碼:MessageToByteEncoder.

public class PacketEncoder extends MessageToByteEncoder<Packet> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {
        PacketCodeC.INSTANCE.encode(out, packet);
    }
}

  注意encode傳入的引數,第一個引數變成了ByteBuf的型別,所以我們需要把PacketCodeC裡的encode方法的引數改過來,也不需要第一行建立一個ByteBuf物件了。

  如果不明白為什麼要用到這兩個類的話,我先展示一段Netty實戰部落格裡面的程式碼:

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline= socketChannel.pipeline();
        //http解碼器
        pipeline.addLast(new HttpServerCodec());
        //....
        //websocket支援,設定路由
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        //新增自定義的助手類(邏輯處理器)
        pipeline.addLast(new NettyHandler());
    }
}

  在這篇實戰中我沒有實現自己的編解碼器,而是直接使用了http的解碼器。類似的,我們可以把自己的編解碼器也通過pepeline新增到邏輯鏈中。就像前面說的,可以新增刪除邏輯程式碼,每個功能各司其職,而不是一股腦的全在一個地方。用這兩個類還有一個好處就是Netty會自動識別這兩個類,從而自動編解碼而不需要我們自己去呼叫。

  編解碼的問題解決了,再看看邏輯處理器類。看看登入的程式碼,如果我們不止實現登入功能,還有收發等其他功能,是不是要用大量的if else把各個訊息型別分開,然後執行不同的邏輯。不同的邏輯都擠在一個方法中,顯然也太擁擠了。因此Netty基於這種考慮,抽象出了SimpleChannelInboundHandler。下面看看它是如何解決這個問題的:

public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponsePacket>{
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客戶端開始登入....");
        // 建立登入物件
        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
        loginRequestPacket.setUserId(new Random().nextInt(10000));
        loginRequestPacket.setUsername("username");
        loginRequestPacket.setPassword("pwd");
        // 寫資料
        ctx.channel().writeAndFlush(loginRequestPacket);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
        if (loginResponsePacket.isSuccess()) {
            System.out.println(new Date() + ": 客戶端登入成功");
        } else {
            System.out.println(new Date() + ": 客戶端登入失敗,原因:" + loginResponsePacket.getReason());
        }
    }
}
public class ServerLoginHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
        //服務端登入邏輯
        ctx.channel().writeAndFlush(login(loginRequestPacket));
    }
    private LoginResponsePacket login(LoginRequestPacket loginRequestPacket) {
        LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
        loginResponsePacket.setVersion(loginRequestPacket.getVersion());
        // 登入校驗(自行判斷使用者資訊是否正確)
        if (true) {
            // 校驗成功
            loginResponsePacket.setSuccess(true);
            System.out.println("客戶端登入成功!");
            return loginResponsePacket;
        } else {
            // 校驗失敗
            loginResponsePacket.setReason("賬號或密碼錯誤");
            loginResponsePacket.setSuccess(false);
            System.out.println("客戶端登入失敗!");
            return loginResponsePacket;
        }
    }
}

  類似的,收發訊息也可以這麼做。Netty會自動根據抽象類後面的泛型來區分它要呼叫哪個類。比如我們傳送的是一個SendMessage型別的Java物件,它就會在繼承了SimpleChannelInboundHandler的類中找到泛型為SendMessage的類去執行。

  最後我們要把這些邏輯程式碼根據服務端和客戶端不同的需求新增到它們的pipeline中。

  客戶端

.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        //解碼一定要放在第一個,在這裡pipeline按順序執行,不然接收訊息無法正常使用
                        ch.pipeline().addLast(new PacketDecoder());
                        ch.pipeline().addLast(new LoginResponseHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });

  服務端

.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new PacketDecoder());
                        ch.pipeline().addLast(new LoginRequestHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });

  最後的執行結果和登入示例一樣。看到這裡,Netty實戰中的程式碼,應該可以全部看懂了。

channelHandler的生命週期

  我們在重寫ChannelInboundHandlerAdapter或者SimpleChannelInboundHandler裡的方法的時候,只用到了讀和Active,其他一大堆沒用上的方法是幹嘛的?現在就一一說明一下,這些方法運作的整個過程,我們可以理解為這個channelHandler的生命週期。以ChannelInboundHandlerAdapter為例,SimpleChannelInboundHandler繼承於ChannelInboundHandlerAdapter,所以也差不多,個別方法名不一樣而已。下面的API,從上到下,就是觸發的順序。

  handlerAdded():邏輯處理器被新增後觸發。

  channelRegistered():channel繫結到執行緒組後觸發。

  channelActive():channel準備就緒,或者說連線完成。

  channelRead():channel有資料可讀。

  channelReadComplete():channel某次資料讀完了。

  channelInactive():channel被關閉

  channelUnregistered():channel取消執行緒的繫結

  handlerRemoved():邏輯處理器被移除。

 

拆包和粘包

  以上面客戶端和服務端雙向通訊的程式碼為例。簡單修改一下,在建立連線後,客戶端用一個迴圈向伺服器傳送訊息。然後服務端列印這些訊息。

等次數多了以後,服務端列印時會發現一些問題,比如我們傳送的字串為“123456789”,大部分列印的是123456789;有一部分變成了123456789123,這就是粘包;有一部分變為了1234,這就是拆包。

為什麼會有這種現象?

  雖然在我們程式碼層面,傳輸的資料單位是ByteBuf。但是到了更底層,用到了TCP協議,終究會按照位元組流傳送資料。而底層並不知道應用層資料的具體含義,它會根據TCP緩衝區的實際情況進行資料包的劃分。所以最終到達服務端的資料產生上面的現象。

如何解決?

  Netty為我們提供了4種解決方法:

  1.FixedLengthFrameDecoder:固定長度拆包器,每個資料包長度都是固定的。

  2.LineBasedFrameDecoder:行拆包器,每個資料包之間以換行符作為分隔。

  3.DelimiterBasedFrameDecoder:類似行拆包器,不過我們可以自定義分隔符。

  4.LengthFieldBasedFrameDecoder:基於長度域拆包器,最常用的,只要你的自定義協議中包含資料長度這個部分,就可以使用。它需要三個引數,第一個是資料包最大長度、第二個是引數長度域偏移量、第三個是長度域長度。

  

  看看前面通訊協議的圖,所謂長度域就是資料長度就是資料長度佔用的位元組,這裡是4。長度域偏移量就是資料長度這個部分在通訊協議組成部分中的位置,前面幾個部分加起來是7,所以它的偏移量就是7。

使用LengthFieldBasedFrameDecoder

  新增到客戶端和服務端pipeline中就行了,注意要放在第一個。以服務端為例。

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new PacketEncoder());

  如果和我一樣,客戶端用到的是WebSocket,沒有自定義協議,就不用新增拆包器了,Netty已經幫我們做好了。

 

 

心跳與空閒檢測

 

  網路應用程式普遍會遇到的一個問題就是連線假死。簡單地說就是因為網路或其他問題導致某一端實際上(TCP)已經斷開連線,但是應用程式沒有檢測到,以為還連線著。對服務端來說,這會浪費系統資源,導致效能下降。對於客戶端來說,假死會造成資料傳送超時,影響體驗。

 

如何解決這個問題?

 

  只需要客戶端每隔一段時間打個招呼,表示它還活著就行了,就是所謂的心跳。Netty自帶的IdleStateHandler 就可以實現這個功能。下面就來實現它。

 

  服務端

//心跳檢測類
public class IMIdleStateHandler extends IdleStateHandler {
    //讀空閒時間,也就是多久沒讀到資料了
    private static final int READER_IDLE_TIME = 15;
    public IMIdleStateHandler() {
        //呼叫父類建構函式,四個引數分別為:
        //讀空閒時間、寫空閒時間、讀寫空閒時間、時間單位
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒內未讀到資料,關閉連線");
        ctx.channel().close();
    }
}
//回覆客戶端傳送的心跳資料包
public class HeartBeatRequestHandler extends SimpleChannelInboundHandler<HeartBeatRequestPacket> {
    public static final HeartBeatRequestHandler INSTANCE = new HeartBeatRequestHandler();
    private HeartBeatRequestHandler() {
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HeartBeatRequestPacket requestPacket) {
        ctx.writeAndFlush(new HeartBeatResponsePacket());
    }
}
ch.pipeline().addLast(new IMIdleStateHandler());//新增到最前面
  ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);//新增到解碼和登入請求之後

  客戶端

  服務端實現了檢測讀空閒,客戶端肯定就需要傳送一個數據。

public class HeartBeatTimerHandler extends ChannelInboundHandlerAdapter {
    //心跳資料包傳送時間間隔,這裡設為5秒,實際使用時建議服務端和客戶端都設成分鐘級別
    private static final int HEARTBEAT_INTERVAL = 5;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        scheduleSendHeartBeat(ctx);
        super.channelActive(ctx);
    }
    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
        //schedule類似延時任務,每隔5秒傳送心跳資料包
        ctx.executor().schedule(() -> {
            if (ctx.channel().isActive()) {
                ctx.writeAndFlush(new HeartBeatRequestPacket());
                scheduleSendHeartBeat(ctx);
            }
        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }
}

  客戶端傳送心跳資料包後,也需要檢測服務端是否回覆了自己,所以也需要個檢測類,與服務端的程式碼一樣,就不寫了。也需要和服務端一樣,pipeline新增到相同的位置。

 

  最後:如果服務端在讀到資料後,不要再read方法裡面直接訪問資料庫或者其他比較複雜的操作,可以把這些耗時的操作放進我們的業務執行緒池中去執行。如:

ThreadPool threadPool = xxx;
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
    threadPool.submit(new Runnable() {
        // 1. balabala 一些邏輯
        // 2. 資料庫或者網路等一些耗時的操作
        // 3. writeAndFlush()
    })
}

  如果我們想統計某個操作的響應時間,直接用System.currentTimeMillis()其實是不準確的,因為有些操作是非同步的,它馬上就返回了,所以我們要判斷非同步結果是否完成再計算結束時間。

protected void channelRead0(ChannelHandlerContext ctx, T packet) {
    threadPool.submit(new Runnable() {
        long begin = System.currentTimeMillis();
        // 1. balabala 一些邏輯
        // 2. 資料庫或者網路等一些耗時的操作
        // 3. writeAndFlush
        xxx.writeAndFlush().addListener(future -> {
            if (future.isDone()) {
                long time =  System.currentTimeMillis() - begin;
            }
        });
    })
}

 

資料:https://juejin.im/book/5b4bc28bf265da0f60130116

 

&n