Java NIO總結(一):Channel、Buffer、Selector
0. 概述
NIO的全稱是NoneBlocking IO,非阻塞IO,區別於BIO,BIO的全稱是Blocking IO,阻塞IO。那這個阻塞是什麼意思呢?例如傳統的多執行緒伺服器是BlockingIO模式的,從頭到尾所有的執行緒都是阻塞的,接收請求和處理的過程中:
- Accept是阻塞的,只有新連線來了,Accept才會返回,主執行緒才能繼
- Read是阻塞的,只有請求訊息來了,Read才能返回,子執行緒才能繼續處理
- Write是阻塞的,只有客戶端把訊息收了,Write才能返回,子執行緒才能繼續讀取下一個請求
在IO處理被阻塞的時候處理執行緒就需要等在那裡,佔用了作業系統的排程資源,什麼事也不幹,是非常大的效能浪費。
NIO並不是Java獨有的概念,NIO代表的一個詞彙叫著IO多路複用。它是由作業系統提供的系統呼叫,早期這個作業系統呼叫的名字是select,但是效能低下,後來漸漸演化成了Linux下的epoll和Mac裡的kqueue。
1.Java NIO
NIO並不是java獨有概念,很早在作業系統層面已經提出。
Java從1.4版本引入NIO(New IO/Non-blocking IO)系列介面作為IO包的替代模組,用於替代Java IO和Java Networking 介面。與標準IO介面相比,JavaNIO提供了一種不同的方式來處理IO操作。
學習使用Java NIO需要理解下面幾個概念:
- Channels和Buffers:在標準IO介面中我們最常用的是位元組流(byte strams)和字元流(character streams)。在NIO介面中我們需要使用Channel和Buffer進行IO操作,Channel模擬了流的概念,但是又有不同。資料總是從一個Channel讀到一個buffer中,或者從一個buffer中寫到channel中。
- Non-blocking IO: Java NIO介面的核心就是提供了非阻塞IO的能力(Non-blocking IO)。例如:一個執行緒可以請求channel讀取資料到buffer中,在channel讀取資料的過程中,執行緒可以處理其他的事情,一旦資料已經讀取到buffer中,執行緒可以繼續處理buffer中的資料;對於將buffer中的資料寫到channel中道理是一樣的。
- Selectors:Java NIO包含了Selectors的設計,Selector通過事件驅動多個Channel的物件,Selector可以實現讓一個執行緒管理使用多個數據的Channel。
Java NIO包含了大量的類和元件,但是Channel
Channel和Buffer通常是共同使用的,一般來講,所有的IO和NIO操作都從一個channel開始,channel有點像stream,資料可以通過channel讀取到buffer裡;也可以將資料從buffer寫到channel中Java NIO提供了很多種channel和buffer型別;Channel介面主要實現類如下:
* FileChannel
* DatagramChannel
* SocketChannel
* ServerSocketChannel
這些實現類覆蓋了 UDP + TCP 網路IO以及常用的檔案IO操作,這些實現類裡還要一些比較有趣的介面,這裡先簡單瞭解一下,後邊會詳細介紹。Buffer的是要實現類:
* ByteBuffer
* CharBuffer
* DoubleBuffer
* FloatBuffer
* IntBuffer
* LongBuffer
* ShortBuffer
這些Buffer的實現類涵蓋了可以通過IO讀寫的所有基本型別:byte,short,int,long,float,double和字元(char)。Java NIO還包含了一個MappedByteBuffer用於使用記憶體對映讀取檔案,可以以記憶體的速度快速訪問檔案內容。
2.Channel
JavaNIO Channels和流有一些相似,但是又有些不同:
- 你可以同時讀和寫Channels,流Stream只支援單向的讀或寫(InputStream/OutputStream)
- Channels可以非同步的讀和寫,流Stream是同步的
- Channels總是讀取到buffer或者從buffer中寫入
下面分別介紹一下Channel最重要的一些實現類:
- FileChannel : 可以讀寫檔案中的資料
- DatagramChannel:可以通過UDP協議讀寫資料
- SocketChannel:可以通過TCP協議讀寫資料
- ServerSocketChannel:允許我們像一個web伺服器那樣監聽TCP連結請求,為每一個連結請求建立一個SocketChannel
下面是一個基本的使用FileChannel讀取資料到buffer的例子:
public class FileChannelExam {
public static void main(String[] args){
try {
String path = FileChannelExam.class.getResource("/data/nio-data.txt").getPath();
// 建立一個檔案通道
RandomAccessFile file = new RandomAccessFile(path, "rw");
FileChannel channel = file.getChannel();
// 建立一個位元組buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 讀取資料到buffer
int len = channel.read(buffer);
while (len != -1){
System.out.println("Read " + len);
// 將寫模式轉變為讀模式,
// 將寫模式下的buffer內容最後位置設為讀模式下的limit位置,作為讀越界位,同時將讀位置設為0
// 表示轉換後重頭開始讀,同時消除寫模式的mark標記
buffer.flip();
// 判斷當前讀取位置是否到達越界位(position < limit)
while (buffer.hasRemaining()){
// 讀取當前position的位元組(position++)
System.out.println(buffer.get());
}
// 清空當前buffer內容
buffer.clear();
len = channel.read(buffer);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
需要注意buffer.flip()方法,首先我們從Channel讀取資料寫入到Buffer,然後呼叫flip將切換到讀模式,才能從buffer中讀取資料。
Channel到Channel的資料傳輸
在Java NIO中我們可以直接將資料從一個Channel傳輸到另一個Channel中,比如FileChannel中有transferTo()和transferFrom()方法。
transferFrom()
transferFrom()方法可以將一個源channel中的資料傳輸到一個FileChannel中
String fromPath = FileChannelExam2.class.getResource("/data/nio-data.txt").getPath();
String toPath = FileChannelExam2.class.getResource("/data/nio-data-to.txt").getPath();
RandomAccessFile fromFile = new RandomAccessFile(fromPath, "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile(toPath, "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
transferFrom()有三個引數,源channel,position,count;position定義目標channel寫入的起始位置,count定義寫入資料的容量,如果源channel中的資料量小於count,只會寫入源channel資料的量。
另外,在SocketChannel的實現中,當前SocketChannel已經讀取一部分資料,稍後仍會讀取更多資料情況下,並不一定能將完整的資料讀取到FileChannel中。
transferTo()
transferTo()方法可以將FileChannel中的資料傳輸到其他channel中
String fromPath = FileChannelExam2.class.getResource("/data/nio-data.txt").getPath();
String toPath = FileChannelExam2.class.getResource("/data/nio-data-to.txt").getPath();
RandomAccessFile fromFile = new RandomAccessFile(fromPath, "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile(toPath, "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel);
上邊兩個例子有些相似,唯一的區別就是呼叫的方法和呼叫方法的物件。這個方法和SocketChannel也會存在和transferFrom同樣的問題。
3.Buffer
在Java NIO中各類Buffer主要用於和NIO Channel進行互動,資料從Channel中讀取到Buffer中,從Buffer寫入到Channel中。
我們可以將Buffer看做記憶體中的一塊區域,我們可以在這塊區域上寫資料,然後在從中讀取。這塊記憶體區域被包裝成NIO Buffer物件,提供了一系列的方法使我們操作這塊記憶體變得更簡單一些。
Buffer的基本使用
使用Buffer進行讀寫資料一般會通過下邊四個步驟處理:
- 將資料寫到Buffer中
- 呼叫buffer.flip()切換為讀模式
- 從Buffer中讀取資料
- 呼叫buffer.clear()或者buffer.compact()清空或壓縮buffer
下邊是個簡單的Buffer使用的例子
public class FileChannelExam {
public static void main(String[] args){
try {
String path = FileChannelExam.class.getResource("/data/nio-data.txt").getPath();
// 建立一個檔案通道
RandomAccessFile file = new RandomAccessFile(path, "rw");
FileChannel channel = file.getChannel();
// 建立一個位元組buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 讀取資料到buffer
int len = channel.read(buffer);
while (len != -1){
System.out.println("Read " + len);
// 將寫模式轉變為讀模式,
// 將寫模式下的buffer內容最後位置設為讀模式下的limit位置,作為讀越界位,同時將讀位置設為0
// 表示轉換後重頭開始讀,同時消除寫模式的mark標記
buffer.flip();
// 判斷當前讀取位置是否到達越界位(position < limit)
while (buffer.hasRemaining()){
// 讀取當前position的位元組(position++)
System.out.println(buffer.get());
}
// 清空當前buffer內容
buffer.clear();
len = channel.read(buffer);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
當我們將資料寫入buffer時,buffer會記錄我們寫入了多少資料,當需要讀取資料的時候,需要呼叫flip()方法將buffer從寫模式切換到讀模式,在讀模式下,buffer允許使用者讀取已經寫入buffer的所有資料。
一旦我們已經讀取了buffer中的所有資料,我們需要清空buffer以便寫一次寫入資料。我們可以使用兩種方法達到這個目的:
- 呼叫clear()方法:清空整個buffer;
- 呼叫compact()方法:僅清空已經讀取的資料,未讀取的資料移動到buffer的起始位置,新寫入的資料會放到未讀取資料的後邊。
Buffer的 capacity, position 和limit
Buffer物件使用capacity,position,limit三個屬性來儲存記憶體狀態以便靈活操作記憶體,瞭解這三個屬性的作用是理解Buffer工作原理的關鍵。position和limit決定了Buffer可以讀寫的區域(position <= x < limit),capacity 表示讀寫的最大容量
下圖模擬了Buffer在讀、寫模式下capacity、position、limit的狀態。
- capacity
作為一塊記憶體,buffer必須有一個固定容量,這就是buffer的capacity。你最多隻能寫入capacity容量的資料到buffer中,一旦buffer中被寫滿資料,在你寫入新的資料之前需要置空buffer(通過讀取資料或直接清空)。
- position
當寫入buffer資料的時候需要明確寫入的位置,這就是position,buffer初始化的時候position為0;當你寫入一個位元組或者整型數字後,position指標會移動到已經寫入資料的記憶體的下一個記憶體位置,position的最大值為capacity-1;
當讀取資料的時候,你也可以給定 一個position,當你呼叫filp()方法將一個buffer從寫模式切換到讀模式的時候,position會重置為0,你將會從0位置開始讀取資料,讀取資料後position也會移動到已讀取資料的下一個位置。
- limit
在寫資料的時候,limit限制了寫入資料的最大容量即position的最大值(position < limit).在寫模式下,limit=capacity;
從上邊程式碼可以看到當呼叫flip()切換到讀模式時,limit被設定為已寫入資料的position值,限制你能讀取資料的容量,也就是說你最多能讀取你寫入的所有的資料。
Buffer常用方法
1.申請一個Buffer
在使用Buffer之前,你必須為它申請一塊記憶體空間,每個Buffer的實現類都實現了它自己的allocate()方法來完成記憶體申請的工作,下面的程式碼展示瞭如何建立一個Buffer物件。
// 建立一個1024位元組的ByteBuffer物件
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 建立一個1024字元的CharBuffer物件
CharBuffer charBuffer = CharBuffer.allocate(1024);
2.寫入資料到buffer中
向buffer寫入資料有兩種方法:
- 通過Channel向Buffer中寫入資料
- 直接寫入資料到Buffer
// 通過Channel寫入,即將Channel資料讀取到buffer中
int len = channel.read(buffer);
// 直接寫入,呼叫put方法
buffer.put(127);
需要注意的是,put()方法有多重實現,你可以使用不同的方式寫入資料,例如:寫入到特定的位置,寫入一個位元組陣列等。
3.flip()寫切換到讀
flip()方法是將buffer由寫模式切換到讀模式的方法,flip()方法將position重置為0,將limit設定為已經寫入的最大位置,也就是position從標記寫入位置改變為標記都區位置;原始碼中flip()方法的實現如下:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
4.從buffer中讀取資料
從buffer中讀取資料同樣有兩種方法:
- 通過Channel從Buffer中讀取資料
- 直接從Buffer中讀取資料
// 使用Channel讀取資料,即將資料寫入Channel
int len = channel.write(buffer);
// 直接讀取資料
byte data = buffer.get();
同樣get()方法也有很多過載實現,允許我們使用不同的方法讀取資料,可以參考Buffer實現類文件檢視更多細節。
5.倒回rewind()
rewind()倒回方法只是將position重置為0,limit仍保持原值;一般在讀模式下使用可以讓我們重複讀取buffer中的資料;在寫模式下則會導致重新寫入資料(類似於置空了buffer)。原始碼:
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
6.clear()和compact()
一旦完成讀操作,我們需要讓buffer重新改變為寫模式,以便可以重新向buffer寫入新的資料,buffer通過clear()和compact()來完成。
當呼叫clear的時候position會重置為0,limit設定為capacity,雖然buffer中的資料未被擦除,但邏輯上相當於buffer被清空了,因為新寫入的資料會覆蓋舊資料,如果buffer中還有未被讀取的資料,這些資料依然會被覆蓋!
clear原始碼實現,可以和rewind的比較一下,看有什麼區別:
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
如果希望保留buffer中還未讀取的資料,只是清理已讀取的資料來騰出寫入空間,則可以通過compact()方法實現;compact()方法會拷貝未讀入的資料到buffer記憶體空間的起始位置,然後將position設定到未讀取資料元素的最後位置,limit值仍然為buffer的capacity,現在buffer就有了更多的空間供寫入資料。我們可以看一下HeapByteBuffer的原始碼實現:
public ByteBuffer compact() {
//複製資料
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
// 重置position位置
position(remaining());
// limit設定為capacity
limit(capacity());
discardMark();
return this;
}
7.mark()和reset()
mark和reset方法是配合使用的一組方法,你可以通過mark()方法標記buffer中的一個位置,經過讀寫操作後position位置會改變,然後你就可以使用reset()方法使position位置回到mark()方法標記的位置。
buffer.mark();
...; // 讀或寫操作
buffer.reset(); // 回到標記位置
8.equals()
可以通過equals和compareTo()方法來比較兩個buffer,equals判斷條件:
1. 兩個buffer是否同一型別;
2. 是否持有相同數量的資料;
3. 持有的資料是否每個元素都相同。
9.Scatter和Gather
Java NIO內建支援分散(Scatter)和聚集(Gather),Scatter和Gather是用於讀取和寫入Channel的概念。
Scatter是指從一個Channel中分散讀取資料到一個或多個Buffer的操作,因此Channel將資料分散到多個Buffer中;
Gather是指將一個或多個Buffer中的資料寫入一個Channel的操作,一次Channel可以從多個Buffer中收集資料。
Scatter和Gather在解決傳輸資料擁有多個部分需要進行分離的場景下有很大的用處;比如,一個訊息資料中包含訊息頭(header)和訊息體(body)兩部分,我們就可以將訊息頭和訊息體分別讀入不同的Buffer儲存,使得訊息的分離處理更加方便。
- Scatter操作
將Channel中的資料讀取到多個Buffer
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] buffers = { header, body };
channel.read(buffers);
當Channel的read()方法傳入引數為buffer資料的時候,read()方法會按照順序將資料寫入到傳入的多個buffer中,當一個buffer寫滿後便會寫入下一個buffer直到寫滿所有的buffer;因為分離讀取的時候,Channel寫入buffer的資料是按順序的,Scatter操作並不適合動態長度的資料傳輸,也就意味著傳輸資料的每一部分都是固定長度時,Scatter才能發揮它的作用。
- Gather操作
Gather操作將多個buffer的資料寫入到同一個Channel
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
// 寫入資料
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
channel的write()方法可以接受buffer資料作為引數,write()方法會按照順序將多個buffer中的資料依次寫入channel。需要注意的是,write()操作只會寫入buffer中已寫入的資料,即position到limit之間的資料;例如一個buffer的容量為128位元組,但buffer中只寫入了28位元組的資料,只有這28個位元組會寫入channel中,因此Gather操作和Scatter相反非常適合動態長度資料寫入。
3.Selector
Selector是Java NIO中用於管理一個或多個Channel的元件,控制決定對哪些Channel進行讀寫;通過使用Selector讓一個單執行緒可以管理多個Channel甚至多個網路連線。
使用Selector最大的優勢就是可以在較少的執行緒中控制更多的Channel。事實上我們可以使用一個執行緒控制需要使用的所有Channel。作業系統執行緒的執行和切換需要一定的開銷,使用的執行緒越小,系統開銷也就越少;因此使用Selector可以節省很多系統開銷。下圖展示了一個執行緒使用Selector控制三個Channel的情形。
1.建立Selector
Selector selector = Selector.open();
2.註冊Channel
想要通過Selector中控制Channel,必須將Channel註冊到Selector中,通過SelectableChannel.register()方法實現。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
需要注意的是註冊到Selector的Channel必須是非阻塞模式的(non-blocking),FileChannel是無法使用的因為FileChannel無法切換到非阻塞模式,SocketChannel非常適合配合Selector使用。
register方法的第二個引數是監聽設定,用於設定註冊的channel通過Selector監聽的操作事件型別,總共有四類事件可以監聽:
- Connect
- Accept
- Read
- Write
JavaNIO中在SelectionKey中有四個靜態變量表示這四類事件:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果在註冊Channel的時候希望監聽多個事件可以使用“|”連線靜態變數
SelectionKey key = channel.register(selector,
SelectionKey.OP_READ|SelectionKey.OP_WRITE);
3.SelectionKey物件
Channel註冊到Selector後會返回一個SelectionKey物件,這個物件包含了下面一些重要屬性:
- 事件監聽集合(interest set)
監聽集合(interest set)是channel在selector監聽的事件型別的集合,可以同SelectionKey讀寫這個配置。
int interestSet = selectionKey.interestOps();
// 通過 & 操作判斷監聽配置是否包含某類事件
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
- 就緒結合(ready set)
就緒集合(ready set)是channel已經就緒的操作的集合,我們主要在一個selection操作後訪問就緒集合。
int readySet = selectionKey.readyOps();
// 可以使用和interest set 同樣的方法測試集合中是否包含某類事件,
// 也可以通過呼叫下邊的一些方法進行判斷:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
- Channel物件
Channel channel = selectionKey.channel();
- Selector物件
Selector selector = selectionKey.selector();
- 一個可選附屬物件(an attached object (optional) )
可以給SelectionKey新增一個附加物件,通常用來標記Channel或者Channel的特徵資訊。例如,我們可以將和Channel配合使用的Buffer附加到SelectionKey上。
// 附加物件
selectionKey.attach(theObject);
// 獲取附加物件
Object attachedObj = selectionKey.attachment();
// 還可以再註冊channel的時候直接新增附加物件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
4.通過Selector選擇Channel
將多個Channel註冊到Selector後,我們就可以通過呼叫select()方法選擇監聽了特定事件(connect,accept,read,write)並且已經就緒的Channel。換種說法就是,如果你已經註冊了一個監聽read事件的channel,它就會通過select()方法接收到read事件。
select方法有幾種不同的過載:
- int select():阻塞直到至少有一個channel對監聽的事件操作準備就緒
- int select(long timeout):和select()方法一樣,但只會阻塞到指定的超時時間;
- int selectNow():不會阻塞,無論是否有就緒的channel都會立即返回。
三個方法的返回值是最後一次呼叫select()後就緒的channel的數量,如果你呼叫select()返回1,表示呼叫select()後有一個channel準備就緒了;當你再次呼叫sleect()時再返回1,表示這次又有一個channel就緒了,如果對第一次呼叫就緒的channel沒有做任何操作,這時總共有兩個已經準備就緒的channel,在兩次呼叫中都只有一個channel變為就緒狀態。
5.selectionKey()
呼叫select()方法返回就緒channel個數後,可以呼叫selectedKeys()方法獲取就緒channel的SelectionKey集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
我們可以通過這個集合訪問已經就緒的channel
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
上邊程式碼演示了遍歷每一個SelectionKey並判斷SelectionKey持有的channel引用就緒的事件。
注意迴圈中最後keyInterator.remove()方法,這裡並不是將SelectionKey物件從selector中移除,只是從就緒集合中移除,對channel操作後必須呼叫這個方法,當下一次channel就緒後,它的SelectionKey還會被加入到就緒集合中。
6.wakeUp()
一個執行緒呼叫select()後可以通過再次呼叫select()離開阻塞狀態;也可以通過其他執行緒呼叫wakeUp()方法是阻塞在select()的Selecor立即返回。
7.close()
使用完Selector後可以使用close()方法關閉它,這會關閉Selector和清除註冊到Selector的SelecionKey物件,但Channel本身並不會關閉。
8.完整流程(虛擬碼)
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}