1. 程式人生 > 實用技巧 >netty之---netty的的原理以及介紹使用

netty之---netty的的原理以及介紹使用

第一章 netty的介紹

官網介紹

The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance and high-scalability protocol servers and clients.
In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.
'Quick and easy' does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

netty是由JBOSS提供的一個java開源框架,是一個非同步的,基於事件驅動的網路應用框架,用以快速開發高效能、高可靠性的網路IO程式,netty本質上是一個NIO框架,適用於伺服器通訊相關應用場景。從線圖可以看出關係(稍微有點醜)Netty是對NIO進一步封裝。

現在一些流行的框架比如Dubbo,grpc都使用了netty,官網上有宣告https://netty.io/wiki/related-projects.html。但是kafka是一股清流,為了方式依賴地獄或者受限制,自己手擼了一套套接字,強悍!

第二章 Java BIO、NIO、AIO程式設計

首選需要搞清楚同步、非同步、阻塞與非阻塞的概念,以銀行取款為例:

  • 同步 : 自己親自出馬持銀行卡到銀行取錢(使用同步IO時,Java自己處理IO讀寫);
  • 非同步 : 委託一小弟拿銀行卡到銀行取錢,然後給你(使用非同步IO時,Java將IO讀寫委託給OS處理,需要將資料緩衝區地址和大小傳給OS(銀行卡和密碼),OS需要支援非同步IO操作API);
  • 阻塞 : ATM排隊取款,你只能等待(使用阻塞IO時,Java呼叫會一直阻塞到讀寫完成才返回);
  • 非阻塞 : 櫃檯取款,取個號,然後坐在椅子上做其它事,等號廣播會通知你辦理,沒到號你就不能去,你可以不斷問大堂經理排到了沒有,大堂經理如果說還沒到你就不能去(使用非阻塞IO時,如果不能讀寫Java呼叫會馬上返回,當IO事件分發器會通知可讀寫時再繼續進行讀寫,不斷迴圈直到讀寫完成)

搞清楚netty之前,需要先搞清楚NIO是什麼?目前java共支援3種網路變成模型I/O模式 BIO,NIO,AIO,首先對比一下這三種模式的區別

Java對BIO、NIO、AIO的支援:

  • Java BIO : 同步並阻塞,伺服器實現模式為一個連線一個執行緒,即客戶端有連線請求時伺服器端就需要啟動一個執行緒進行處理,如果這個連線不做任何事情會造成不必要的執行緒開銷,當然可以通過執行緒池機制改善。

  • Java NIO : 同步非阻塞,伺服器實現模式為一個請求一個執行緒,即客戶端傳送的連線請求都會註冊到多路複用器上,多路複用器輪詢到連線有I/O請求時才啟動一個執行緒進行處理。(對於NIO,有的地方叫no-blocking io有的叫 new I/O,其實都是一樣的意思)

  • Java AIO(NIO.2) : 非同步非阻塞,伺服器實現模式為一個有效請求一個執行緒,客戶端的I/O請求都是由OS先完成了再通知伺服器應用去啟動執行緒進行處理

另外,I/O屬於底層操作,需要作業系統支援,併發也需要作業系統的支援,所以效能方面不同作業系統差異會比較明顯。

在高效能的I/O設計中,有兩個比較著名的模式Reactor和Proactor模式,其中Reactor模式用於同步I/O,而Proactor運用於非同步I/O操作。

在比較這兩個模式之前,我們首先的搞明白幾個概念,什麼是阻塞和非阻塞,什麼是同步和非同步,同步和非同步是針對應用程式和核心的互動而言的,同步指的是使用者程序觸發IO操作並等待或者輪詢的去檢視IO操作是否就緒,而非同步是指使用者程序觸發IO操作以後便開始做自己的事情,而當IO操作已經完成的時候會得到IO完成的通知。而阻塞和非阻塞是針對於程序在訪問資料的時候,根據IO操作的就緒狀態來採取的不同方式,說白了是一種讀取或者寫入操作函式的實現方式,阻塞方式下讀取或者寫入函式將一直等待,而非阻塞方式下,讀取或者寫入函式會立即返回一個狀態值。

一般來說I/O模型可以分為:同步阻塞,同步非阻塞,非同步阻塞,非同步非阻塞IO

同步阻塞IO:在此種方式下,使用者程序在發起一個IO操作以後,必須等待IO操作的完成,只有當真正完成了IO操作以後,使用者程序才能執行。JAVA傳統的IO模型屬於此種方式!

同步非阻塞IO:在此種方式下,使用者程序發起一個IO操作以後邊可返回做其它事情,但是使用者程序需要時不時的詢問IO操作是否就緒,這就要求使用者程序不停的去詢問,從而引入不必要的CPU資源浪費。其中目前JAVA的NIO就屬於同步非阻塞IO。

非同步阻塞IO:此種方式下是指應用發起一個IO操作以後,不等待核心IO操作的完成,等核心完成IO操作以後會通知應用程式,這其實就是同步和非同步最關鍵的區別,同步必須等待或者主動的去詢問IO是否完成,那麼為什麼說是阻塞的呢?因為此時是通過select系統呼叫來完成的,而select函式本身的實現方式是阻塞的,而採用select函式有個好處就是它可以同時監聽多個檔案控制代碼,從而提高系統的併發性!

非同步非阻塞IO:在此種模式下,使用者程序只需要發起一個IO操作然後立即返回,等IO操作真正的完成以後,應用程式會得到IO操作完成的通知,此時使用者程序只需要對資料進行處理就好了,不需要進行實際的IO讀寫操作,因為真正的IO讀取或者寫入操作已經由核心完成了。

下面示意圖簡單描述了BIO到NIO的變遷

BIO、NIO、AIO適用場景分析:

  • BIO方式適用於連線數目比較小且固定的架構,這種方式對伺服器資源要求比較高,併發侷限於應用中,JDK1.4以前的唯一選擇,但程式直觀簡單易理解。

  • NIO方式適用於連線數目多且連線比較短(輕操作)的架構,比如聊天伺服器,併發侷限於應用中,程式設計比較複雜,JDK1.4開始支援。

  • AIO方式使用於連線數目多且連線比較長(重操作)的架構,比如相簿伺服器,充分呼叫OS參與併發操作,程式設計比較複雜,JDK7開始支援。

2.1 Java BIO 程式設計

BIO提供了一種端到端的通訊,相當於對傳輸層的一種封裝,服務端啟動,等待客戶端的連線,在客戶端連線到服務端後,服務端啟動一個執行緒去監聽客戶端訊息,客戶端傳送訊息,並等待服務端返回(客戶端一直阻塞),服務端收到訊息,將訊息返回給客戶端,此時一次互動完成。若還需互動,則不釋放連線,客戶端再次將訊息傳送給服務端,並等待返回,若不需要互動,則客戶端釋放連線。

簡單demo:

服務端:

package com.yang.java.main.bio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/9
 */
public class BioServer {

    public static void main(String[] args) throws IOException {
        // 建立一個執行緒池,當有socket連結,就建立一個執行緒,機型通訊
        ExecutorService executorService = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(9999);
        System.out.println("socket server started.");
        // 迴圈監聽
        do {
            System.out.println("start to listen, current thread is :" + Thread.currentThread().getName());
            // 接受連線
            Socket socket = serverSocket.accept();
            // 建立一個新的執行緒去處理接受的連線
            executorService.execute(() -> handler(socket));
        } while (true);
    }

    private static void handler(Socket socket) {
        System.out.println("current thread is :" + Thread.currentThread().getName());
        try {
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            int read;
            do {
                read = inputStream.read(bytes);
                System.out.println("receive the message is: " + new String(bytes));
            } while (read != -1);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("close the socket");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客戶端:

package com.yang.java.main.bio;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/9
 */
public class BioClient {

    public static void main(String[] args) throws IOException {
        // 繫結伺服器
        Socket client = new Socket("127.0.0.1", 9999);
        // 建立輸入
        Scanner scanner = new Scanner(System.in);
        System.out.println("please input the content: ");
        for (; ; ) {
            if (scanner.hasNext()) {
                String str = scanner.next();
                System.out.println(str);
                OutputStream outputStream = client.getOutputStream();
                outputStream.write(str.getBytes());
            }
        }
    }
}

起三個客戶端,進行檢視服務端輸出:

socket server started.
start to listen, current thread is :main
start to listen, current thread is :main
current thread is :pool-1-thread-1
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-2
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-3
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-4
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-5
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          

從結果可以看出,每次進來一個就會啟動一個新執行緒。

因此可以確認BIO的弊端

  1. 當併發量過大的時候,服務端就會建立大量的執行緒來處理連結,造成資源的浪費,系統資源佔用較大。
  2. 當連線建立之後,如果當前執行緒沒有資料可讀,那麼當前執行緒就阻塞在read操作上,會造成執行緒資源浪費。

2.2 Java NIO 程式設計

  1. java NIO 也叫 java no-blocking IO 或者 new IO
  2. BIO基於位元組流和字元流進行操作,而NIO基於Channel和Buffer(緩衝區)進行操作,資料總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(選擇器)用於監聽多個通道的事件(比如:連線開啟,資料到達)。因此,單個執行緒可以監聽多個數據通道。
  3. NIO和傳統IO(一下簡稱IO)之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味著每次從流中讀一個或多個位元組,直至讀取所有位元組,它們沒有被快取在任何地方。此外,它不能前後移動流中的資料。如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。NIO的緩衝導向方法略有不同。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的資料。
  4. IO的各種流是阻塞的。這意味著,當一個執行緒呼叫read() 或 write()時,該執行緒被阻塞,直到有一些資料被讀取,或資料完全寫入。該執行緒在此期間不能再幹任何事情了。 NIO的非阻塞模式,使一個執行緒從某通道傳送請求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取。而不是保持執行緒阻塞,所以直至資料變得可以讀取之前,該執行緒可以繼續做其他的事情。 非阻塞寫也是如此。一個執行緒請求寫入一些資料到某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。 執行緒通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)。

因此NIO 的三大核心部分就是buffer,channel, selector

2.2.1 Buffer

Buffer:緩衝區,本質上是一個可以繼續寧讀寫資料的記憶體塊,可以理解為一個容器物件(包含資料),這個物件提供了一組方法,可以輕鬆的使用記憶體塊,緩衝區物件內建了一些機制,可以跟蹤和記錄緩衝的狀態變化

public abstract class Buffer {
    ...
    // Invariants: mark <= position <= limit <= capacity 欄位大小
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    ...
}

這幾個欄位就是Buffer實現管理記憶體塊的保證,也就是儲存這裡面數組的位置狀態,分析一下欄位:

索引

說明

capacity

緩衝區陣列的總長度

position

下一個要操作的資料元素的位置,不能超過limit

limit

緩衝區陣列中不可操作的下一個元素的位置:limit<=capacity

mark

用於記錄當前position的前一個位置或者預設是-1,這個欄位一般很少用到

Buffer的一些方法

package com.yang.java.main.nio.buffer;


import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/9
 */
public class buffer {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1,2,3,4,5,6});
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]
        // 移動position以及limit
        buffer.position(2);
        buffer.limit(4);
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=2 lim=4 cap=6]
        System.out.println("isReadOnly: " + buffer.isReadOnly()); // isReadOnly: false
        System.out.println("isDirect: " + buffer.isDirect()); // isDirect: false
        buffer.get();
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]
        System.out.println("slice: " + buffer.slice()); // slice: java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]
        ByteBuffer duplicate = buffer.duplicate(); // 原始碼 new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0);
        System.out.println("duplicate compareTo: " + duplicate.compareTo(buffer)); // duplicate: true
        System.out.println("duplicate: " + duplicate.equals(buffer)); // duplicate: true
        System.out.println("duplicate: " + duplicate + "  ByteBuffer " + buffer); // duplicate: java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]  ByteBuffer java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]
        System.out.println("hasArray: " + buffer.hasArray()); // hasArray: true
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]
        System.out.println("array: " + Arrays.toString(buffer.array())); // array: [1, 2, 3, 4, 5, 6]
        buffer.flip();
        System.out.println("flip ByteBuffer: " + buffer); // flip ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=3 cap=6]
        System.out.println("flip array: " + Arrays.toString(buffer.array())); // flip array: [1, 2, 3, 4, 5, 6]
        buffer.position(2);
        System.out.println("ByteBuffer: " + buffer); // ByteBuffer: java.nio.HeapByteBuffer[pos=2 lim=3 cap=6]
        buffer.compact();
        System.out.println("compact ByteBuffer: " + buffer); // compact ByteBuffer: java.nio.HeapByteBuffer[pos=1 lim=6 cap=6]
        System.out.println("compact array: " + Arrays.toString(buffer.array())); // compact array: [3, 2, 3, 4, 5, 6]
        buffer.rewind();
        System.out.println("rewind ByteBuffer: " + buffer); // rewind ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]
        buffer.clear();
        System.out.println("clear ByteBuffer: " + buffer); // clear ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]
        System.out.println("array: " + Arrays.toString(buffer.array())); // array: [3, 2, 3, 4, 5, 6]
    }
}

2.2.2 Channel

Channel一般稱為通道,java NIO 的通道類似於流,但又有一些不同

  • Channel既可以從通道中讀取資料,又可以讀資料到通道。但流的讀寫通常是單向的
  • Channel可以非同步的讀寫
  • Channel中的資料總是要先讀到一個Buffer中或者總是要從一個Buffer中寫入

Channel最重要的通道的實現:

  • FileChannel:從檔案中讀寫資料
  • DatagramChannel:能通過UDP讀寫網路中的資料
  • SocketChannel:能通過TCP讀寫網路中的資料
  • ServerSocketChannel:可以監聽新進來的TCP連線,像WEB伺服器那樣。對每一個新進來的連結都會建立一個SocketChannel。

2.2.2.1 FileChannel類

FileChannel主要就是針對本地檔案進行IO操作,常見的方法有:

package com.yang.java.main.nio.channel;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/10
 */
public class NIOFileChannel01 {

    public static void write() throws IOException {
        String str = "hell, mark";
        // 建立一個輸出流
        FileOutputStream fileOutputStream = new FileOutputStream("d:\\file01.txt");
        // 獲取通道
        FileChannel channel = fileOutputStream.getChannel();
        // 建立一個緩衝區,並將str放入緩衝區
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        System.out.println("buffer: " + buffer); // buffer: java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
        // 如果使用allocate建立最好之後flip
        ByteBuffer allocate = ByteBuffer.allocate(512);
        allocate.put(str.getBytes());
        System.out.println("allocate: " + allocate); // allocate: java.nio.HeapByteBuffer[pos=10 lim=512 cap=512]
        allocate.flip();
        System.out.println("allocate flip: " + allocate);  // allocate flip: java.nio.HeapByteBuffer[pos=0 lim=10 cap=512]
        channel.write(buffer);
        fileOutputStream.close();
        // 進行flip的必要
        /**
         * static int write(FileDescriptor fd, ByteBuffer src, long position,
         *                      boolean directIO, int alignment, NativeDispatcher nd)
         *         throws IOException
         *     {
         *         if (src instanceof DirectBuffer) {
         *             return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd);
         *         }
         *
         *         // Substitute a native buffer
         *         int pos = src.position();
         *         int lim = src.limit();
         *         assert (pos <= lim);
         *         int rem = (pos <= lim ? lim - pos : 0);  // 主要看這裡
         *         ByteBuffer bb;
         *         if (directIO) {
         *             Util.checkRemainingBufferSizeAligned(rem, alignment);
         *             bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
         *         } else {
         *             bb = Util.getTemporaryDirectBuffer(rem);
         *         }
         *         try {
         *             bb.put(src);
         *             bb.flip();  // 看這裡
         *             // Do not update src until we see how many bytes were written
         *             src.position(pos);
         *
         *             int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd);
         *             if (n > 0) {
         *                 // now update src
         *                 src.position(pos + n);
         *             }
         *             return n;
         *         } finally {
         *             Util.offerFirstTemporaryDirectBuffer(bb);
         *         }
         *     }
         */
    }

    public static void read() throws IOException {
        File file = new File("d:\\file01.txt");
        // 建立輸入流
        FileInputStream fileInputStream = new FileInputStream(file);
        // 獲取Channel
        FileChannel channel = fileInputStream.getChannel();
        // 建立緩衝區
        ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
        // 將通道內資料讀入緩衝區,因為設定的緩衝區大小等於檔案大小,因此一次是可以讀完的,否則需要迴圈讀取,返回值是讀取的長度,如果等於-1,則說明無資訊可讀取
        int read = channel.read(buffer);
        System.out.println("read " + read);  // read 10
        System.out.println("read message: " + new String(buffer.array())); // read message: hell, mark
    }

    public static void transForm() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("d:\\package\\sogou_pinyin_98a.exe");
        FileOutputStream fileOutputStream = new FileOutputStream("d:\\sogou_pinyin_98a.exe");
        // 獲取各個流的channel
        FileChannel fileInputChannel = fileInputStream.getChannel();
        FileChannel fileOutputChannel = fileOutputStream.getChannel();
        System.out.println("fileInputChannel isOpen: " + fileInputChannel.isOpen()); // fileInputChannel isOpen: true
        System.out.println("fileOutputChannel isOpen: " + fileOutputChannel.isOpen()); // fileOutputChannel isOpen: true
        // 使用transform進行copy一個可讀的channel
        fileOutputChannel.transferFrom(fileInputChannel, 0, fileInputChannel.size());  // copy完成的檔案與原始檔一直
        // 關閉通道以及流
        fileInputChannel.close();
        fileOutputChannel.close();
        fileInputStream.close();
        fileOutputStream.close();
    }

    public static void position() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("d:\\file01.txt");
        FileChannel fileInputStreamChannel = fileInputStream.getChannel();
        System.out.println("fileInputStreamChannel size: " + fileInputStreamChannel.size());  // fileInputStreamChannel size: 10
        System.out.println("fileInputStreamChannel position: " + fileInputStreamChannel.position()); // fileInputStreamChannel position: 0
        fileInputStreamChannel.position(2);
        System.out.println("fileInputStreamChannel position: " + fileInputStreamChannel.position()); // fileInputStreamChannel position: 2
        ByteBuffer allocate = ByteBuffer.allocate((int) fileInputStreamChannel.size());
        // 從檔案的position開始讀取,一直讀到結尾
        fileInputStreamChannel.read(allocate);
        System.out.println("byteBuffer: " + allocate);  // byteBuffer: java.nio.HeapByteBuffer[pos=8 lim=10 cap=10]
        System.out.println("message: " + new String(allocate.array()));  // message: ll, mark
        fileInputStreamChannel.close();
        fileInputStream.close();
    }

    public static void transferTo() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("d:\\file01.txt"); // 原始檔內容:hello, mark
        FileOutputStream fileOutputStream = new FileOutputStream("d:\\file02.txt");
        // 獲取各個流的channel
        FileChannel fileInputChannel = fileInputStream.getChannel();
        FileChannel fileOutputChannel = fileOutputStream.getChannel();
        System.out.println("fileInputChannel isOpen: " + fileInputChannel.isOpen());  // fileInputChannel isOpen: true
        System.out.println("fileOutputChannel isOpen: " + fileOutputChannel.isOpen());  // fileOutputChannel isOpen: true
        // 傳送資料給一個可寫的channel,傳送位置為2(從0開始計數),傳送的位元組數量為3
        fileInputChannel.transferTo(2, 3, fileOutputChannel); // 傳送給的檔案內容:ll,
        // 關閉通道以及流
        fileInputChannel.close();
        fileOutputChannel.close();
        fileInputStream.close();
        fileOutputStream.close();
    }

    public static void main(String[] args) throws IOException {
        write();
        read();
        transForm();
        position();
        transferTo();
    }
}
MappedByteBuffer,可以讓檔案直接在記憶體中進行修改,作業系統不需要拷貝一次,因為涉及檔案所以放在這裡面,在這裡面使用RandomAcessFIle進行操作。、
RandomAcessFIle是一個可以對檔案隨機訪問,包括讀寫,該讀寫是基於指標的操作。可以把它理解為一個文字編輯器。
看構造器原始碼:
private RandomAccessFile(File file, String mode, boolean openAndDelete) throws FileNotFoundException {
        String name = (file != null ? file.getPath() : null);
        int imode = -1;
        if (mode.equals("r"))
            imode = O_RDONLY;
        else if (mode.startsWith("rw")) {
            imode = O_RDWR;
            rw = true;
            if (mode.length() > 2) {
                if (mode.equals("rws"))
                    imode |= O_SYNC;
                else if (mode.equals("rwd"))
                    imode |= O_DSYNC;
                else
                    imode = -1;
            }
        }
        ...      
}
該操作符定義了四種模式:
r 以只讀的方式開啟文字,也就意味著不能用write來操作檔案
rw 讀操作和寫操作都是允許的
rws 每當進行寫操作,同步的重新整理到磁碟,重新整理內容和元資料
rwd 每當進行寫操作,同步的重新整理到磁碟,重新整理內容

接下來看一下簡單的demo

public class MappedByteBufferDemo {
    public static void main(String[] args) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile("d:\\file01.txt", "rw");
        FileChannel channel = randomAccessFile.getChannel();
        /**
         * 引數1:制定該channel的模式:
         *      public static final MapMode READ_ONLY = new MapMode("READ_ONLY");   只讀模式
         *      public static final MapMode READ_WRITE =new MapMode("READ_WRITE");  讀寫模式
         *      public static final MapMode PRIVATE =new MapMode("PRIVATE");        寫時複製
         * 引數2:可以直接修改的起始位置
         * 引數3:是對映到記憶體的大小,即當前檔案有多少個位元組可以對映到記憶體中,,一共有size大小的位元組對映到記憶體
         */
        MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 12);
        map.put(0, (byte) 'A');
        map.put(1, (byte) 'B');
        randomAccessFile.close();  // 修改值之後檔案內容(有兩個空格):‘ABll, mark  ’
    }
}

2.2.2.2 DatagramChannel類

使用與ServerSocketChannel類似,一個是UDP,一個是TCP

2.2.2.3 SocketChannel 與ServerSocketChannel類

詳見Selector。

2.2.3 Selector

Java NIO的選擇器允許一個單獨的執行緒來監視多個輸入通道,你可以註冊多個通道使用一個選擇器,然後使用一個單獨的執行緒來“選擇”通道:這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的執行緒很容易來管理多個通道。

僅用單個執行緒來處理多個Channels的好處是,只需要更少的執行緒來處理通道。事實上,可以只用一個執行緒處理所有的通道。對於作業系統來說,執行緒之間上下文切換的開銷很大,而且每個執行緒都要佔用系統的一些資源(如記憶體)。但是隨著現代計算機的增強,對於多CPU需要考慮實際情況進行選擇執行緒數。

Selector的特點:

  • Netty的IO執行緒NIOEventLoop聚合了Selector(選擇器,或者多路複用器),可以同時併發處理成百上千個客戶端的連結
  • 當執行緒從客戶端的Socket童丹進行讀寫資料時,如沒有資料可用溼,該執行緒可以機型其他任務
  • 執行緒通常將非阻塞IO的空閒時間用於在其他通道上執行IO操作,所以單獨的執行緒可以管理多個輸入與輸出通道
  • 由於讀寫操作都是非阻塞的,這就可以充分提升IO執行緒的執行效率,避免由於頻繁I/O阻塞導致的執行緒掛起
  • 一個I/O執行緒可以併發處理N個客戶端連結和讀寫操作,這從根本上解決了傳統同步阻塞I/O,已連線一執行緒模型,架構的效能、彈性伸縮能力以及可靠性都得到了提升

selector的一些方法:

public abstract class Selector implements Closeable {

    public static java.nio.channels.Selector open() throws IOException // 得到一個選擇物件

    public abstract boolean isOpen();

    public abstract SelectorProvider provider();

    /**
     * // The set of keys registered with this Selector
     *     private final Set<SelectionKey> keys;
     *
     *     // The set of keys with data ready for an operation
     *     private final Set<SelectionKey> selectedKeys;
     *
     *     // Public views of the key sets
     *     private final Set<SelectionKey> publicKeys;             // Immutable
     *     private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
     *
     *     // used to check for reentrancy
     *     private boolean inSelect;
     *
     *     protected SelectorImpl(SelectorProvider sp) {
     *         super(sp);
     *         keys = ConcurrentHashMap.newKeySet();
     *         selectedKeys = new HashSet<>();
     *         publicKeys = Collections.unmodifiableSet(keys);
     *         publicSelectedKeys = Util.ungrowableSet(selectedKeys);
     *     }
     * @return
     */
    public abstract Set<SelectionKey> keys();  // 從內部集合獲取所有的selectionKey,實際上就是:publicKeys,也就是:keys

    public abstract Set<SelectionKey> selectedKeys();  // 從內部集合獲取所有的selectionKey,實際上就是:publicSelectedKeys,也就是:selectedKeys

    public abstract int select() throws IOException; // 監控所有的註冊通道,當其中有IO操作可以進行,將對應的selectionKey加入到內部稽核並返回,引數用來設定超時時間。會進行阻塞

    public abstract int selectNow() throws IOException;  // 不阻塞,立馬返回

    public abstract java.nio.channels.Selector wakeup();  // 喚醒selector

    public abstract void close() throws IOException;    // 關閉
}

NIO,我們首先需要註冊當這幾個事件到來的時候所對應的處理器。然後在合適的時機告訴事件選擇器:我對這個事件感興趣。例如對於讀操作,就是完成連線和系統沒有辦法承載新讀入的資料的時;對於accept,一般是伺服器剛啟動的時候;而對於connect,一般是connect失敗需要重連或者直接非同步呼叫connect的時候。

其次,用一個死迴圈選擇就緒的事件,會執行系統呼叫(Linux 2.6之前是select、poll,2.6之後是epoll,Windows是IOCP),還會阻塞的等待新事件的到來。新事件到來的時候,會在selector上註冊標記位,標示可讀、可寫或者有連線到來。

注意,select是阻塞的,無論是通過作業系統的通知(epoll)還是不停的輪詢(select,poll),這個函式是阻塞的。所以在一個while(true)裡面呼叫這個函式而不用擔心CPU空轉。接下來看一張圖:

一個NIO的連線過程

  1. 當客戶端連線時,會通過ServerSocketChannel得到SocketChannel
  2. Selector進行監聽select方法,返回有事件發生的通道個數
  3. 將SocketChannel註冊到Selector上,及呼叫register(Selector sel, int ops),一個selector可以註冊多個SocketChannel
  4. 註冊後返回一個SelectionKey,回合該Selector關聯
  5. 進一步得到各個SelectionKey(有事件發生時)
  6. 可以通過得到的Channel,完成業務處理

一個簡單的 NIO demo

package com.yang.java.main.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/10
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        // 建立ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 繫結埠,進行服務監聽
        serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 9999));
        // 設定為非阻塞
        serverSocketChannel.configureBlocking(false);
        // 得到Selector物件
        Selector selector = Selector.open();
        // 將serverSocketChannel註冊到selector中,並設定關心事件為accept
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 迴圈等待客戶端的連線
        while (true){
            if(selector.select(2000) == 0){
                System.out.println("server wait for 2 seconds, but no connect");
                continue;
            }
            // 如果返回的大於0,就說明已經獲取到關注的事件,然後獲取關注事件的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 使用迭代器
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                //判斷髮生的事件
                if(selectionKey.isAcceptable()){
                    // 有客戶端連線進來了,生成一個Channel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("a new client is connect");
                    // 將socketChannel設定為非阻塞的
                    socketChannel.configureBlocking(false);
                    // 註冊事件,同時可以為該channel關聯一個ByteBuffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if(selectionKey.isReadable()){
                    // 通過selectionKey反向獲取通道
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    // 獲取關聯的buffer
                    ByteBuffer attachment = (ByteBuffer) selectionKey.attachment();
                    channel.read(attachment);
                    System.out.println("receive message: " + new String(attachment.array()));
                    channel.close();
                }
                iterator.remove();
            }
        }
    }
}
package com.yang.java.main.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/10
 */
public class NIOClient {

    public static void main(String[] args) throws IOException {
        // 獲取一個socketChannel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        // 連線伺服器
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        while (!socketChannel.finishConnect()){
            System.out.println("not finish connect, do other");
        }
        String str = "hello, service";
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(byteBuffer);
        socketChannel.close();
    }
}

2.3 Java AIO 程式設計

AIO 即 NIO2.0,叫做非同步不阻塞的 IO。AIO 引入非同步通道的概念,採用了 Proactor 模式,簡化了程式編寫,有效的請求才啟動執行緒,它的特點是先由作業系統完成後才通知服務端程式啟動執行緒去處理,一般適用於連線數較多且連線時間較長的應用

Java 非同步 IO 提供了兩種使用方式,分別是返回 Future 例項和使用回撥函式。但是AIO程式設計程式碼量實在是太多了,就是各種非同步,回撥,到現在也感覺不是很流行,netty出來了用的就更少了。

3 NIO與零拷貝

首先楊浦兩個概念:磁碟到核心空間屬於DMA(direct memory access)拷貝(DMA即直接記憶體存取,原理是外部裝置不通過CPU而直接與系統記憶體交換資料)。而核心空間到使用者空間則需要CPU的參與進行拷貝,既然需要CPU參與,也就涉及到了核心態和使用者態的相互切換。

傳統IO操作(作圖資料拷貝,右圖核心態與使用者態切換):

  1. 資料需要從磁碟拷貝到核心空間,再從核心空間拷到使用者空間(JVM)。
  2. 程式可能進行資料修改等操作
  3. 再將資料拷貝到核心空間,核心空間再拷貝到網絡卡記憶體,通過網路傳送出去(或拷貝到磁碟)。

目前NIO(linux 2.4版本之後)(作圖資料拷貝,右圖核心態與使用者態切換)

改進的地方:

  • 將上下文切換次數從4次減少到了2次;
  • 將資料拷貝次數從4次減少到了2次(DMA直接存取)。

NIO的零拷貝由transferTo()方法實現,在fileChannel中已經使用。transferTo()方法將資料從FileChannel物件傳送到可寫的位元組通道(如Socket Channel等)。在內部實現中,由native方法transferTo()來實現,它依賴底層作業系統的支援。在UNIX和Linux系統中,呼叫這個方法將會引起sendfile()系統呼叫。

注意:零拷貝是建立在不需要進行資料檔案操作的情況下使用的

零拷貝是指沒有系統CPU參與,但是這裡其實是有一次CPU拷貝的,也就是從核心態拷貝到傳送通道,但是拷貝的資訊比較少,比如length,offset,消耗很低,可以忽略

零拷貝的理解

  • 零拷貝是從作業系統角度來講,也就是說核心緩衝區之間,沒有資料是重複的只有kernel buffer有一份資料
  • 零拷貝不僅僅帶來更少的資料複製,還帶來效能優勢,例如更少的上下文切換,更少的CPU快取。偽共享以及無CPU校驗和計算

demo檢視區別,首先起一個服務端,不需要儲存檔案,主要看客戶端

public class Service {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 9999));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int read;
            do {
                read = socketChannel.read(buffer);
                buffer.clear();
            } while (read != -1);
            System.out.println("finished read");
            socketChannel.close();
        }
    }
}

客戶端

package com.yang.java.main.nio.zeroCopy;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/11
 */
public class Client {

    public static void main(String[] args) throws IOException {
        zeroCopy();
        tradition();
    }

    public static void zeroCopy() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        FileInputStream inputStream = new FileInputStream("D:\\package\\netty-4.1.zip");
        FileChannel channel = inputStream.getChannel();
        System.out.println("file size: " + channel.size()); // file size: 5375946
        long startTime = System.currentTimeMillis();
        long l = channel.transferTo(0, channel.size(), socketChannel);
        System.out.println("send the total size: " + l + "用時:=" + (System.currentTimeMillis() - startTime)); // send the total size: 5375946用時�?=5
        channel.close();
        inputStream.close();
        socketChannel.close();
    }

    public static void tradition() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        FileInputStream inputStream = new FileInputStream("D:\\package\\netty-4.1.zip");
        FileChannel channel = inputStream.getChannel();
        System.out.println("file size: " + channel.size());  // file size: 5375946
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        int read;
        long startTime = System.currentTimeMillis();
        do {
            read = channel.read(buffer);
            socketChannel.write(buffer);
            buffer.rewind();
        } while (read != -1);
        System.out.println("用時:=" + (System.currentTimeMillis() - startTime)); // 用時:=12
        channel.close();
        inputStream.close();
        socketChannel.close();
    }
}

從結果可以看出,零拷貝模式比傳統拷貝速度快很多。

但是大部分時候我們是需要對資料進行操作,但又想快速的,這個時候就可以使用NIO的直接記憶體,一個典型的例子就是Buffer中的例子MappedByteBuffer

直接記憶體則介於傳動拷貝以及零拷貝之間,效率一般且可操作檔案資料。直接記憶體(mmap技術)將檔案直接對映到核心空間的記憶體,返回一個操作地址(address),它解決了檔案資料需要拷貝到JVM才能進行操作的窘境。而是直接在核心空間直接進行操作,省去了核心空間拷貝到使用者空間這一步操作。

NIO的直接記憶體(mmap技術)是由MappedByteBuffer實現的。核心即是map()方法,該方法把檔案對映到記憶體中,獲得記憶體地址addr,然後通過這個addr構造MappedByteBuffer類,以暴露各種檔案操作API。mmap適合小資料量讀寫,他需要4次上下文切換,3次資料拷貝

4 Netty

來一張netty的官網照片,顯示其厲害之處。

  1. netty的設計比較優雅,適用於各種傳輸型別的同意API阻塞和非阻塞Socket;基於靈活且可擴充套件的事件模型,可以清晰地分離關注點,高度可定製的執行緒模型 - 單執行緒,一個或多個執行緒池。
  2. netty使用方便,對比NIO程式設計較多的API,netty基本做好了封裝,使用者基本只需要關注業務程式碼的實現,並且netty還有比較詳細的javadoc
  3. netty對比NIO新歌能更好,吞吐量更高,減少資源消耗,最小化不必要的記憶體複製
  4. 完全的SSL\TLS以及StartTLS支援

4.1 同步和非同步、阻塞與非阻塞概念

同步和非同步的概念描述的是使用者執行緒與核心的互動方式:同步是指使用者執行緒發起IO請求後需要等待或者輪詢核心IO操作完成後才能繼續執行;而非同步是指使用者執行緒發起IO請求後仍繼續執行,當核心IO操作完成後會通知使用者執行緒,或者呼叫使用者執行緒註冊的回撥函式。

阻塞和非阻塞的概念描述的是使用者執行緒呼叫核心IO操作的方式:阻塞是指IO操作需要徹底完成後才返回到使用者空間;而非阻塞是指IO操作被呼叫後立即返回給使用者一個狀態值,無需等到IO操作徹底完成。

4.2 Reactor

4.2.1 概述

Reactor對應的叫法:反應器模式、分發者模式、通知者模式

Reactor模式同夥一個或多個輸入同時傳遞v給伺服器的模式(基於事件驅動),服務端程式處理傳入的多個請求,並將它們同步分派到相應的處理執行緒。

Reactor核心組成

  1. Reactor在一個單獨的執行緒中執行,負責監聽和分發事件,分發給適當的處理程式來對IO睡覺可哦啊嗎作出反應(也就是分派到相應的處理執行緒)
  2. Handler,處理程式執行I/O事件要完成的實際事件。Reactor通過排程適當的處理程式來響應I/O事件,處理程式執行非阻塞操作。

4.2.2 模式分類

根據Reactor的數量以及處理資源執行緒池數量不同,有3種實現

  • 單Reactor 單執行緒
  • 單Reactor 多執行緒
  • 主從 Reactor 多執行緒

4.2.2.1 單Reactor單執行緒

流程:

  1. Reactor通過Select監控客戶端請求事件,收到事件後通過Dispatch進行分發
  2. 如果是建立連線請求事件,擇優Acceptor通過Accept處理連線請求,然後建立一個handler完成後續業務處理
  3. 如果不是連線事件,則會分發給連線對應的Handler進行處理業務
  4. Handler進行進行 read -> 業務處理 -> send 流程

優缺點:

優點:模型簡單,沒有多執行緒。程序通訊、競爭的問題,全部都愛一個執行緒完成

缺點:

  • 效能問題,只有一個執行緒,無法完全發揮多核CPU的效能。Handler在處理某個連線業務室,整個執行緒無法再次處理其他連線事件,較容易造成效能瓶頸
  • 可靠性問題,執行緒意外終止,或進入思緒換,會導致整合通訊模組不可用

使用場景:客戶端的數量有限,業務處理非常迅速,比如Redis在業務處理的事件複雜度O(1)情況

4.2.2.2 單Reactor多執行緒

流程:

  1. Reactor會通過select進行監聽客戶端的請求事件,收到事件之後會通過dispatch進行分發
  2. 如果是建立連線的請求,灰分發給Acceptor通過accept接受事件,並建立一個Handler物件處理完成連線之後的事件
  3. 不是連線請求,則分發給連線對應的handler處理
  4. handler只負責相應事件,不做具體業務處理,通過read讀取資料之後,分發給後面worker執行緒池處理業務
  5. worker執行緒池會分配執行緒完成真正的業務,並將結果返回給handler
  6. handler收到響應之後,通過send將結果返回給客戶端

優點:可以充分利用多核能力

缺點:多執行緒資料共享以及訪問比較複雜,Reactor處理所有的監聽以及響應事件,在單執行緒執行,高併發場景容易出現瓶頸

4.2.2.3 主從Reactor多執行緒

流程

  1. Reactor主執行緒MainReactor物件通過select監聽連線事件,收到事件之後灰分發給Acceptor處理連線事件
  2. Acceptor處理完連線事件之後,MainReactor將連結分配給SubReactor
  3. Subreactor姜麗娜姐加入到連結佇列進行監聽,並建立handler進行各種業務處理
  4. 當有新事件發生時,SubReactor會呼叫對應的handler進行處理
  5. handler通過read讀取資料,分發給後面的worker執行緒池處理
  6. worker執行緒池分配獨立的worker執行緒進行業務處理,並返回結果給handler
  7. handler接收響應,在通過send將結果返回給clieng

注意:Reactor主執行緒可以有多個Reactor子執行緒,即一個MainReactor可以關聯多個SunReactor

優點:

  • 父執行緒與子啊執行緒資料互動簡單職責明確,父執行緒只需要接受新連線,子執行緒完成業務處理
  • 父執行緒與子執行緒的資料互動簡單,Reactor主執行緒只需要包新連線傳給子執行緒,子執行緒無需返回資料

缺點:編碼複雜性高

4.2.3 Reactor工作原理

EventHandler抽象類表示IO事件處理器,它擁有IO檔案控制代碼Handle(通過get_handle獲取),以及對Handle的操作handle_event(讀/寫等)。繼承於EventHandler的子類可以對事件處理器的行為進行定製。Reactor類用於管理EventHandler(註冊、刪除等),並使用handle_events實現事件迴圈,不斷呼叫同步事件多路分離器(一般是核心)的多路分離函式select,只要某個檔案控制代碼被啟用(可讀/寫等),select就返回(阻塞),handle_events就會呼叫與檔案控制代碼關聯的事件處理器的handle_event進行相關操作。

通過Reactor的方式,可以將使用者執行緒輪詢IO操作狀態的工作統一交給handle_events事件迴圈進行處理。使用者執行緒註冊事件處理器之後可以繼續執行做其他的工作(非同步),而Reactor執行緒負責呼叫核心的select函式檢查socket狀態。當有socket被啟用時,則通知相應的使用者執行緒(或執行使用者執行緒的回撥函式),執行handle_event進行資料讀取、處理的工作。由於select函式是阻塞的,因此多路IO複用模型也被稱為非同步阻塞IO模型。注意,這裡的所說的阻塞是指select函式執行時執行緒被阻塞,而不是指socket。一般在使用IO多路複用模型時,socket都是設定為NONBLOCK的,不過這並不會產生影響,因為使用者發起IO請求時,資料已經到達了,使用者執行緒一定不會被阻塞。

事件迴圈不斷地呼叫select獲取被啟用的socket,然後根據獲取socket對應的EventHandler,執行器handle_event函式即可。

IO多路複用是最常使用的IO模型,但是其非同步程度還不夠“徹底”,因為它使用了會阻塞執行緒的select系統呼叫。因此IO多路複用只能稱為非同步阻塞IO,而非真正的非同步IO。

4.3 Netty模型

Netty就是基於上述的Reactor主從多執行緒模型實現的

介紹:

  • Netty抽象出兩個執行緒池,BossGroup 專門負責處理接受客戶端的連結,WorkerGroup專門負責網路的讀寫
  • BossGroup和WorkerGroup型別都是NioEventLoopGroup
  • NioEventLoopGroup相當於一個事件迴圈組,這個組中含有多個事件迴圈,每一個事件迴圈是NioEventLoop
  • NioEventLoop表示一個不斷迴圈的處理執任務的執行緒,每一個NioEventLoop都有一個selector,用於監聽繫結在其上的socket通訊
  • NioEventLoopGroup可以有多個執行緒,既可以含有多個NioEventLoop
  • 每個BossGroup中NioEventLoop迴圈執行的步驟:
    • 輪詢accept事件
    • 處理accept事件,與client建立連線之後,生成NioSocketChannel,並將其註冊到WorkerGroup的NioEventLoop上面的selector上面
    • 處理任務佇列中的任務,runAllTasks
  • 每個WorkerGroup的NioEventLoop迴圈執行的步驟:
    • 輪詢read,write事件
    • 處理I/O事件,即read,write事件,在對應NioSocketChannel處理
    • 處理任務佇列的任務,runAllTasks
  • 每個WorkGroup中NioEventLoop處理業務時,會使用pipeline(管道),pipeline中包含了channel,即通過pipeline可以獲取對印度個童丹,管道中維護了很多的handler

一個Demo:

服務端:

package com.yang.java.main.netty.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/11
 */
public class NettyServer {
    public static void main(String[] args) {
        // 建立兩個執行緒組,預設的子執行緒NioEventLoop個數為cpu核數*2,也可以自己自定數量
        // DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        /* 最終會呼叫這個方法
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }         */
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        // 建立伺服器端的啟動物件
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup)  // 設定兩個執行緒組
                .channel(NioServerSocketChannel.class) // 設定伺服器通道實現的類
                .option(ChannelOption.SO_BACKLOG, 128) // 設定執行緒佇列得到執行緒連線的個數
                .handler(new LoggingHandler(LogLevel.DEBUG)) // handler是設定bossGroup的,這是一個日誌級別handler
                .childHandler(new ChannelInitializer<SocketChannel>() {  // childHandler是設定workerGroup的
                    // 微通道為通道設定處理器
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //addLast也就是在通道中標所有的handler字後面加(除了tail handler,這個是netty跪地必須要通道尾部)
                        ch.pipeline().addLast(new NettyServerHandler());  // 這裡面新增的類必須實現ChannelHandler
                    }
                });
        System.out.println("the server is ready ");
        // 繫結埠生成future物件
        ChannelFuture channelFuture = serverBootstrap.bind(9999);
        // 對關閉通道進行監聽
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 最終關閉兩個執行緒組
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

    }
}

class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // 有連線上來
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive current thread is: " + Thread.currentThread().getName());
    }

    /**
     * 讀取資料
     *
     * @param ctx 上下文含有pipeline,channel等資訊
     * @param msg 客戶端傳送的資料
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("channelRead current thread is: " + Thread.currentThread().getName());
        ByteBuf buffer = (ByteBuf) msg;
        System.out.println("receive message is: " + buffer.toString(CharsetUtil.UTF_8));
        System.out.println("client address is: " + ctx.channel().remoteAddress());
    }

    // 資料讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete current thread is: " + Thread.currentThread().getName());
        ctx.writeAndFlush(Unpooled.copiedBuffer("server received the message", CharsetUtil.UTF_8));
    }

    // 異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 關閉
        ctx.close();
    }
}

客戶端:

package com.yang.java.main.netty.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/11
 */
public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        // 建立客戶端啟動物件
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)  // 設定客戶端通道的實現類
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 加入處理器
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
        System.out.println("client ready");
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9999));
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

class NettyClientHandler extends ChannelInboundHandlerAdapter {
    // 通道就緒會觸發該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("client channel is ready, start send message");
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("channelReadComplete");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        System.out.println("receive from server: " + buffer.toString(CharsetUtil.UTF_8));
        System.out.println("server address is: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

服務端輸出結果:

the server is ready 
channelActive current thread is: nioEventLoopGroup-3-1
channelRead current thread is: nioEventLoopGroup-3-1
receive message is: hello, server
client address is: /127.0.0.1:59018
channelReadComplete current thread is: nioEventLoopGroup-3-1

使用的簡單例項,任務佇列典型使用場景

  • 使用者自定義的普通任務
  • 使用者自定義的定時任務
class NewNettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 1 使用者自定義的普通任務
        ctx.channel().eventLoop().execute(() -> lateSend(ctx));

        // 2 使用者自定義的定時場景
        ctx.channel().eventLoop().schedule(() ->lateSend(ctx), 5, TimeUnit.SECONDS);
    }

    private void lateSend(ChannelHandlerContext ctx) {
        try {
            System.out.println("current thread: " + Thread.currentThread().getName());
            Thread.sleep(5 * 1000);
            ctx.writeAndFlush(Unpooled.copiedBuffer("wait 5s", CharsetUtil.UTF_8));
            System.out.println("current channel" + ctx.channel().hashCode());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
結果
execute start time: 2020-09-11T16:38:26.929054500
schedule start time: 2020-09-11T16:38:26.932026300
execute lateSend time: 2020-09-11T16:38:26.933025600
execute current thread: nioEventLoopGroup-3-1
execute current channel-1557798060
schedule lateSend time: 2020-09-11T16:38:31.972724600
schedule current thread: nioEventLoopGroup-3-1
schedule current channel-1557798060

4.4 Netty的核心模組元件

4.4.1 Bootstrap、ServerBootstrap

這兩個類主要作用就是配置Netty程式,串聯各個元件,Netty中Bootstrap類是客戶端程式的啟動引導類,ServerBootstrap是服務端的啟動引導類

4.4.2 Future、ChannelFuture

Netty 中所有的 IO 操作都是非同步的,不能立刻得知訊息是否被正確處理。但是可以過一會等它執行完成或者直接註冊一個監聽,具體的實現就是通過 Future 和 ChannelFutures,他們可以註冊一個監聽,當操作執行成功或失敗時監聽會自動觸發註冊的監聽事件常見的方法有:
  • Channel channel(),返回當前正在進行 IO 操作的通道
  • ChannelFuture sync(),等待非同步操作執行完畢
  • ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); 新增監聽器
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");
        synchronized (this) {
            addListener0(listener);
        }
        // 收到結果,進行通知
        if (isDone()) {
            notifyListeners();
        }
        return this;
}

4.4.3 Channel

  1. Netty 網路通訊的元件,能夠用於執行網路 I/O 操作。
  2. 通過 Channel 可獲得當前網路連線的通道的狀態
  3. 通過 Channel 可獲得 網路連線的配置引數 (例如接收緩衝區大小)
  4. Channel 提供非同步的網路 I/O 操作(如建立連線,讀寫,繫結埠),非同步呼叫意味著任何 I/O 呼叫都將立即返回,並且不保證在呼叫結束時所請求的 I/O 操作已完成
  5. 呼叫立即返回一個 ChannelFuture 例項,通過註冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回撥通知呼叫方
  6. 支援關聯 I/O 操作與對應的處理程式
  7. 不同協議、不同的阻塞型別的連線都有不同的 Channel 型別與之對應

4.4.4 Selector

Netty基於Selcctor物件實現I/O多路複用,通過Selector一個執行緒可以監聽多個連線的Channel事件

當想一個Selector中註冊Channel後,Selector內部的機制就可以自動不斷的查詢(select)這些註冊的Channel是否有已就緒的I/O事件(可讀、可寫、網路連線等),這樣程式就可以使用一個執行緒的管理多個Channel。

4.4.5 ChannelHandler

ChannelHandler是一個介面,處理I/O事件或者攔截I/O操作,並將其轉發到對應的ChannelPipeline中的下一個處理程式

ChannelHandler本身並沒有提供很多方法。但是通常情況下我們必須實現他的子類:

  • ChannelInboundHandler處理進棧的I/O事件
型別描述
channelRegistered Invoked when a Channel is registered to its EventLoop and is able to handle I/O.
channelUnregistered Invoked when a Channel is deregistered from its EventLoop and cannot handle any I/O.
channelActive Invoked when a Channel is active; the Channel is connected/bound and ready.
channelInactive Invoked when a Channel leaves active state and is no longer connected to its remote peer.
channelReadComplete Invoked when a read operation on the Channel has completed.
channelRead Invoked if data are read from the Channel.
channelWritabilityChanged Invoked when the writability state of the Channel changes. The user can ensure writes are not done too fast (with risk of an OutOfMemoryError) or can resume writes when the Channel becomes writable again.Channel.isWritable() can be used to detect the actual writability of the channel. The threshold for writability can be set via Channel.config().setWriteHighWaterMark() and Channel.config().setWriteLowWaterMark().
userEventTriggered(...) Invoked when a user calls Channel.fireUserEventTriggered(...) to pass a pojo through the ChannelPipeline. This can be used to pass user specific events through the ChannelPipeline and so allow handling those events.
  • ChannelOutboundHandler處理出棧的I/O事件
型別描述
bind Invoked on request to bind the Channel to a local address
connect Invoked on request to connect the Channel to the remote peer
disconnect Invoked on request to disconnect the Channel from the remote peer
close Invoked on request to close the Channel
deregister Invoked on request to deregister the Channel from its EventLoop
read Invoked on request to read more data from the Channel
flush Invoked on request to flush queued data to the remote peer through the Channel
write Invoked on request to write data through the Channel to the remote peer

幾乎所有的方法都將 ChannelPromise 作為引數,一旦請求結束要通過 ChannelPipeline 轉發的時候,必須通知此引數。

ChannelPromise vs. ChannelFuture

ChannelPromise 是 特殊的 ChannelFuture,允許你的 ChannelPromise 及其 操作 成功或失敗。所以任何時候呼叫例如 Channel.write(...) 一個新的 ChannelPromise將會建立並且通過 ChannelPipeline傳遞。這次寫操作本身將會返回 ChannelFuture, 這樣只允許你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作為返回的 ChannelFuture 的通知,事實上在大多數時候就是 ChannelPromise 自身(ChannelPromise 擴充套件了 ChannelFuture)

ChannelDuplexHandler可以處理進站以及出站事件,但是通常不推薦使用

4.4.6 ChannelPipeline

ChannelPipeline是一個Handler的集合,他負責處理和攔截inbound或者outbound的事件和操作,相當於一個貫穿Netty的鏈(ChannelPipe是儲存ChannelHandler的連結串列,由於處理和攔截Channel的入棧和出棧事件操作)

ChannelPipeline實現了一種高階形式的攔截過濾器模式,使使用者可以完全控制事件的處理方式,以及Channel中各個ChannelHandler的互動

在Netty中每個Channel都有且僅有一個ChannelPipeline與之對應

一個Channel包含了一個ChannelPipeline,二ChannelPipeline 中維護了一個有ChannelHandlerContext組成的雙向連結串列,並且每一個ChannelHandlercontext中又關聯著一個ChannelHandler

入站事件和出站事件在同一個雙向連結串列中,入站事件會從連結串列head(預設的一個ChannelHandler,永遠在第一位)依次向後傳遞一直到最後一個handler,

出站事件會從連結串列tail(預設的handler,永遠在結尾,建立pipeline建立,屬於defaultChannelHandler)往前傳遞會傳遞到最前一個出站的handler,兩種型別的handler互不干擾,下圖描述了入站以及出站的路程

原始碼中有標識入站以及出站事件 ,socket read一般就是入站,socket write一般就是出站

4.4.7 ChannelHandlerContext

儲存Channel相關所有的上下文資訊,同時關聯一個ChannelHandler物件

channelHandlerContext中包含一個具體的事件處理器ChannelHandler,同時ChannelHandlerContext中也綁定了對應的pipeline和Channel的資訊,方便對ChannelHandler進行呼叫

重要的是要注意,雖然在 Channel 或者 ChannelPipeline 上呼叫write() 都會把事件在整個管道傳播,但是在 ChannelHandler 級別上,從一個處理程式轉到下一個卻要通過在 ChannelHandlerContext 呼叫方法實現。

  1. 事件傳遞給 ChannelPipeline 的第一個 ChannelHandler
  2. ChannelHandler 通過關聯的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的 下一個
  3. ChannelHandler 通過關聯的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的 下一個

4.4.8 EventLoopGroup

EventLoopGroup是一組EventLoop的抽象,Netty為了更好的利用多核CPU資源,一般會有多個EventLoop同時工作,沒喲呀EventLoop維護著一個Selector例項

EventLoopGroup提供next介面,可以從組裡面按照一定的規則獲取其中一個EventLoop來處理任務。

EventLoop就是一個事件迴圈,它執行在一個迴圈中,知道它停止。網路框架需要在一個迴圈中為一個特定的連線執行時間

    for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }

EventLoop是從EventExecutor和ScheduledExecutorService擴充套件而來,所以可以將任務直接交給EventLoop執行,另外關於事件和任務的執行順序是FIFO

如果任務與EventLoop的Thread是相同的,那麼程式碼塊就會被執行,如果執行緒不同,會安排一個任務並在一個內部佇列後執行,先塗就是一個EventLoop中排程任務的執行邏輯

  • 應在EventLoop中執行的任務
  • 任務傳遞到執行方法時候,自行檢查呼叫執行緒是否與分配給EventLoop是一樣的
  • 執行緒一樣,則可以直接執行任務
  • 執行緒不一樣,則會家務任務佇列,當EventLoop事件執行時,佇列中的任務就會執行

對於長時間的任務或阻塞的I/O請使用EventExector,之前演示了這個機制,現在過一下步驟

在指定的延時時間後排程任務

  • 任務被插入到EventLoop的Schedul-Tasks-Queue(排程任務佇列)
  • 如果任務需要馬上執行,EventLoop檢查並執行
  • 如果有一個任務要執行,EventLoop將立即執行,並從佇列中刪除
  • EventLoop等待下一次執行,然乎從上一步開始重複

對於EventLoop,EventLoopGroup以及Channel的關係

  • 所有的EventLoop由EventLoopGroup分配
  • EventLoop處理所有分配給他的管道的事件和任務。每個EventLoop繫結到一個Thread
  • 管道繫結到EventLoop,所以所有的操作總是倍同一個執行緒在Channel的生命週期執行,一個管道只屬於一個連線

4.4.9 Unpooled

首先了解一個Netty提供的資料緩衝容器ByteBuf

ByteBuf中有兩個索引,一個用來寫,一個用來讀,這個是與ByteBuffer最大的不同,寫入資料到ByteBuf後,writeindex會增加寫入的位元組數。讀取位元組後,readerIndex會增加讀出去的位元組數。可以讀取位元組知道與寫入索引相同,再讀丟擲異常

Unpooled類就是一個專門用來操作緩衝區的工具類

5 編解碼器以及handler的呼叫機制

5.1 解碼器

解碼器就是負責將入站資料從一種格式轉化到另外一種格式,目前主要分為兩類:

  • 解碼位元組到訊息(ByteToMessageDevoder和ReplayingDecoder)
  • 解碼訊息到訊息(MessageToMessageDecoder)

Netty的解碼器是一種ChannelInboundHandler的抽象實現。就是將入站資料轉化格式之後傳遞到ChannelPipeline中的下一個ChannelInboundHandler進行處理。

主要看一下ByteToMessageDecoder這個類,將位元組碼轉化為訊息,下面是兩個比較重要的方法

  /**
     * 從位元組碼解析到另外一種資料格式,這個方法會在沒有任何資料輸入的時候呼叫
     *
     * @param ctx           上下文
     * @param in            入站資訊
     * @param out           傳遞給下一個ChannelInboundHandler資料
     * @throws Exception    異常
     */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

  /**
     * 這個方法將會在channelInActive之後呼叫,也是這個推出的時候,這個方法其實也是在呼叫decode方法*/
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            // Only call decode() if there is something left in the buffer to decode.
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }

demo(我們解碼器的demo都在客戶端傳送1,2,3數字)

class ToIntegerDecoder extends ByteToMessageDecoder{

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes() >= 4){
            out.add(in.readInt());
        }
    }
}
receive message is: 1
receive message is: 2
receive message is: 3

從輸出結果也可以看出,每次從入站的ByteBuf讀取四個位元組,解碼成整形,所包含的內容會被髮送到下一個ChannelInboundHandler

ByteToMessageDecoder進行decode時,每次都需要我們進行校驗ByteBuf中是否還有足夠的位元組進行轉碼,Netty還提供ReplayingDecoder的抽象基類,繼承自ByteMessageByte,使用這個類就無需自己檢查,如果ByteBuf中有足夠的位元組會正常讀取,如果沒有,則會停止解碼,但是同時也會帶來一些侷限性:

  • 並非所有的標準的ByteBuf操作都支援
  • 速會慢一些

當然還有一些其他解碼器,不過大同小異。

Netty是一部框架,需要在記憶體中緩衝位元組,知道能夠急嘛,因此,我們不能讓解碼器快取太多資料以免耗盡可用記憶體。為了避免這個問題,可以在解碼器中設定最大位元組閾值,後續可以進行處理。

class SafeByteToMessageDecoder extends ByteToMessageDecoder{

    private static final int MAX_FRAME_SIZE = 2;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        if(readableBytes > MAX_FRAME_SIZE){
            in.skipBytes(readableBytes);
            // 可以選擇丟擲異常,會被下一個handler的異常捕捉接收
//            throw new TooLongFrameException("frame to big");
            out.add(10);
        }
        if(in.readableBytes() >= 4){
            out.add(in.readInt());
        }
    }
}
receive message is: 10

從結果可以看出,因為接受到的自己超出我們設定的閾值,因此會被放棄,也就是不在讀取,可以看下屬程式碼,將ByteBuf的讀取下標向後移動放棄的數量

  @Override
    public ByteBuf skipBytes(int length) {
        checkReadableBytes(length);
        readerIndex += length;
        return this;
    }

5.2 編碼器

有解碼器當然就有編碼器,就是用來將出戰資料從一種格式轉化為另外一種格式,因為是出戰,因此他繼承了ChannelOutboundHandler,這個也有兩種型別

  • 編碼從訊息到位元組(MessageToByteEncoder)
  • 編碼從訊息到 訊息(MessageToMessageEncoder)

demo(從服務端傳遞1,2,3)

class IntegerToByteEncoder extends MessageToByteEncoder<Integer>{

    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        out.writeInt(msg);
    }
}

客戶端接受

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.readBytes(bytes);
        System.out.println("receive from server: " + Arrays.toString(bytes));  // receive from server: [0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3] 客戶端也可以定義解碼器,到時候就跟上述解碼器輸出結果一樣
        System.out.println("server address is: " + ctx.channel().remoteAddress());
    }

MessageToMessage類似

5.3 Codec

當然還有codec,就是結合編碼器與解碼器,不再贅述

5.4 呼叫機制

ChannelHandler充當了處理入站和出站的資料的應用程式邏輯的容器(只需要自定義的類實現ChannelInboundHandler或者ChannelOutboundHandler),從而可以處理入站事件和資料,如果需要傳送響應,直接呼叫ChannelContext的write進行寫入,flush進行重新整理
這裡面入站和出戰的區分:入站主要是從socket的管道向內做就是入站,反之就是出站

另外在呼叫過程中,發現不論編碼器還是解碼器,接收訊息的型別必須和待處理的訊息型別一直,否則這個handler就不會執行。

原始碼解析見下一篇