1. 程式人生 > >NIO教程 ——檢視閱讀

NIO教程 ——檢視閱讀

NIO教程 ——檢視閱讀

參考

BIO,NIO,AIO 總結

Java NIO淺析

Java NIO 教程——極客,藍本

Java NIO 系列教程 ——併發程式設計網

BIO,NIO——知乎

NIO 入門——IBM

Java NIO教程 ——易百

Java NIO Tutorial英文版

首先Java中的IO有以下三種:
BIO(Blocking IO)  同步式阻塞IO
NIO(Non-BlockingIO/New IO)  同步式非阻塞IO   JDK1.4提供 
AIO(AsynchronousIO)  非同步式非阻塞IO  JDK1.8提供

略讀:


ibm

NIO 的建立目的是為了讓 Java 程式設計師可以實現高速 I/O 而無需編寫自定義的本機程式碼。NIO 將最耗時的 I/O 操作(即填充和提取緩衝區)轉移回作業系統,因而可以極大地提高速度。

原來的 I/O 庫(在 java.io.*中) 與 NIO 最重要的區別是資料打包和傳輸的方式。正如前面提到的,原來的 I/O 以流的方式處理資料,而 NIO 以塊的方式處理資料。

通道和 緩衝區是 NIO 中的核心物件,幾乎在每一個 I/O 操作中都要使用它們。

通道是對原 I/O 包中的流的模擬。到任何目的地(或來自任何地方)的所有資料都必須通過一個 Channel 物件。一個 Buffer 實質上是一個容器物件。傳送給一個通道的所有物件都必須首先放到緩衝區中;同樣地,從通道中讀取的任何資料都要讀到緩衝區中。

Buffer 是一個物件, 它包含一些要寫入或者剛讀出的資料。 在 NIO 中加入 Buffer 物件,體現了新庫與原 I/O 的一個重要區別。在面向流的 I/O 中,您將資料直接寫入或者將資料直接讀到 Stream 物件中 。

緩衝區實質上是一個數組。通常它是一個位元組陣列,但是也可以使用其他種類的陣列。但是一個緩衝區不 僅僅 是一個數組。緩衝區提供了對資料的結構化訪問,而且還可以跟蹤系統的讀/寫程序。

Channel是一個物件,可以通過它讀取和寫入資料。拿 NIO 與原來的 I/O 做個比較,通道就像是流。

通道與流的不同之處在於通道是雙向的。而流只是在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類), 而 通道可以用於讀、寫或者同時用於讀寫。

因為它們是雙向的,所以通道可以比流更好地反映底層作業系統的真實情況。特別是在 UNIX 模型中,底層作業系統通道是雙向的。

在 NIO 系統中,任何時候執行一個讀操作,您都是從通道中讀取,但是您不是 直接 從通道讀取。因為所有資料最終都駐留在緩衝區中,所以您是從通道讀到緩衝區中。

因此讀取檔案涉及三個步驟:(1) 從 FileInputStream 獲取 Channel,(2) 建立 Buffer,(3) 將資料從 Channel 讀到 Buffer中。

clear() 方法重設緩衝區,使它可以接受讀入的資料。 flip() 方法讓緩衝區可以將新讀入的資料寫入另一個通道。

flip

現在我們要將資料寫到輸出通道中。在這之前,我們必須呼叫 flip() 方法。這個方法做兩件非常重要的事:

  1. 它將 limit 設定為當前 position。
  2. 它將 position 設定為 0。

clear

最後一步是呼叫緩衝區的 clear() 方法。這個方法重設緩衝區以便接收更多的位元組。 Clear 做兩種非常重要的事情:

  1. 它將 limit 設定為與 capacity 相同。
  2. 它設定 position 為 0。

read() 和 write() 呼叫得到了極大的簡化,因為許多工作細節都由緩衝區完成了。 clear() 和 flip() 方法用於讓緩衝區在讀和寫之間切換。

建立不同型別的緩衝區以達到不同的目的,如可保護資料不被修改的 只讀 緩衝區,和直接對映到底層作業系統緩衝區的 直接 緩衝區。

使用靜態方法 allocate() 來分配緩衝區:

ByteBuffer buffer = ByteBuffer.allocate( 1024 );

將一個現有的陣列轉換為緩衝區,如下所示:

byte array[] = new byte[1024];``ByteBuffer buffer = ByteBuffer.wrap( array );

本例使用了 wrap() 方法將一個數組包裝為緩衝區。必須非常小心地進行這類操作。一旦完成包裝,底層資料就可以通過緩衝區或者直接訪問。

建立一個包含槽 3 到槽 6 的子緩衝區。在某種意義上,子緩衝區就像原來的緩衝區中的一個視窗 。

視窗的起始和結束位置通過設定 position 和 limit 值來指定,然後呼叫 Buffer 的 slice() 方法:

buffer.position( 3 );buffer.limit( 7 );ByteBuffer slice = buffer.slice();

片是緩衝區的 子緩衝區。不過, 片段和 緩衝區共享同一個底層資料陣列

只讀緩衝區非常簡單 ― 您可以讀取它們,但是不能向它們寫入。可以通過呼叫緩衝區的 asReadOnlyBuffer() 方法,將任何常規緩衝區轉換為只讀緩衝區,這個方法返回一個與原緩衝區完全相同的緩衝區(並與其共享資料),只不過它是隻讀的。

只讀緩衝區對於保護資料很有用。在將緩衝區傳遞給某個物件的方法時,您無法知道這個方法是否會修改緩衝區中的資料。建立一個只讀的緩衝區可以 保證 該緩衝區不會被修改。

不能將只讀的緩衝區轉換為可寫的緩衝區。

//直接緩衝區
ByteBuffer buffer = ByteBuffer.allocateDirect( 1024 );

分散/聚集 I/O

通道可以有選擇地實現兩個新的介面: ScatteringByteChannel 和 GatheringByteChannel。一個 ScatteringByteChannel是一個具有兩個附加讀方法的通道:

  • long read( ByteBuffer[] dsts );
  • long read( ByteBuffer[] dsts, int offset, int length );

這些 long read() 方法很像標準的 read 方法,只不過它們不是取單個緩衝區而是取一個緩衝區陣列。緩衝區陣列就像一個大緩衝區。

以socket.read()為例子:

傳統的BIO裡面socket.read(),如果TCP RecvBuffer裡沒有資料,函式會一直阻塞,直到收到資料,返回讀到的資料。


meituan

對於NIO,如果TCP RecvBuffer有資料,就把資料從網絡卡讀到記憶體,並且返回給使用者;反之則直接返回0,永遠不會阻塞。

最新的AIO(Async I/O)裡面會更進一步:不但等待就緒是非阻塞的,就連資料從網絡卡到記憶體的過程也是非同步的。

換句話說,BIO裡使用者最關心“我要讀”,NIO裡使用者最關心”我可以讀了”,在AIO模型裡使用者更需要關注的是“讀完了”。

NIO一個重要的特點是:socket主要的讀、寫、註冊和接收函式,在等待就緒階段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但效能非常高)。

回憶BIO模型,之所以需要多執行緒,是因為在進行I/O操作的時候,一是沒有辦法知道到底能不能寫、能不能讀,只能”傻等”,即使通過各種估算,算出來作業系統沒有能力進行讀寫,也沒法在socket.read()和socket.write()函式中返回,這兩個函式無法進行有效的中斷。所以除了多開執行緒另起爐灶,沒有好的辦法利用CPU。

NIO的讀寫函式可以立刻返回,這就給了我們不開執行緒利用CPU的最好機會:如果一個連線不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連線(channel)繼續進行讀寫。

NIO的主要事件有幾個:讀就緒、寫就緒、有新連線到來。

仔細分析一下我們需要的執行緒,其實主要包括以下幾種: 1. 事件分發器,單執行緒選擇就緒的事件。 2. I/O處理器,包括connect、read、write等,這種純CPU操作,一般開啟CPU核心個執行緒就可以。 3. 業務執行緒,在處理完I/O後,業務一般還會有自己的業務邏輯,有的還會有其他的阻塞I/O,如DB操作,RPC等。只要有阻塞,就需要單獨的執行緒。

NIO給我們帶來了些什麼:

  • 事件驅動模型
  • 避免多執行緒
  • 單執行緒處理多工
  • 非阻塞I/O,I/O讀寫不再阻塞,而是返回0
  • 基於block的傳輸,通常比基於流的傳輸更高效
  • 更高階的IO函式,zero-copy
  • IO多路複用大大提高了Java網路應用的可伸縮性和實用性

BIO,NIO,AIO 總結

如何區分 “同步/非同步 ”和 “阻塞/非阻塞” 呢?

同步/非同步是從行為角度描述事物的,而阻塞和非阻塞描述的當前事物的狀態(等待呼叫結果時的狀態)。

阻塞模式使用就像傳統中的支援一樣,比較簡單,但是效能和可靠性都不好;非阻塞模式正好與之相反。對於低負載、低併發的應用程式,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網路)應用,應使用 NIO 的非阻塞模式來開發。

Buffer是一個物件,它包含一些要寫入或者要讀出的資料。在NIO類庫中加入Buffer物件,體現了新庫與原I/O的一個重要區別。在面向流的I/O中·可以將資料直接寫入或者將資料直接讀到 Stream 物件中。雖然 Stream 中也有 Buffer 開頭的擴充套件類,但只是流的包裝類,還是從流讀到緩衝區,而 NIO 卻是直接讀到 Buffer 中進行操作。

NIO 通過Channel(通道) 進行讀寫。

通道是雙向的,可讀也可寫,而流的讀寫是單向的。無論讀寫,通道只能和Buffer互動。因為 Buffer,通道可以非同步地讀寫。

NIO有選擇器,而IO沒有。

選擇器用於使用單個執行緒處理多個通道。因此,它需要較少的執行緒來處理這些通道。執行緒之間的切換對於作業系統來說是昂貴的。 因此,為了提高系統效率選擇器是有用的。

AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改進版 NIO 2,它是非同步非阻塞的IO模型。非同步 IO 是基於事件和回撥機制實現的,也就是應用操作之後會直接返回,不會堵塞在那裡,當後臺處理完成,作業系統會通知相應的執行緒進行後續的操作。

每當要從快取區的時候讀取資料時,就呼叫filp()“切換成讀模式”。

讀完我們還想寫資料到緩衝區,那就使用clear()函式,這個函式會“清空”緩衝區 。


簡介

NIO中的N可以理解為Non-blocking ,不單純是New 。

不同點:

  • 標準的IO程式設計介面是面向位元組流和字元流的。而NIO是面向通道和緩衝區的,資料總是從通道中讀到buffer緩衝區內,或者從buffer寫入到通道中。
  • Java NIO使我們可以進行非阻塞IO操作。比如說,單執行緒中從通道讀取資料到buffer,同時可以繼續做別的事情,當資料讀取到buffer中後,執行緒再繼續處理資料。寫資料也是一樣的。
  • NIO中有一個“slectors”的概念。selector可以檢測多個通道的事件狀態(例如:連結開啟,資料到達)這樣單執行緒就可以操作多個通道的資料。

概覽

NIO包含下面3個核心的元件,Channel,Buffer和Selector組成了這個核心的API:

  • Channels ——通道
  • Buffers ——緩衝區
  • Selectors ——選擇器

通常來說NIO中的所有IO都是從Channel開始的。Channel和流有點類似。通過Channel,我們即可以從Channel把資料寫到Buffer中,也可以把資料衝Buffer寫入到Channel 。

有很多的Channel,Buffer型別。下面列舉了主要的幾種:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

正如你看到的,這些channel基於於UDP和TCP的網路IO,以及檔案IO。 和這些類一起的還有其他一些比較有趣的介面,在本節中暫時不多介紹。為了簡潔起見,我們會在必要的時候引入這些概念。 下面是核心的Buffer實現類的列表:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

這些Buffer涵蓋了可以通過IO操作的基礎型別:byte,short,int,long,float,double以及characters. NIO實際上還包含一種MappedBytesBuffer,一般用於和記憶體對映的檔案。

選擇器允許單執行緒操作多個通道。如果你的程式中有大量的連結,同時每個連結的IO頻寬不高的話,這個特性將會非常有幫助。比如聊天伺服器。 下面是一個單執行緒中Slector維護3個Channel的示意圖:

要使用Selector的話,我們必須把Channel註冊到Selector上,然後就可以呼叫Selector的select()方法。這個方法會進入阻塞,直到有一個channel的狀態符合條件。當方法返回後,執行緒可以處理這些事件。

Java NIO Channel通道

Java NIO Channel通道和流非常相似,主要有以下3點區別:

  • 通道可以讀也可以寫,流一般來說是單向的(只能讀或者寫)。
  • 通道可以非同步讀寫。
  • 通道總是基於緩衝區Buffer來讀寫。

Channel的實現

下面列出Java NIO中最重要的集中Channel的實現:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

FileChannel用於檔案的資料讀寫。 DatagramChannel用於UDP的資料讀寫。 SocketChannel用於TCP的資料讀寫。 ServerSocketChannel允許我們監聽TCP連結請求,每個請求會建立會一個SocketChannel.

RandomAccessFile擴充套件:

RandomAccessFile(隨機訪問檔案)類。該類是Java語言中功能最為豐富的檔案訪問類 。RandomAccessFile類支援“隨機訪問”方式,這裡“隨機”是指可以跳轉到檔案的任意位置處讀寫資料。在訪問一個檔案的時候,不必把檔案從頭讀到尾,而是希望像訪問一個數據庫一樣“隨心所欲”地訪問一個檔案的某個部分,這時使用RandomAccessFile類就是最佳選擇。

四種模式:R RW RWD RWS

r 以只讀的方式開啟文字,也就意味著不能用write來操作檔案
rw 讀操作和寫操作都是允許的
rws 每當進行寫操作,同步的重新整理到磁碟,重新整理內容和元資料
rwd 每當進行寫操作,同步的重新整理到磁碟,重新整理內容

RandomAccessFile的用處:

1、大型文字日誌類檔案的快速定位獲取資料:

得益於seek的巧妙設計,我認為我們可以從超大的文字中快速定位我們的遊標,例如每次存日誌的時候,我們可以建立一個索引快取,索引是日誌的起始日期,value是文字的poiniter 也就是游標,這樣我們可以快速定位某一個時間段的文字內容

2、併發讀寫

也是得益於seek的設計,我認為多執行緒可以輪流操作seek控制游標的位置,從未達到不同執行緒的併發寫操作。

3、更方便的獲取二進位制檔案

通過自帶的讀寫轉碼(readDouble、writeLong等),我認為可以快速的完成位元組碼到字元的轉換功能,對使用者來說比較友好。

RandomAccessFile參考

例項:

public class FileChannelTest {

    public static void main(String[] args) throws IOException {
        RandomAccessFile file = new RandomAccessFile("D:\\text\\1_loan.sql", "r");
        //mode只有4中,如果不是讀寫的mode或者給的不是4種中的,就會報錯。
        RandomAccessFile copyFile = new RandomAccessFile("D:\\text\\1_loan_copy.sql", "r");
        try {
            FileChannel fileChannel = file.getChannel();
            FileChannel copyFileChannel = copyFile.getChannel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int read = fileChannel.read(byteBuffer);
            while (read != -1) {
                System.out.println("read:" + read);
                //byteBuffer緩衝區切換為讀模式
                byteBuffer.flip();
                copyFileChannel.write(byteBuffer);
                //“清空”byteBuffer緩衝區,以滿足後續寫入操作
                byteBuffer.clear();
                //注意,每次讀時都要返回讀後的狀態read值賦值給迴圈判斷體read,否則會陷入死迴圈true
                read = fileChannel.read(byteBuffer);
            }
        } finally {
            file.close();
            copyFile.close();
        }
    }
}

報錯:

RandomAccessFile copyFile = new RandomAccessFile("D:\\text\\1_loan_copy.sql", "w");
//因為沒有"w"的mode
Exception in thread "main" java.lang.IllegalArgumentException: Illegal mode "w" must be one of "r", "rw", "rws", or "rwd"
	at java.io.RandomAccessFile.<init>(RandomAccessFile.java:221)
	
RandomAccessFile copyFile = new RandomAccessFile("D:\\text\\1_loan_copy.sql", "r");
//因為沒有"w"的許可權
Exception in thread "main" java.nio.channels.NonWritableChannelException
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:194)
	at com.niotest.FileChannelTest.main(FileChannelTest.java:33)	

NIO Buffer緩衝區

Java NIO Buffers用於和NIO Channel互動。正如你已經知道的,我們從channel中讀取資料到buffers裡,從buffer把資料寫入到channels.

buffer本質上就是一塊記憶體區,可以用來寫入資料,並在稍後讀取出來。這塊記憶體被NIO Buffer包裹起來,對外提供一系列的讀寫方便開發的介面。

Buffer基本用法

利用Buffer讀寫資料,通常遵循四個步驟:

  • 把資料寫入buffer;
  • 呼叫flip;
  • 從Buffer中讀取資料;
  • 呼叫buffer.clear()或者buffer.compact()

當寫入資料到buffer中時,buffer會記錄已經寫入的資料大小。當需要讀資料時,通過flip()方法把buffer從寫模式調整為讀模式;在讀模式下,可以讀取所有已經寫入的資料。

當讀取完資料後,需要清空buffer,以滿足後續寫入操作。清空buffer有兩種方式:呼叫clear()或compact()方法。clear會清空整個buffer,compact則只清空已讀取的資料,未被讀取的資料會被移動到buffer的開始位置,寫入位置則近跟著未讀資料之後。

Buffer的容量,位置,上限(Buffer Capacity, Position and Limit)

buffer緩衝區實質上就是一塊記憶體,用於寫入資料,也供後續再次讀取資料。這塊記憶體被NIO Buffer管理,並提供一系列的方法用於更簡單的操作這塊記憶體。

一個Buffer有三個屬性是必須掌握的,分別是:

  • capacity容量
  • position位置
  • limit限制

position和limit的具體含義取決於當前buffer的模式。capacity在兩種模式下都表示容量。

下面有張示例圖,描訴了不同模式下position和limit的含義:

容量(Capacity)

作為一塊記憶體,buffer有一個固定的大小,叫做capacity容量。也就是最多隻能寫入容量值的位元組,整形等資料。一旦buffer寫滿了就需要清空已讀資料以便下次繼續寫入新的資料。

位置(Position)

當寫入資料到Buffer的時候需要中一個確定的位置開始,預設初始化時這個位置position為0,一旦寫入了資料比如一個位元組,整形資料,那麼position的值就會指向資料之後的一個單元,position最大可以到capacity-1.

當從Buffer讀取資料時,也需要從一個確定的位置開始。buffer從寫入模式變為讀取模式時,position會歸零,每次讀取後,position向後移動。

上限(Limit)

在寫模式,limit的含義是我們所能寫入的最大資料量。它等同於buffer的容量。

一旦切換到讀模式,limit則代表我們所能讀取的最大資料量,他的值等同於寫模式下position的位置。

資料讀取的上限時buffer中已有的資料,也就是limit的位置(原寫模式下position所指的位置)。

Buffer Types

Java NIO有如下具體的Buffer型別:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

正如你看到的,Buffer的型別代表了不同資料型別,換句話說,Buffer中的資料可以是上述的基本型別;

分配一個Buffer(Allocating a Buffer)

為了獲取一個Buffer物件,你必須先分配。每個Buffer實現類都有一個allocate()方法用於分配記憶體。

    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    CharBuffer charBuffer = CharBuffer.allocate(48);

寫入資料到Buffer(Writing Data to a Buffer)

寫資料到Buffer有兩種方法:

  • 從Channel中寫資料到Buffer

  • 手動寫資料到Buffer,呼叫put方法

    //從Channel中寫資料到Buffer
    int read = fileChannel.read(byteBuffer);
    //呼叫put方法寫
    buf.put(3);
    //把資料寫到特定的位置
    public ByteBuffer put(int i, byte x);
    //把一個具體型別資料寫入buffer
    public ByteBuffer putInt(int x);

flip()——翻轉

flip()方法可以把Buffer從寫模式切換到讀模式。呼叫flip方法會把position歸零,並設定limit為之前的position的值。 也就是說,現在position代表的是讀取位置,limit標示的是已寫入的資料位置。

從Buffer讀取資料(Reading Data from a Buffer)

衝Buffer讀資料也有兩種方式。

  • 從buffer讀資料到channel。

  • 從buffer直接讀取資料,呼叫get方法。

    //讀取資料到channel的例子:
    int bytesWritten = inChannel.write(buf);
    //呼叫get讀取資料的例子:
    byte aByte = buf.get();

rewind()——倒帶

Buffer.rewind()方法將position置為0,這樣我們可以重複讀取buffer中的資料。limit保持不變。

clear() and compact()

一旦我們從buffer中讀取完資料,需要複用buffer為下次寫資料做準備。只需要呼叫clear或compact方法。

clear方法會重置position為0,limit為capacity,也就是整個Buffer清空。實際上Buffer中資料並沒有清空,我們只是把標記為修改了。(重新寫入的時候這些存在的資料就會被新的資料覆蓋)

如果Buffer還有一些資料沒有讀取完,呼叫clear就會導致這部分資料被“遺忘”,因為我們沒有標記這部分資料未讀。

針對這種情況,如果需要保留未讀資料,那麼可以使用compact()。 因此compact()和clear()的區別就在於對未讀資料的處理,是保留這部分資料還是一起清空。

mark() and reset()

通過mark方法可以標記當前的position,通過reset來恢復mark的位置,這個非常像canva的save和restore:

buffer.mark();

//call buffer.get() a couple of times, e.g. during parsing.

buffer.reset();  //set position back to mark.

equals() and compareTo()

可以用eqauls和compareTo比較兩個buffer

equals()

判斷兩個buffer相對,需滿足:

  • 型別相同
  • buffer中剩餘位元組數相同
  • 所有剩餘位元組相等

從上面的三個條件可以看出,equals只比較buffer中的部分內容,並不會去比較每一個元素。

compareTo()

compareTo也是比較buffer中的剩餘元素,只不過這個方法適用於比較排序的:

NIO Scatter (分散)/ Gather(聚集)

——分散讀和聚集寫的場景。

Java NIO釋出時內建了對scatter / gather的支援。scatter / gather是通過通道讀寫資料的兩個概念。

Scattering read指的是從通道讀取的操作能把資料寫入多個buffer,也就是scatters代表了資料從一個channel到多個buffer的過程。

gathering write則正好相反,表示的是從多個buffer把資料寫入到一個channel中。

Scatter/gather在有些場景下會非常有用,比如需要處理多份分開傳輸的資料。舉例來說,假設一個訊息包含了header和body,我們可能會把header和body儲存在不同獨立buffer中,這種分開處理header與body的做法會使開發更簡明。

Scattering Reads

"scattering read"是把資料從單個Channel寫入到多個buffer,下面是示意圖:

觀察程式碼可以發現,我們把多個buffer寫在了一個數組中,然後把陣列傳遞給channel.read()方法。read()方法內部會負責把資料按順序寫進傳入的buffer陣列內。一個buffer寫滿後,接著寫到下一個buffer中。

實際上,scattering read內部必須寫滿一個buffer後才會向後移動到下一個buffer,因此這並不適合訊息大小會動態改變的部分,也就是說,如果你有一個header和body,並且header有一個固定的大小(比如128位元組),這種情形下可以正常工作。

athering Writes

"gathering write"把多個buffer的資料寫入到同一個channel中.

似的傳入一個buffer陣列給write,內部機會按順序將陣列內的內容寫進channel,這裡需要注意,寫入的時候針對的是buffer中position到limit之間的資料。也就是如果buffer的容量是128位元組,但它只包含了58位元組資料,那麼寫入的時候只有58位元組會真正寫入。因此gathering write是可以適用於可變大小的message的,這和scattering reads不同。

NIO Channel to Channel Transfers通道傳輸介面

在Java NIO中如果一個channel是FileChannel型別的,那麼他可以直接把資料傳輸到另一個channel。這個特性得益於FileChannel包含的transferTo和transferFrom兩個方法。

transferFrom()——目標channel用,引數為源資料channel。

transferFrom的引數position和count表示目標檔案的寫入位置和最多寫入的資料量。如果通道源的資料小於count那麼就傳實際有的資料量。 另外,有些SocketChannel的實現在傳輸時只會傳輸哪些處於就緒狀態的資料,即使SocketChannel後續會有更多可用資料。因此,這個傳輸過程可能不會傳輸整個的資料。

transferTo()——源資料用,引數為目標channel

SocketChannel的問題也存在於transferTo.SocketChannel的實現可能只在傳送的buffer填充滿後才傳送,並結束。

例項:

public class ChannelTransferTest {

    public static void main(String[] args) throws IOException {
        RandomAccessFile fromfile = new RandomAccessFile("D:\\text\\1_loan.sql", "rw");
        //mode只有4中,如果不是讀寫的mode或者給的不是4種中的,就會報錯。
        RandomAccessFile toFile = new RandomAccessFile("D:\\text\\1_loan_copy.sql", "rw");

        FileChannel fromfileChannel = fromfile.getChannel();
        FileChannel toFileChannel = toFile.getChannel();
        //==========================transferTo=================================
        //transferTo方法把fromfileChannel資料傳輸到另一個toFileChannel
        //long transferSize = fromfileChannel.transferTo(0, fromfileChannel.size(), toFileChannel);
        //System.out.println(transferSize);

        //=============================transferFrom==============================
        //把資料從通道源傳輸到toFileChannel,相比通過buffer讀寫更加的便捷
        long transferSize1 = toFileChannel.transferFrom(fromfileChannel, 0, fromfileChannel.size());
        //引數position和count表示目標檔案的寫入位置和最多寫入的資料量
        //long transferSize1 = toFileChannel.transferFrom(fromfileChannel, 0, fromfileChannel.size()-1000);
        //如果通道源的資料小於count那麼就傳實際有的資料量。
        //long transferSize1 = toFileChannel.transferFrom(fromfileChannel, 0, fromfileChannel.size()+1000);
        System.out.println(transferSize1);
    }
}

NIO Selector選擇器

Selector是Java NIO中的一個元件,用於檢查一個或多個NIO Channel的狀態是否處於可讀、可寫。如此可以實現單執行緒管理多個channels,也就是可以管理多個網路連結。

為什麼使用Selector

用單執行緒處理多個channels的好處是我需要更少的執行緒來處理channel。實際上,你甚至可以用一個執行緒來處理所有的channels。從作業系統的角度來看,切換執行緒開銷是比較昂貴的,並且每個執行緒都需要佔用系統資源,因此暫用執行緒越少越好。

需要留意的是,現代作業系統和CPU在多工處理上已經變得越來越好,所以多執行緒帶來的影響也越來越小。如果一個CPU是多核的,如果不執行多工反而是浪費了機器的效能。不過這些設計討論是另外的話題了。簡而言之,通過Selector我們可以實現單執行緒操作多個channel。

建立Selector

建立一個Selector可以通過Selector.open()方法:

Selector selector = Selector.open();

註冊Channel到Selector上

先把Channel註冊到Selector上,這個操作使用SelectableChannel的register()。SocketChannel等都有繼承此抽象類。

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Channel必須是非阻塞的。所以FileChannel不適用Selector,因為FileChannel不能切換為非阻塞模式。Socket channel可以正常使用。

注意register的第二個引數,這個引數是一個“關注集合”,代表我們關注的channel狀態,有四種基礎型別可供監聽:

  1. Connect——連線就緒
  2. Accept——可連線就緒
  3. Read——讀就緒
  4. Write——寫就緒

一個channel觸發了一個事件也可視作該事件處於就緒狀態。因此當channel與server連線成功後,那麼就是“連線就緒”狀態。server channel接收請求連線時處於“可連線就緒”狀態。channel有資料可讀時處於“讀就緒”狀態。channel可以進行資料寫入時處於“寫就緒”狀態。

上述的四種就緒狀態用SelectionKey中的常量表示如下:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

如果對多個事件感興趣可利用位的或運算結合多個常量,比如:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;  

SelectionKey's

在上一小節中,我們利用register方法把Channel註冊到了Selectors上,這個方法的返回值是SelectionKeys,這個返回的物件包含了一些比較有價值的屬性:

  • The interest set
  • The ready set
  • The Channel
  • The Selector
  • An attached object (optional)

Interest Set

這個“關注集合”實際上就是我們希望處理的事件的集合,它的值就是註冊時傳入的引數,我們可以用按為與運算把每個事件取出來:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE; 

Ready Set

"就緒集合"中的值是當前channel處於就緒的值,一般來說在呼叫了select方法後都會需要用到就緒狀態

int readySet = selectionKey.readyOps();

從“就緒集合”中取值的操作類似於“關注集合”的操作,當然還有更簡單的方法,SelectionKey提供了一系列返回值為boolean的的方法:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

從SelectionKey操作Channel和Selector非常簡單:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();  

Attaching Objects

我們可以給一個SelectionKey附加一個Object,這樣做一方面可以方便我們識別某個特定的channel,同時也增加了channel相關的附加資訊。例如,可以把用於channel的buffer附加到SelectionKey上:

selectionKey.attach(theObject);

Object attachedObj = selectionKey.attachment();

附加物件的操作也可以在register的時候就執行:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

從Selector中選擇channel

一旦我們向Selector註冊了一個或多個channel後,就可以呼叫select來獲取channel。select方法會返回所有處於就緒狀態的channel。 select方法具體如下:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()方法在返回channel之前處於阻塞狀態。 select(long timeout)和select做的事一樣,不過他的阻塞有一個超時限制。

selectNow()不會阻塞,根據當前狀態立刻返回合適的channel。

select()方法的返回值是一個int整形,代表有多少channel處於就緒了。也就是自上一次select後有多少channel進入就緒。舉例來說,假設第一次呼叫select時正好有一個channel就緒,那麼返回值是1,並且對這個channel做任何處理,接著再次呼叫select,此時恰好又有一個新的channel就緒,那麼返回值還是1,現在我們一共有兩個channel處於就緒,但是在每次呼叫select時只有一個channel是就緒的。

selectedKeys()

在呼叫select並返回了有channel就緒之後,可以通過選中的key集合來獲取channel,這個操作通過呼叫selectedKeys()方法:

Set<SelectionKey> selectedKeys = selector.selectedKeys();    

還記得在register時的操作吧,我們register後的返回值就是SelectionKey例項,也就是我們現在通過selectedKeys()方法所返回的SelectionKey。

遍歷這些SelectionKey可以通過如下方法:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

上述迴圈會迭代key集合,針對每個key我們單獨判斷他是處於何種就緒狀態。

注意:keyIterater.remove()方法的呼叫,Selector本身並不會移除SelectionKey物件,這個操作需要我們手動執行。當下次channel處於就緒是,Selector任然會吧這些key再次加入進來。

SelectionKey.channel返回的channel例項需要強轉為我們實際使用的具體的channel型別,例如ServerSocketChannel或SocketChannel.

wakeUp()

由於呼叫select而被阻塞的執行緒,可以通過呼叫Selector.wakeup()來喚醒即便此時已然沒有channel處於就緒狀態。具體操作是,在另外一個執行緒呼叫wakeup,被阻塞與select方法的執行緒就會立刻返回。

close()

當操作Selector完畢後,需要呼叫close方法。close的呼叫會關閉Selector並使相關的SelectionKey都無效。channel本身不會被關閉。

示例:首先開啟一個Selector,然後註冊channel,最後監聽Selector的狀態。

public class NIOServer {

    public static void main(String[] args) throws IOException {

        // 1.獲取通道
        ServerSocketChannel server = ServerSocketChannel.open();

        // 2.切換成非阻塞模式
        server.configureBlocking(false);

        // 3. 繫結連線
        server.bind(new InetSocketAddress(6666));

        // 4. 獲取選擇器
        Selector selector = Selector.open();

        // 4.1將通道註冊到選擇器上,指定接收“監聽通道”事件
        server.register(selector, SelectionKey.OP_ACCEPT);

        // 5. 輪訓地獲取選擇器上已“就緒”的事件--->只要select()>0,說明已就緒
        while (selector.select() > 0) {
            // 6. 獲取當前選擇器所有註冊的“選擇鍵”(已就緒的監聽事件)
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            // 7. 獲取已“就緒”的事件,(不同的事件做不同的事)
            while (iterator.hasNext()) {

                SelectionKey selectionKey = iterator.next();

                // 接收事件就緒
                if (selectionKey.isAcceptable()) {

                    // 8. 獲取客戶端的連結
                    SocketChannel client = server.accept();

                    // 8.1 切換成非阻塞狀態
                    client.configureBlocking(false);

                    // 8.2 註冊到選擇器上-->拿到客戶端的連線為了讀取通道的資料(監聽讀就緒事件)
                    client.register(selector, SelectionKey.OP_READ);

                } else if (selectionKey.isReadable()) { // 讀事件就緒

                    // 9. 獲取當前選擇器讀就緒狀態的通道
                    SocketChannel client = (SocketChannel) selectionKey.channel();

                    // 9.1讀取資料
                    ByteBuffer buffer = ByteBuffer.allocate(1024);

                    // 9.2得到檔案通道,將客戶端傳遞過來的圖片寫到本地專案下(寫模式、沒有則建立)
                    FileChannel outChannel = FileChannel.open(Paths.get("2_loan.sql"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

                    while (client.read(buffer) > 0) {
                        // 在讀之前都要切換成讀模式
                        buffer.flip();

                        outChannel.write(buffer);

                        // 讀完切換成寫模式,能讓管道繼續讀取檔案的資料
                        buffer.clear();
                    }
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    byteBuffer.put("yeah,i know,i got your message!".getBytes());
                    byteBuffer.flip();
                    client.write(byteBuffer);
                }
                // 10. 取消選擇鍵(已經處理過的事件,就應該取消掉了)
                iterator.remove();
            }
        }
    }
}

public class NIOClientTwo {

    public static void main(String[] args) throws IOException {

        // 1. 獲取通道
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));

        // 1.1切換成非阻塞模式
        socketChannel.configureBlocking(false);

        // 1.2獲取選擇器
        Selector selector = Selector.open();

        // 1.3將通道註冊到選擇器中,獲取服務端返回的資料
        socketChannel.register(selector, SelectionKey.OP_READ);

        // 2. 傳送一張圖片給服務端吧
        FileChannel fileChannel = FileChannel.open(Paths.get("D:\\text\\1_loan.sql"), StandardOpenOption.READ);

        // 3.要使用NIO,有了Channel,就必然要有Buffer,Buffer是與資料打交道的呢
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // 4.讀取本地檔案(圖片),傳送到伺服器
        while (fileChannel.read(buffer) != -1) {

            // 在讀之前都要切換成讀模式
            buffer.flip();

            socketChannel.write(buffer);

            // 讀完切換成寫模式,能讓管道繼續讀取檔案的資料
            buffer.clear();
        }


        // 5. 輪訓地獲取選擇器上已“就緒”的事件--->只要select()>0,說明已就緒
        while (selector.select() > 0) {
            // 6. 獲取當前選擇器所有註冊的“選擇鍵”(已就緒的監聽事件)
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            // 7. 獲取已“就緒”的事件,(不同的事件做不同的事)
            while (iterator.hasNext()) {

                SelectionKey selectionKey = iterator.next();

                // 8. 讀事件就緒
                if (selectionKey.isReadable()) {

                    // 8.1得到對應的通道
                    SocketChannel channel = (SocketChannel) selectionKey.channel();

                    ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

                    // 9. 知道服務端要返回響應的資料給客戶端,客戶端在這裡接收
                    int readBytes = channel.read(responseBuffer);

                    if (readBytes > 0) {
                        // 切換讀模式
                        responseBuffer.flip();
                        System.out.println(new String(responseBuffer.array(), 0, readBytes));
                    }
                }
                // 10. 取消選擇鍵(已經處理過的事件,就應該取消掉了)
                iterator.remove();
            }
        }
    }
}

NIO FileChannel檔案通道

Java NIO中的FileChannel是用於連線檔案的通道。通過檔案通道可以讀、寫檔案的資料。Java NIO的FileChannel是相對標準Java IO API的可選介面。

FileChannel不可以設定為非阻塞模式,他只能在阻塞模式下執行。

開啟檔案通道

在使用FileChannel前必須開啟通道,開啟一個檔案通道需要通過輸入/輸出流或者RandomAccessFile,下面是通過RandomAccessFile開啟檔案通道的案例:

RandomAccessFile aFile = new RandomAccessFile("D:\text\1_loan.sql", "rw");
FileChannel inChannel = aFile.getChannel();

從檔案通道內讀取資料

讀取檔案通道的資料可以通過read方法:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

首先開闢一個Buffer,從通道中讀取的資料會寫入Buffer內。接著就可以呼叫read方法,read的返回值代表有多少位元組被寫入了Buffer,返回-1則表示已經讀取到檔案結尾了。

向檔案通道寫入資料

寫資料用write方法,入參是Buffer:

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

注意這裡的write呼叫寫在了wihle迴圈彙總,這是因為write不能保證有多少資料真實被寫入,因此需要迴圈寫入直到沒有更多資料。

關閉通道

操作完畢後,需要把通道關閉:

channel.close();    

FileChannel Position

當操作FileChannel的時候讀和寫都是基於特定起始位置的(position),獲取當前的位置可以用FileChannel的position()方法,設定當前位置可以用帶引數的position(long pos)方法。

//獲取當前的位置
long position = fileChannel.position();
//設定當前位置為pos +123
fileChannel.position(pos +123);

假設我們把當前位置設定為檔案結尾之後,那麼當我們檢視從通道中讀取資料時就會發現返回值是-1,表示已經到達檔案結尾了。 如果把當前位置設定為檔案結尾之後,再向通道中寫入資料,檔案會自動擴充套件以便寫入資料,但是這樣會導致檔案中出現類似空洞,即檔案的一些位置是沒有資料的。

FileChannel Size

size()方法可以返回FileChannel對應的檔案的檔案大小:

long fileSize = channel.size();    

FileChannel Truncate

利用truncate方法可以擷取指定長度的檔案:

FileChannel truncateFile = fileChannel.truncate(1024);

FileChannel Force

force方法會把所有未寫磁碟的資料都強制寫入磁碟。這是因為在作業系統中出於效能考慮回把資料放入緩衝區,所以不能保證資料在呼叫write寫入檔案通道後就及時寫到磁碟上了,除非手動呼叫force方法。 force方法需要一個布林引數,代表是否把meta data也一併強制寫入。

channel.force(true);

NIO SocketChannel套接字通道

在Java NIO體系中,SocketChannel是用於TCP網路連線的套接字介面,相當於Java網路程式設計中的Socket套接字介面。建立SocketChannel主要有兩種方式,如下:

  1. 開啟一個SocketChannel並連線網路上的一臺伺服器。
  2. 當ServerSocketChannel接收到一個連線請求時,會建立一個SocketChannel。

建立一個SocketChannel連線

開啟一個SocketChannel可以這樣操作:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://www.google.com", 80));  

關閉一個SocketChannel連線

關閉一個SocketChannel只需要呼叫他的close方法,如下:

socketChannel.close();

從SocketChannel中讀資料

從一個SocketChannel連線中讀取資料,可以通過read()方法,如下:

ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = socketChannel.read(buf);

首先需要開闢一個Buffer。從SocketChannel中讀取的資料將放到Buffer中。

接下來就是呼叫SocketChannel的read()方法.這個read()會把通道中的資料讀到Buffer中。read()方法的返回值是一個int資料,代表此次有多少位元組的資料被寫入了Buffer中。如果返回的是-1,那麼意味著通道內的資料已經讀取完畢,到底了(連結關閉)。

向SocketChannel寫資料

向SocketChannel中寫入資料是通過write()方法,write也需要一個Buffer作為引數。下面看一下具體的示例:

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

非阻塞模式

我們可以把SocketChannel設定為non-blocking(非阻塞)模式。這樣的話在呼叫connect(), read(), write()時都是非同步的。

socketChannel.configureBlocking(false);

connect()

如果我們設定了一個SocketChannel是非阻塞的,那麼呼叫connect()後,方法會在連結建立前就直接返回。為了檢查當前連結是否建立成功,我們可以呼叫finishConnect(),如下:

socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://www.google.com", 80));

while(! socketChannel.finishConnect() ){
    //wait, or do something else...    
}

write()

在非阻塞模式下,呼叫write()方法不能確保方法返回後寫入操作一定得到了執行。因此我們需要把write()呼叫放到迴圈內。這和前面在講write()時是一樣的,此處就不在程式碼演示。

read()

在非阻塞模式下,呼叫read()方法也不能確保方法返回後,確實讀到了資料。因此我們需要自己檢查的整型返回值,這個返回值會告訴我們實際讀取了多少位元組的資料。

Selector結合非阻塞模式

SocketChannel的非阻塞模式可以和Selector很好的協同工作。把一個或多個SocketChannel註冊到一個Selector後,我們可以通過Selector指導哪些channels通道是處於可讀,可寫等等狀態的。

NIO ServerSocketChannel服務端套接字通道

在Java NIO中,ServerSocketChannel是用於監聽TCP連結請求的通道,正如Java網路程式設計中的ServerSocket一樣。

ServerSocketChannel實現類位於java.nio.channels包下面。

void test() throws IOException {
    //開啟一個ServerSocketChannel我們需要呼叫他的open()方法
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(9999));
    while(true) {
        SocketChannel socketChannel = serverSocketChannel.accept();
        //do something with socketChannel...
        if (socketChannel.isConnected()) {
            break;
        }
    }
    //關閉一個ServerSocketChannel我們需要呼叫close()方法
    serverSocketChannel.close();
}

監聽連結

通過呼叫accept()方法,我們就開始監聽埠上的請求連線。當accept()返回時,他會返回一個SocketChannel連線例項,實際上accept()是阻塞操作,他會阻塞帶去執行緒知道返回一個連線; 很多時候我們是不滿足於監聽一個連線的,因此我們會把accept()的呼叫放到迴圈中,就像這樣:

while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    //do something with socketChannel...
}

當然我們可以在迴圈體內加上合適的中斷邏輯,而不是單純的在while迴圈中寫true,以此來結束迴圈監聽;

非阻塞模式

實際上ServerSocketChannel是可以設定為非阻塞模式的。在非阻塞模式下,呼叫accept()函式會立刻返回,如果當前沒有請求的連結,那麼返回值為空null。因此我們需要手動檢查返回的SocketChannel是否為空,例如:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
//設定為非阻塞模式
serverSocketChannel.configureBlocking(false);
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();

    if(socketChannel != null){
        //do something with socketChannel...
    }
}

Non-blocking Server非阻塞伺服器

非阻塞伺服器程式碼

非阻塞IO通道(Non-blocking IO Pipelines)

非阻塞的IO管道(Non-blocking IO Pipelines)可以看做是整個非阻塞IO處理過程的鏈條。包括在以非阻塞形式進行的讀與寫操作。

一個非阻塞的IO管道不必同時需要讀和寫資料,通常來說有些管道只需要讀資料,而另一些管道則只需寫資料。

當然一個非阻塞的IO管道他也可以同時從多個Channel中讀取資料,例如同時從多個SocketChannel中讀取資料;

非阻塞和阻塞通道比較(Non-blocking vs. Blocking IO Pipelines)

非阻塞IO管道和阻塞IO管道之間最大的區別是他們各自如何從Channel(套接字socket或檔案file)讀寫資料。

  1. IO管道通常直接從流中(來自於socket活file的流)讀取資料,然後把資料分割為連續的訊息。這個處理與我們讀取流資訊,用tokenizer進行解析非常相似,不同的是我們在這裡會把資料流分割為更大一些的訊息塊。
  2. 一個阻塞IO管道的使用可以和輸入流一樣呼叫,每次從Channel中讀取一個位元組的資料,阻塞自身直到有資料可讀。
  3. 使用阻塞IO大大簡化了Message Reader的實現成本。阻塞的Message Reader無需關注沒有資料返回的情形,無需關注返回部分資料或者資料解析需要被複用的問題。

阻塞IO通道的缺點(Blocking IO Pipeline Drawbacks)

上面提到了阻塞的Message Reader易於實現,但是阻塞也給他帶了不可避免的缺點,必須為每個資料數量都分配一個單獨執行緒。原因就在於IO介面在讀取資料時在有資料返回前會一直被阻塞住。這直接導致我們無法用單執行緒來處理一個流沒有資料返回時去讀取其他的流。每當一個執行緒嘗試去讀取一個流的資料,這個執行緒就會被阻塞直到有資料真正返回。

如果這樣的IO管道運用到伺服器去處理高併發的連結請求,伺服器將不得不為每一個到來的連結分配一個單獨的執行緒。如果併發數不高比如每一時刻只有幾百併發,也行不會有太大問題。一旦伺服器的併發數上升到百萬級別,這種設計就缺乏伸縮性。每個執行緒需要為堆疊分配320KB(32位JVM)到1024KB(64位JVM)的記憶體空間。這就是說如果有1,000,000個執行緒,需要1TB的記憶體。而這些在還沒開始真正處理接收到的訊息前就需要(訊息處理中還需要為物件開闢記憶體)。

為了減少執行緒數,很多伺服器都設計了執行緒池,把所有接收到的請求放到佇列內,每次讀取一條連線進行處理。這種設計可以用下圖表示:

但是這種設計要求緩衝的連線程序傳送有意義的資料。如果這些連線長時間處於非活躍的狀態,那麼大量非活躍的連線會阻塞執行緒池中的所有執行緒。這會導致伺服器的響應速度特別慢甚至無響應。

有些伺服器為了減輕這個問題,採取的操作是適當增加執行緒池的彈性。例如,當執行緒池所有執行緒都處於飽和時,執行緒池可能會自動擴容,啟動更多的執行緒來處理事務。這個解決方案會使得伺服器維護大量不活躍的連結。但是需要謹記伺服器所能開闢的執行緒數是有限制的。所有當有1,000,000個低速的連結時(大量非活躍連結時),伺服器還是不具備伸縮性。

基礎的非阻塞通道設計(Basic Non-blocking IO Pipeline Design)

一個非阻塞的IO通道可以用單執行緒讀取多個數據流。這個前提是相關的流可以切換為非阻塞模式(並不是所有流都可以以非阻塞形式操作,FileChannel就不能切換非阻塞模式)。在非阻塞模式下,讀取一個流可能返回0個或多個位元組。如果流還沒有可供讀取的資料那麼就會返回0,其他大於1的返回都表明這是實際讀取到的資料;

為了避開沒有資料可讀的流,我們結合Java NIO中的Selector。一個Selector可以註冊多個SelectableChannel例項。當我們呼叫select()或selectorNow()方法時Selector會返回一個有資料可讀的SelectableChannel例項。這個設計可以如下插圖:

讀取部分資訊(Reading Partial Messages)

當我們衝SelectableChannel中讀取一段資料後,我們並不知道這段資料是否是完整的一個message。因為一個數據段可能包含部分message,也就是說即可能少於一個message,也可能多一個message(0到多個message),正如下面這張插圖所示意的那樣:

要處理這種截斷的message,我們會遇到兩個問題(非阻塞讀取資料時):

  1. 檢測資料段中是否包含一個完整的message
  2. 在message剩餘部分獲取到之前,我們如何處理不完整的message

檢測完整message要求Message Reader檢視資料段中的資料是否至少包含一個完整的message。如果包含一個或多個完整message,這些message可以被下發到通道中處理。查詢完整message的過程是個大量重複的操作,所以這個操作必須是越快越好的。

當資料段中有一個不完整的message時,無論不完整訊息是整個資料段還是說在完整message前後,這個不完整的message資料都需要在剩餘部分獲得前儲存起來。

檢查message完整性和儲存不完整message都是Message Reader的職責。為了避免混淆來自不同Channel的資料,我們為每一個Channel分配一個Message Reader。整個設計大概是這樣的:

當我們通過Selector獲取到一個有資料可以讀取的Channel之後,該Channel關聯的Message Reader會讀取資料,並且把資料打斷為Message塊。得到完整的message後就可以通過通道下發到其他元件進行處理。

一個Message Reader自然是協議相關的。他需要知道message的格式以便讀取。如果我們的伺服器是跨協議複用的,那他必須實現Message Reader的協議-大致類似於接收一個Message Reader工廠作為配置引數。

儲存不完整的Message(Storing Partial Messages)

現在我們已經明確了由Message Reader負責不完整訊息的儲存直到接收到完整的訊息。現在我們還需要知道這個儲存過程需要如何來實現。

在設計的時候我們需要考慮兩個關鍵因素:

  1. 我們希望在拷貝訊息資料的時候資料量能儘可能的小,拷貝量越大則效能相對越低;
  2. 我們希望完整的訊息是以順序的位元組儲存,這樣方便進行資料的解析;

為每個Message Reade分配Buffer(A Buffer Per Message Reader)

固定大小buffer

顯然不完整的訊息資料需要儲存在某種buffer中。比較直接的辦法是我們為每個Message Reader都分配一個內部的buffer成員。但是,多大的buffer才合適呢?這個buffer必須能儲存下一個message最大的大小。如果一個message最大是1MB,那每個Message Reader內部的buffer就至少有1MB大小。

在百萬級別的併發連結數下,1MB的buffer基本沒法正常工作。舉例來說,1,000,000 x 1MB就是1TB的記憶體大小!如果訊息的最大資料量是16MB又需要多少記憶體呢?128MB呢?

缺點:這種直接分配一個message最大的大小值的buffer是非常浪費空間的。

可伸縮Buffer(Resizable Buffers)

另一個方案是在每個Message Reader內部維護一個容量可變的buffer。一個可變的buffer在初始化時佔用較少空間,在訊息變得很大超出容量時自動擴容。這樣每個連結就不需要都佔用比如1MB的空間。每個連結只使用承載下一個訊息所必須的記憶體大小。

容量可變的buffer優點就是高效利用記憶體空間,不會浪費記憶體。

要實現一個可伸縮的buffer有幾種不同的辦法。每一種都有它的優缺點,下面幾個小結我會逐一討論它們。

拷貝擴容(Resize by Copy)

第一種實現可伸縮buffer的辦法是初始化buffer的時候只申請較少的空間,比如4KB。如果訊息超出了4KB的大小那麼開賠一個更大的空間,比如8KB,然後把4KB中的資料拷貝紙8KB的記憶體塊中。

拷貝方式擴容的優點:一個訊息的全部資料都被儲存在了一個連續的位元組陣列中。這使得資料解析變得更加容易。

缺點:會增加大量的資料拷貝操作。

拷貝擴容操作舉例分析:

為了減少資料的拷貝操作,你可以分析整個訊息流中的訊息大小,一次來找到最適合當前機器的可以減少拷貝操作的buffer大小。例如,你可能會注意到覺大多數的訊息都是小於4KB的,因為他們僅僅包含了一個非常請求和響應。這意味著訊息的處所榮校應該設定為4KB。

同時,你可能會發現如果一個訊息大於4KB,很可能是因為他包含了一個檔案。你會可能注意到 大多數通過系統的資料都是小於128KB的。所以我們可以在第一次擴容設定為128KB。

最後你可能會發現當一個訊息大於128KB後,沒有什麼規律可循來確定下次分配的空間大小,這意味著最後的buffer容量應該設定為訊息最大的可能資料量。

結合這三次擴容時的大小設定,可以一定程度上減少資料拷貝。4KB以下的資料無需拷貝。在1百萬的連線下需要的空間例如1,000,000x4KB=4GB,目前(2015)大多數伺服器都扛得住。4KB到128KB會僅需拷貝一次,即拷貝4KB資料到128KB的裡面。訊息大小介於128KB和最大容量的時需要拷貝兩次。首先4KB資料被拷貝第二次是拷貝128KB的資料,所以總共需要拷貝132KB資料。假設沒有很多的訊息會超過128KB,那麼這個方案還是可以接受的。

當一個訊息被完整的處理完畢後,它佔用的內容應當即刻被釋放。這樣下一個來自同一個連結通道的訊息可以從最小的buffer大小重新開始。這個操作是必須的如果我們需要儘可能高效地複用不同連結之間的記憶體。大多數情況下並不是所有的連結都會在同一時刻需要大容量的buffer。

筆者寫了一個完整的教程闡述瞭如何實現一個記憶體buffer使其支援擴容:Resizable Arrays 。這個教程也附帶了一個指向GitHub上的原始碼倉地址,裡面有實現方案的具體程式碼。

追加擴容(Resize by Append)

另一種實現buffer擴容的方案是讓buffer包含幾個陣列。當需要擴容的時候只需要在開闢一個新的位元組陣列,然後把內容寫到裡面去。

這種擴容也有兩個具體的辦法。一種是開闢單獨的位元組陣列,然後用一個列表把這些獨立陣列關聯起來。另一種是開闢一些更大的,相互共享的位元組陣列切片,然後用列表把這些切片和buffer關聯起來。個人而言,筆者認為第二種切片方案更好一點點,但是它們之前的差異比較小

這種追加擴容的方案不管是用獨立陣列還是切片都有一個優點,那就是寫資料的時候不需要額外的拷貝操作。所有的資料可以直接從socket(Channel)中拷貝至陣列活切片當中。

這種方案的缺點也很明顯,就是資料不是儲存在一個連續的陣列中。這會使得資料的解析變得更加複雜,因為解析器不得不同時查詢每一個獨立陣列的結尾和所有陣列的結尾。正因為我們需要在寫資料時查詢訊息的結尾,這個模型在設計實現時會相對不那麼容易。

TLV編碼訊息(TLV Encoded Messages)

有些協議的訊息消失採用的是一種TLV格式(Type, Length, Value)。這意味著當訊息到達時,訊息的完整大小儲存在了訊息的開始部分。我們可以立刻判斷為訊息開闢多少記憶體空間。

優點:TLV編碼是的記憶體管理變得更加簡單。我們可以立刻知道為訊息分配多少記憶體。即便是不完整的訊息,buffer結尾後面也不會有浪費的記憶體。

缺點:我們需要在訊息的全部資料接收到之前就開闢好需要用的所有記憶體。因此少量連結慢,但傳送了大塊資料的連結會佔用較多記憶體,導致伺服器無響應。

解決上訴問題的一個變通辦法是使用一種內部包含多個TLV的訊息格式。這樣我們為每個TLV段分配記憶體而不是為整個的訊息分配,並且只在訊息的片段到達時才分配記憶體。但是訊息片段很大時,任然會出現一樣的問題。

另一個辦法是為訊息設定超時,如果長時間未接收到的訊息(比如10-15秒)。這可以讓伺服器從偶發的併發處理大塊訊息恢復過來,不過還是會讓伺服器有一段時間無響應。另外惡意的DoS攻擊會導致伺服器開闢大量記憶體。

TLV編碼使得記憶體管理更加簡單,這也是HTTP1.1協議讓人覺得是一個不太優良的的協議的原因。正因如此,HTTP2.0協議在設計中也利用TLV編碼來傳輸資料幀。也是因為這個原因我們設計了自己的利用TLV編碼的網路協議VStack.co。

寫不完整的訊息(Writing Partial Messages)