Java.NIO程式設計一覽筆錄
Java標準IO 與 Java NIO 的簡單差異示意:
Java標準IO |
Java NIO |
|
---|---|---|
API呼叫 |
簡單 |
複雜 |
底層實現 |
面向流(stream),單向 |
面向通道(channel),釋放CPU、記憶體壓力 |
成效 |
同步阻塞 |
同步非阻塞 |
資料窺視 |
阻塞讀取,要麼足夠,要麼沒有 |
使用緩衝區(Buffer), 讀資料時需要檢查是否足夠 |
處理資料的執行緒數 |
1:1(一個執行緒處理一個流) |
1:N(選擇器(Selector),多路複用,可以一個或幾個少量執行緒管理多個通道) |
Java NIO知識體系圖:
- 1、NIO是什麼?
Java NIO(New IO)提供一種替代標準Java IO的API(從Java1.4開始),包名為 java.nio.*。
- 2、NIO提供了什麼特性?
Java NIO提供了同步非阻塞IO的特性,使得IO不再依賴stream(流),而藉助channel(通道)實現非阻塞式響應,避免執行緒上下文切換的開銷。
- 3、NIO基本概念
Java NIO由三個核心部分組成:Buffer(緩衝區)、Channel(通道)、Selector(選擇器)。
另外的Charset(字符集),提供提供Unicode字串編碼轉換到位元組序列以及反編碼的API。
- 4、Buffer(緩衝區)
Buffer,顧名思義為緩衝區,實際上是一個線性且有限的陣列記憶體容器。
可以從channel(通道)中讀取資料到Buffer,也可以從Buffer寫資料到channel(通道)。
實現大體有 ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。
(1)基本屬性
容量(capacity):所包含的元素個數。快取區的容量不能為負,且一旦定義無法變更。
限制(limit):第一個不應該被讀取或寫入的索引。快取區的限制不能為負,且不能大於容量(capacity)。
位置(position):下一個要讀取或寫入的索引。不能為負數,也不能超過限制(limit)。
標記(mark):標記當前位置的索引,暫存。如果定義了標記,則在將位置或限制調整為小於該標記的值時,該標記將被丟棄。
剩餘(remaining):當前位置與限制之間的元素數 (limit - position)。
標記、位置、限制和容量值遵守以下:
0《= 標記(mark)《=位置(position)《=限制(limit)《=容量(capacity)
位置(position)、限制(limit)、容量(capacity)在讀寫模式下的示意圖:
capacity限定你可以操作的記憶體塊大小,capacity個byte、long,char等型別。
在讀模式下,limit表示你最多可以讀取多少資料。在切換讀模式時,limit會設定為寫模式當前的position。這樣,你就可以讀取之前寫入的所有資料。
在寫模式下,limit等於capacity,表示最多可以寫入capacity個數據。
(2)Buffer的基本用法
使用Buffer讀寫資料一般遵循以下四個步驟:
1、建立並分配指定大小的buffer空間
2、寫入資料到Buffer
3、呼叫flip()方法
4、從Buffer中讀取資料 (如有必要,需要檢查是否足夠)
5、呼叫clear()方法或者compact()方法
示例:
java.nio.ByteBuffer byteBuffer = ByteBuffer.allocate(5);
//清空資料,切換寫模式,準備寫資料
byteBuffer.clear();
//byteBuffer.put((byte) 1);
//切換讀模式,準備讀資料
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
//操作資料
}
(3)例項化方法
位元組緩衝區要麼是直接的,要麼是非直接的。如果為直接位元組緩衝區,則 Java 虛擬機器會盡最大努力直接在此緩衝區上執行本機 I/O 操作。也就是說,在每次呼叫基礎作業系統的一個本機 I/O 操作之前(或之後),虛擬機器都會盡量避免將緩衝區的內容複製到中間緩衝區中(或從中間緩衝區中複製內容)。 使用allocateDirect方法可以一次性分配capacity大小的連續位元組空間。通過allocateDirect方法來建立具有連續空間的ByteBuffer物件雖然可以在一定程度上提高效率,在一些作業系統平臺上會使效率大幅度提高,而在另一些作業系統平臺上,效能會表現得非常差。謹慎使用直接緩衝區,除非你明確了確實有效能提升。
a、非直接緩衝區 - 使用allocate建立
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocate(10);
b、直接緩衝區 - 使用allocateDirect建立
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocateDirect(10);
c、靜態warp方法建立
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.wrap(bytes);
(4)從Buffer中讀資料
有兩種方式從Buffer中讀資料:
a、從Buffer中讀取資料寫到channel(通道)
int bytesWritten = inChannel.write(buf);
b、使用Buffer的get方法
byte aByte = buf.get();
(5)向Buffer寫資料
有兩種方式向Buffer寫資料:
a、從channel(通道)中讀取資料寫到Buffer
int bytesRead = inChannel.read(buf); //read into buffer.
b、使用Buffer的put方法
buf.put(127);
(6)讀寫模式切換
Buffer存在以下方法可以切換讀寫模式:
清除(clear):(position=0,limit=capacity,mark=-1) 位置(position)置為0,限制(limit)設為容量(capacity),並丟棄標記(mark)。
反轉(flip): (limit=position,position=0,mark=-1) 限制(limit)置為當前位置(position),然後位置(position)置為0,並丟棄標記(mark)。
重繞(rewind):(position=0,mark=-1) 位置(position)置為0,限制(limit)保持不變,並丟棄標記(mark)。
壓縮(compact):(copy未讀資料到前端,position=remaining,limit=capacity,mark=-1)將緩衝區的當前位置和界限之間的位元組(如果有)複製到緩衝區的開始處,然後將緩衝區的位置設定為remaining,限制(limit)設為容量(capacity),並丟棄標記(mark)。
因此,從讀模式切換寫模式,使用清除(clear)。
從寫模式切換讀模式,使用反轉(flip)。
讀寫模式混用,使用重繞(rewind)。
(7)標記與重置
標記(mark):當前位置設為標記(mark=position)
重置(reset):位置設定為以前標記的位置(position=mark)
(8)共享緩衝區
duplicate - 共享底層緩衝區,內容互相可見,位置、限制和標記互相獨立
slice - 共享緩衝區子部分(從共享發生位置起),該部分內容互相可見,位置、限制和標記互相獨立
warp - 包裝,將byte陣列引用為快取區陣列,如果快取區內容變更,byte陣列也相應變更
as檢視 :
共享部分或者全部緩衝區,內容互相可見,位置、限制和標記互相獨立。
remaining>1 (除以2) |
asCharBuffer |
asShortBuffer |
---|---|---|
remaining>2 (除以4) |
asIntBuffer |
asFloatBuffer |
remaining>3 (除以8) |
asLongBuffer |
asDoubleBuffer |
asReadOnlyBuffer(只讀) |
示例:
public static void main(String[] args) {
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocate(31);
printBuffer(byteBuffer1, "byteBuffer1");
/**
* byteBuffer1的remaining>1 (除以2)
*/
CharBuffer charBuffer = byteBuffer1.asCharBuffer();
printBuffer(charBuffer, "charBuffer");
charBuffer.put('a');
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>1 (除以2)
*/
ShortBuffer shortBuffer = byteBuffer1.asShortBuffer();
printBuffer(shortBuffer, "shortBuffer");
shortBuffer.put((short) 3);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>2 (除以4)
*/
IntBuffer intBuffer = byteBuffer1.asIntBuffer();
printBuffer(intBuffer, "intBuffer");
intBuffer.put(4);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>3 (除以8)
*/
LongBuffer longBuffer = byteBuffer1.asLongBuffer();
printBuffer(longBuffer, "longBuffer");
longBuffer.put(120);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>2 (除以4)
*/
FloatBuffer floatBuffer = byteBuffer1.asFloatBuffer();
printBuffer(floatBuffer, "floatBuffer");
floatBuffer.put(9f);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>3 (除以8)
*/
DoubleBuffer doubleBuffer = byteBuffer1.asDoubleBuffer();
printBuffer(doubleBuffer, "doubleBuffer");
doubleBuffer.put(1);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
}
private static void printBuffer(Buffer buffer, String name) {
System.out.println((name != null && !name.isEmpty() ? name + " " : "") + "position="
+ buffer.position() + ",limit=" + buffer.limit()
+ ",remaining=" + buffer.remaining() + ",capacity=" + buffer.capacity());
}
(9)位元組序(ByteOrder)
在電腦科學領域中,位元組序是指存放多位元組資料的位元組(byte)的順序,典型的情況是整數在記憶體中的存放方式和網路傳輸的傳輸順序。
在不同處理器,機器的位元組序是可能不一致的,在跨平臺處理資料的時候,位元組序的調整也就變得有必要了。
public static void main(String[] args) {
String string = "abcde";
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocate(10);
System.out.println(byteBuffer1.order());
byteBuffer1.rewind();//位置設定為 0 並丟棄標記
byteBuffer1.order(ByteOrder.BIG_ENDIAN);
byteBuffer1.asCharBuffer().put(string);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
byteBuffer1.rewind();//位置設定為 0 並丟棄標記
byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);
byteBuffer1.asCharBuffer().put(string);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* 無效用法1,只更改order,不重新填充資料,儲存是不會改變的,只有下次才生效
*/
byteBuffer1.rewind();//位置設定為 0 並丟棄標記
byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* 無效用法2,填充完資料再改order,儲存是不會改變的,只有下次才生效
*/
byteBuffer1.rewind();//位置設定為 0 並丟棄標記
byteBuffer1.asCharBuffer().put(string);
byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
}
(10)其他一些坑的記錄
ByteBuffer 的 array()方法與其他的,如CharBuffer不是一個樣子處理:
ByteBuffer的array()會把所有容量元素返回,正確做法如下:
/**
* 轉化byte陣列
*
* @param byteBuffer
* @return
*/
public static byte[] readToBytes(ByteBuffer byteBuffer) {
byteBuffer.flip();
// Retrieve bytes between the position and limit
// (see Putting Bytes into a ByteBuffer)
byte[] bytes = new byte[byteBuffer.remaining()];
// transfer bytes from this buffer into the given destination array
byteBuffer.get(bytes, 0, bytes.length);
byteBuffer.clear();
return bytes;
}
- 5、Channel(通道)
(1)工作原理
在作業系統知識中,通道指的是獨立於CPU的專管I/O的控制器,控制外圍I/O裝置與記憶體進行資訊交換。在採用通道方式的指令系統中,除了供CPU程式設計使用的機器指令系統外,還設定另外供通道專用的一組通道指令,用通道指令編制通道程式,讀取或存入I/O裝置。當需要進行I/O操作時,CPU只需啟動通道,然後可以繼續執行自身程式,通道則執行通道程式,管理與實現I/O操作,當完成時通道再彙報CPU即可。
如此,通道使得CPU、記憶體與I/O操作之間達到更高的並行程度。
(2)Channel(通道) vs. Stream(流)
Stream(流),是單向的,不能在一個流中讀寫混用,要麼一路讀直到關閉,要麼一路寫直到關閉。同時流是直接依賴CPU指令,與記憶體進行I/O操作,CPU指令會一直等I/O操作完成。
而Channel(通道) ,是雙向的,可以藉助Buffer(緩衝區)在一個通道中讀寫混用,可以交叉讀資料、寫資料到通道,而不用在讀寫操作後立刻關閉。另外還可以在兩個通道中直接對接傳輸。通道不依賴CPU指令,有專用的通道指令,在接收CPU指令,就可以獨立與記憶體完成I/O操作,只有在I/O操作完成後通知CPU,在此期間CPU是不用一直等待。
(3)通道的分類
Java Channel(通道) ,提供了各種I/O實體的連線,主要涵蓋檔案、網路(TCP、UDP)、管道三個方面。
大體實現:FileChannel、ServerSocketChannel、SocketChannel、DatagramChannel、Pipe.SinkChannel、 Pipe.SourceChannel。
檔案通道 |
網路(套接字)通道 |
管道 |
|
---|---|---|---|
實現 |
FileChannel |
ServerSocketChannel、 SocketChannel、 DatagramChannel |
Pipe.SinkChannel、 Pipe.SourceChannel |
繼承的抽象類 |
源自AbstractInterruptibleChannel,可中斷通道。 |
源自AbstractSelectableChannel,提供註冊、登出、關閉,設定阻塞模式,管理當前選擇鍵集,支援“多路複用”功能。 |
同左 |
例項化方法 |
通過FileInputStream、FileOutputStream、RandomAccessFile獲取檔案通道 |
通過Selector選擇器註冊監聽,返回一個表示該通道已向選擇器註冊的新 SelectionKey 物件。 |
同左 |
- 6、檔案通道(FileChannel)
(1)開啟FileChannel(例項化)
在使用FileChannel之前,需要先開啟它。但是我們無法直接開啟一個FileChannel,需要通過FileInputStream、 FileOutputStream、RandomAccessFile獲取FileChannel例項。
java.nio.channels.FileChannel channel = null;
FileInputStream fis = null;
try {
fis = new FileInputStream(path);
channel = fis.getChannel();
} finally {
if (channel != null)
channel.close();
if (fis != null)
fis.close();
}
(2)從FileChannel讀取資料、向FileChannel寫資料
read與write:與其他Channel一樣,讀寫藉助Buffer傳輸,需要注意的是往Channel寫資料(即Buffer讀資料寫入Channel)需要檢查資料是否足夠。
position:獲取、設定當前位置
size:獲取此FileChannel的檔案的當前大小
force(boolean) :強制將所有對此通道的檔案更新寫入包含該檔案的儲存裝置中。保證檔案及時更新到儲存裝置,特別是寫資料時。
truncate:將此通道的檔案擷取為給定大小。
public static void main(String[] args) throws IOException {
final String path = "file.txt";
write(path);//寫檔案
write2(path);//特定位置讀寫
read(path);//讀檔案
System.out.println();
truncate(path);//檔案擷取
read(path);//讀檔案
}
/**
* FileChannel 讀檔案
*/
private static void read(String path) throws IOException {
java.nio.channels.FileChannel channel = null;
FileInputStream fis = null;
try {
fis = new FileInputStream(path);
channel = fis.getChannel();
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
// 從入channel讀取資料到buffer
buffer1.rewind();
while (channel.read(buffer1) > 0) {
//讀取buffer
buffer1.flip();
Charset charset = Charset.defaultCharset();
CharBuffer charBuffer = charset.decode(buffer1);
System.out.print(charBuffer);
}
} finally {
if (channel != null)
channel.close();
if (fis != null)
fis.close();
}
}
/**
* FileChannel 寫檔案
*/
private static void write(String path) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap("趙客縵胡纓,吳鉤霜雪明。銀鞍照白馬,颯沓如流星。n".getBytes());
java.nio.channels.FileChannel channel = null;
FileOutputStream fos = null;
try {
fos = new FileOutputStream(path);
channel = fos.getChannel();
//強制刷出到記憶體
channel.force(true);
// 從buffer讀取資料寫入channel
buffer.rewind();
channel.write(buffer);
} finally {
if (channel != null)
channel.close();
if (fos != null)
fos.close();
}
}
/**
* FileChannel 特定位置讀寫
*/
private static void write2(String path) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap("十步殺一人,千里不留行。事了拂衣去,深藏身與名。n".getBytes());
java.nio.channels.FileChannel channel = null;
RandomAccessFile file = null;
try {
file = new RandomAccessFile(path, "rw");
channel = file.getChannel();
channel.position(channel.size()); //定位到檔案末尾
//強制刷出到記憶體
channel.force(true);
// 從buffer讀取資料寫入channel
buffer.rewind();
channel.write(buffer);
} finally {
if (channel != null)
channel.close();
if (file != null) {
file.close();// 關閉流
}
}
}
/**
* FileChannel 檔案擷取
*/
private static void truncate(String path) throws IOException {
java.nio.channels.FileChannel channel = null;
RandomAccessFile file = null;
try {
file = new RandomAccessFile(path, "rw");
channel = file.getChannel();
/**
* 擷取檔案前36byte
*/
channel.truncate(36);
} finally {
if (channel != null)
channel.close();
if (file != null) {
file.close();// 關閉流
}
}
}
(3)獨佔鎖定
lock() 與 tryLock() - 獲取或嘗試獲取對此通道的檔案的獨佔鎖定。
FileLock lock(long position, long size, boolean shared) 與 FileLock tryLock(long position, long size, boolean shared) 獲取或嘗試獲取此通道的檔案給定區域上的鎖定。 shared的含義:是否使用共享鎖,一些不支援共享鎖的作業系統,將自動將共享鎖改成排它鎖。可以通過呼叫 isShared() 方法來檢測獲得的是什麼型別的鎖。
- 共享鎖與獨佔鎖的區別:
共享鎖: 如果一個執行緒獲得一個檔案的共享鎖,那麼其它執行緒可以獲得同一檔案的共享鎖或同一檔案部分內容的共享鎖,但不能獲取獨佔鎖。
獨佔鎖: 只有一個讀或一個寫(讀和寫都不能同時)。獨佔鎖防止其他程式獲得任何型別的鎖。
- lock()和tryLock()的區別:
lock()阻塞式的,它要阻塞程序直到鎖可以獲得,或呼叫 lock() 的執行緒中斷,或呼叫 lock() 的通道關閉。鎖定範圍可以隨著檔案的增大而增加。無參lock()預設為獨佔鎖;有參lock(0L, Long.MAX_VALUE, true)為共享鎖。
tryLock()非阻塞,當未獲得鎖時,返回null.
- FileLock是執行緒安全的
- FileLock的生命週期:在呼叫FileLock.release(),或者Channel.close(),或者JVM關閉
注意:
- 同一程序內,在檔案鎖沒有被釋放之前,不可以再次獲取。即在release()方法呼叫前,只能lock()或者tryLock()一次。
- 檔案鎖定以整個 Java 虛擬機器來保持。但它們不適用於控制同一虛擬機器內多個執行緒對檔案的訪問。
- 對於一個只讀檔案或是隻讀channel通過任意方式加鎖時會報NonWritableChannelException異常
- 對於一個不可讀檔案或是不可讀channel通過任意方式加鎖時會報NonReadableChannelException異常
示例:
---------同一程序 - 讀讀重疊
兩個讀執行緒都是阻塞式獲取共享鎖。
lock = channel.lock(0L, Long.MAX_VALUE, true);
同一程序,即使是共享鎖,同時讀並且重疊,一樣 檔案重疊鎖異常【OverlappingFileLockException】
---------不同程序共享鎖- 讀讀
兩個程序讀執行緒都是阻塞式獲取共享鎖。
lock = channel.lock(0L, Long.MAX_VALUE, true);
根據結果,我們看到第二程序讀的時候,獲取共享鎖(18:46:03獲取),第一程序的共享鎖還沒釋放(18:46:05釋放)。
驗證了共享鎖允許同時讀的特性。
---------不同程序-讀寫
FileInputStream讀程序:lock = channel.lock(0L, Long.MAX_VALUE, true); 阻塞獲取共享鎖
FileOutputStream寫程序:lock = channel.lock(); 阻塞獲取獨佔鎖
我們嘗試先後不同啟動讀程序、寫程序,發現兩者都沒有異常,同時都是等待另外一個程序完成並釋放鎖再獲取檔案鎖。
---------不同程序-寫寫
類似的,驗證了獨佔鎖的特性。
- 7、網路TCP通道(ServerSocketChannel、SocketChannel)
7.1、ServerSocketChannel基本用法
(1)開啟 ServerSocketChannel
通過呼叫 ServerSocketChannel.open() 方法來開啟ServerSocketChannel
ServerSocketChannel channel= ServerSocketChannel.open();
(2)關閉 ServerSocketChannel
通過呼叫ServerSocketChannel.close() 方法來關閉ServerSocketChannel. 如:
//關閉 ServerSocketChannel
if (channel != null) {
channel.close();
}
(3)將 ServerSocket 繫結到特定地址(IP 地址和埠號)
channel.bind(new InetSocketAddress("127.0.0.1", 9595));
(4)監聽新進來的連線
在阻塞模式下,accept()方法會一直阻塞到有新連線 SocketChannel到達。
在while迴圈中呼叫 accept()方法. 如:
while (true) {
// 監聽新進來的連線
java.nio.channels.SocketChannel socketChannel = channel.accept();
//do something with socketChannel...
}
(5)非阻塞模式
ServerSocketChannel可以設定成非阻塞模式。
在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。
因此,需要檢查返回的SocketChannel是否是null.如:
// 設定非阻塞模式,read的時候就不再阻塞
channel.configureBlocking(false);
while (true) {
// 監聽新進來的連線
java.nio.channels.SocketChannel socketChannel = channel.accept();
if (socketChannel == null) {
// System.out.println("沒有客戶端連線");
TimeUnit.SECONDS.sleep(1);
continue;
}
//do something with socketChannel...
}
7.2、SocketChannel基本用法
SocketChannel 與 ServerSocketChannel類似,區別只在於需要指定連線的伺服器:
// 開啟SocketChannel
SocketChannel channel = SocketChannel.open();
// 設定非阻塞模式,read的時候就不再阻塞
channel.configureBlocking(false);
// tcp連線網路
channel.connect(new InetSocketAddress("127.0.0.1", 9595));
if (channel.finishConnect()) {// 連線伺服器成功
//do something
}
7.3、一個簡單的模擬TCP介面的Demo
這裡為方便學習交流,僅使用基本api,暫時沒有使用Selector(選擇器)。
服務端:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.TimeUnit;
/**
* ServerSocketChannel示例
*
* @author Sven Augustus
*/
public class ServerSocketChannelTest {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
java.nio.channels.ServerSocketChannel channel = null;
try {
// 開啟 ServerSocketChannel
channel = ServerSocketChannel.open();
// 設定非阻塞模式,read的時候就不再阻塞
channel.configureBlocking(false);
// 將 ServerSocket 繫結到特定地址(IP 地址和埠號)
channel.bind(new InetSocketAddress("127.0.0.1", 9595));
while (true) {
// 監聽新進來的連線
java.nio.channels.SocketChannel socketChannel = channel.accept();
if (socketChannel == null) {
// System.out.println("沒有客戶端連線");
TimeUnit.SECONDS.sleep(1);
continue;
}
System.out.println("準備讀:");
// 讀取客戶端傳送的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
socketChannel.read(buffer);
buffer.flip();// 讀取buffer
Object object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
// 往客戶端寫資料
String serializable = "您好,客戶端" + socketChannel.getRemoteAddress();
System.out.println("準備寫:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
socketChannel.write(byteBuffer);
}
} finally {
//關閉 ServerSocketChannel
if (channel != null) {
channel.close();
}
}
}
}
客戶端:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* SocketChannel示例
*
* @author Sven Augustus
*/
@SuppressWarnings("unused")
public class SocketChannelTest {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
java.nio.channels.SocketChannel channel = null;
try {
// 開啟SocketChannel
channel = SocketChannel.open();
// 設定非阻塞模式,read的時候就不再阻塞
channel.configureBlocking(false);
// tcp連線網路
channel.connect(new InetSocketAddress("127.0.0.1", 9595));
if (channel.finishConnect()) {// 連線伺服器成功
/**
* 往服務端寫資料
*/
String serializable = "您好,ServerSocketChannel。";
System.out.println("準備寫:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
channel.write(byteBuffer);
System.out.println("準備讀:");
// 讀取服務端傳送的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int numBytesRead = -1;
while ((numBytesRead = channel.read(buffer)) != -1) {
if (numBytesRead == 0) {// 如果沒有資料,則稍微等待一下
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
buffer.flip();// 讀取buffer
Object object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
buffer.clear(); // 復位,清空
}
} else {
System.out.println("連線失敗,伺服器拒絕服務");
return;
}
} finally {
// 關閉SocketChannel
if (channel != null) {
channel.close();
}
}
}
}
- 8、網路UDP通道(DatagramChannel)
(1)TCP、UDP的區別
可以參考以下文章:http://www.cnblogs.com/visily/archive/2013/03/15/2961190.html
我們可以簡單瞭解和總結:
協議 |
基於 |
資料模式 |
資源要求 |
資料正確性 |
資料順序性 |
適用場景 |
---|---|---|---|---|---|---|
TCP |
連線 |
流或通道 |
較多 |
保證 |
保證 |
精算計算的場景 |
UDP |
無連線 |
資料報 |
較少 |
不保證,可能丟包(當然內網環境幾乎不存在) |
不保證 |
服務系統內部的通訊 |
(2)開啟 DatagramChannel
通過呼叫 DatagramChannel.open() 方法來開啟DatagramChannel
DatagramChannel channel= DatagramChannel.open();
注意的是DatagramChannel的open()方法只是開啟獲得通道,但此時尚未連線。儘管DatagramChannel無需建立遠端連線,但仍然可以通過isConnect()檢測當前的channel是否聲明瞭遠端連線地址。
(3)關閉 DatagramChannel
//關閉DatagramChannel
if (channel!= null) {
channel.close();
}
(4)接收資料
通過receive()方法從DatagramChannel接收資料,返回一個SocketAddress物件以指出資料來源。在阻塞模式下,receive()將會阻塞至有資料包到來,非阻塞模式下,如果沒有可接受的包則返回null。如果包內的資料大小超過緩衝區容量時,多出的資料會被悄悄拋棄。
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
byteBuffer.clear();
SocketAddress address = channel.receive(byteBuffer);//receive data
非阻塞模式:
while (true) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.clear();
SocketAddress socketAddress = channel.receive(byteBuffer);
if (socketAddress == null) {
// System.out.println("沒有客戶端連線");
TimeUnit.MILLISECONDS.sleep(1);
continue;
}
//do something with DatagramChannel...
}
(5)傳送資料
通過send()方法從DatagramChannel傳送資料到指定的SocketAddress物件所描述的地址。在阻塞模式下,呼叫執行緒會被阻塞至有資料包被加入傳輸佇列。非阻塞模式下,如果傳送內容為空則返回0,否則返回傳送的位元組數。
請注意send()方法返回的非零值並不表示資料報到達了目的地,僅代表資料報被成功加到本地網路層的傳輸佇列。
ByteBuffer byteBuffer = ByteBuffer.wrap(new String("i 'm client").getBytes());
int bytesSent = channel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
非阻塞模式:
Serializable serializable = "您好,DatagramChannel。";
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
// 傳送資料,以下為簡單模擬非阻塞模式重發3次機制
final int TIMES = 3;
int bytesSent = 0;
int sendTime = 1;
while (bytesSent == 0 && sendTime <= TIMES) {
bytesSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
sendTime++;
}
(6)連線到特定的地址
可以將DatagramChannel“連線”到網路中的特定地址的。由於UDP是無連線的,連線到特定地址並不會像TCP通道那樣建立一個真正的連線。而是鎖住DatagramChannel ,讓其只能從特定地址收發資料。當連線後,也可以使用read()和write()方法,就像在用傳統的通道一樣。只是在資料傳送方面沒有任何保證。
int bytesRead = channel.read(buf);
int bytesWritten = channel.write(but);
當通道不是已連線狀態時呼叫read()或write()方法,都將產生NotYetConnectedException異常。
(7)一個簡單的模擬UDP介面的Demo
這裡為方便學習交流,僅使用NIO基本api,暫時沒有使用Selector(選擇器)。
接收方:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.TimeUnit;
/**
* 網路UDP通道(DatagramChannel)測試 --作為服務端
*
* @author Sven Augustus
*/
public class DatagramChannelServerTest {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
DatagramChannel channel = null;
try {
//開啟DatagramChannel
channel = DatagramChannel.open();
//非阻塞模式
channel.configureBlocking(false);
//將 UDP 繫結到特定地址(IP 地址和埠號),作為服務端監聽埠
channel.bind(new InetSocketAddress("127.0.0.1", 9898));
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
SocketAddress socketAddress = channel.receive(buffer);
if (socketAddress == null) {
// System.out.println("沒有客戶端連線");
TimeUnit.MILLISECONDS.sleep(1);
continue;
}
System.out.println("準備讀:"+ socketAddress);
buffer.flip();//切換讀模式
Serializable object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
// 往客戶端寫資料
String serializable = "您好,客戶端" + socketAddress.toString();
System.out.println("準備寫:" + serializable);
ByteBuffer byteBuffer =
ByteBufferUtils.writeObject(serializable);
channel.send(byteBuffer,socketAddress);
}
} finally {
//關閉DatagramChannel
if (channel != null) {
channel.close();
}
}
}
}
傳送方:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.TimeUnit;
/**
* 網路UDP通道(DatagramChannel)測試 --作為客戶端,傳送資料
*
* @author Sven Augustus
*/
public class DatagramChannelClientTest {
public static void main(String[] args) throws IOException, ClassNotFoundException {
java.nio.channels.DatagramChannel channel = null;
try {
//開啟DatagramChannel
channel = DatagramChannel.open();
//非阻塞模式
channel.configureBlocking(false);
// 傳送資料將不用提供目的地址而且接收時的源地址也是已知的(這點類似SocketChannel),
// 那麼此時可以使用常規的read()和write()方法
channel.connect(new InetSocketAddress("127.0.0.1", 9898));
Serializable serializable = "您好,DatagramChannel。";
System.out.println("準備寫:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
// 傳送資料,以下為簡單模擬非阻塞模式重發3次機制
final int TIMES = 3;
int bytesSent = 0;
int sendTime = 1;
while (bytesSent == 0 && sendTime <= TIMES) {
//bytesSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
bytesSent = channel.write(byteBuffer);
sendTime++;
}
if (bytesSent > 0) {
System.out.println("傳送成功。");
} else {
System.out.println("傳送失敗。");
}
byteBuffer.clear();
// 讀取服務端傳送的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int numBytesRead = -1;
while ((numBytesRead = channel.read(buffer)) != -1) {
if (numBytesRead == 0) {// 如果沒有資料,則稍微等待一下
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
buffer.flip();// 讀取buffer
Object object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
buffer.clear(); // 復位,清空
}
} finally {
//關閉DatagramChannel
if (channel != null) {
channel.close();
}
}
}
}
- 9、管道(Pipe.SinkChannel、 Pipe.SourceChannel)
Java NIO 管道是2個執行緒之間的單向資料連線。Pipe有一個source通道和一個sink通道。資料會被寫到sink通道,從source通道讀取。
以下是Pipe原理的圖示:
(1)建立管道
通過Pipe.open()方法開啟管道。例如:
Pipe pipe = Pipe.open();
(2)往管道寫資料
要向管道寫資料,需要訪問sink通道。像這樣:
Pipe.SinkChannel sinkChannel = pipe.sink();
然後可以呼叫write方法。
(3)從管道讀資料
從讀取管道的資料,需要訪問source通道,像這樣:
Pipe.SourceChannel sourceChannel = pipe.source();
然後可以呼叫read方法。
(4)簡單示例
package io.flysium.nio.c2_channel.pipe;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
/**
* 管道測試
* @author Sven Augustus
*/
public class PipeTest {
static class Input implements Runnable {
private final Pipe pipe;
public Input(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try {
Pipe.SourceChannel sourceChannel = pipe.source();
System.out.println("管道讀取準備。");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int bytesRead = sourceChannel.read(byteBuffer);
byteBuffer.flip();//切換讀模式
Serializable serializable =
ByteBufferUtils.readObject(byteBuffer);
System.out.println("管道讀取結果:" + serializable);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
static class Output implements Runnable {
private final Pipe pipe;
public Output(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try {
Pipe.SinkChannel sinkChannel = pipe.sink();
System.out.println("管道寫出準備。");
Serializable object = "您好啊,Pipe。";
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(object);
while (byteBuffer.hasRemaining()) {
sinkChannel.write(byteBuffer);
}
System.out.println("管道寫出完成:"+object);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
Pipe pipe = Pipe.open();
new Thread(new Input(pipe)).start();
new Thread(new Output(pipe)).start();
}
}
- 10、Selector(選擇器)
(1)Selector模式
Selector物件本質上是一個觀察者,會監視已註冊的各種通道及其事件,當應用select機制後且某通道有事件發生時,會報告該資訊。
這是一種網路事件驅動模型。分為三部分:
註冊事件:通道將感興趣的事件註冊到Selector上。
select機制:主動應用select機制,當有事件發生時,返回一組SelectionKey(鍵)。
事件處理:從SelectionKey(鍵)中獲取事件集合、就緒IO集合、註冊的通道等資訊,進行I/O操作。
(2)建立Selector
Selector selector = Selector.open();
(3)向Selector註冊通道感興趣的事件
為了將Channel和Selector配合使用,必須將channel註冊到selector上。如下:
與Selector一起使用時,Channel必須處於非阻塞模式下。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
register第二個引數為“interest集合”,意思是通道感興趣的事件集合,亦指定Selector監聽該通道什麼事件的發生。
事件主要分四類:
SelectionKey.OP_CONNECT |
連線就緒,channel成功連線另一個伺服器。 |
---|---|
SelectionKey.OP_ACCEPT |
接收就緒,server channel成功接受到一個連線。 |
SelectionKey.OP_READ |
讀就緒,channel通道中有資料可讀。 |
SelectionKey.OP_WRITE |
寫就緒,channel通道等待寫資料。 |
如果你對不止一件事件感興趣,可以使用位移操作:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
(4)通過Selector選擇通道
應用select機制,可以返回你所感興趣的事件(如連線、接受、讀或寫)已經準備就緒的那些通道。
下面是select()方法:
- int select() 一直阻塞直到至少有一個通道註冊的事件發生了。
- int select(long timeout) 一直阻塞直到至少有一個通道註冊的事件發生了,或者已經超時timeout毫秒。
- int selectNow() 不會阻塞,不管有沒有通道就緒都立刻返回。
返回值為就緒的通道數。
一旦呼叫了select()方法,並且返回值表明有一個或更多個通道就緒了,然後可以通過呼叫selector的selectedKeys()方法,訪問“已選擇鍵集(selected key set)”中的就緒通道。如下所示:
Set selectedKeys = selector.selectedKeys();
每個元素SelectionKey(鍵),包含
- interest集合 即下一次感興趣的事件集合,確定了下一次呼叫某個選擇器的選擇方法時,將測試哪類操作的準備就緒資訊。建立該鍵時使用給定的值初始化 interest 集合;之後可通過 interestOps(int) 方法對其進行更改。
- ready集合 即通道已經準備就緒的事件的集合。
- Channel 即註冊的通道例項。
- Selector物件
- 附加的物件(可選) 即註冊時附加的物件。
可以遍歷這個已選擇的鍵集合來訪問就緒的通道。如下:
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if(key.isAcceptable()) {
// 接收就緒,server channel成功接受到一個連線。
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
//連線就緒,channel成功連線另一個伺服器。
// a connection was established with a remote server.
} else if (key.isReadable()) {
//讀就緒,channel通道中有資料可讀。
// a channel is ready for reading
} else if (key.isWritable()) {
//寫就緒,channel通道等待寫資料。
// a channel is ready for writing
}
}
每次迭代的keyIterator.remove()呼叫。Selector不會自己從已選擇鍵集中移除SelectionKey例項。必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。
(5)示例
現在簡單模擬一下客戶端向伺服器迴圈傳送介面請求,請求引數是整數,服務端會計算好(其實是乘以2)返回給客戶端。
啟動若干個客戶端測試。
服務端:
package io.flysium.nio.c3_selector;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* ServerSocketChannel示例,使用Selector模式
*
* @author Sven Augustus
*/
public class ServerSocketChannelTest2 {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
ServerSocketChannel channel = null;
Selector selector = null;
try {
// 開啟 ServerSocketChannel
channel = ServerSocketChannel.open();
// 設定非阻塞模式,read的時候就不再阻塞
channel.configureBlocking(false);
// 將 ServerSocket 繫結到特定地址(IP 地址和埠號)
channel.bind(new InetSocketAddress("127.0.0.1", 9595));
// 建立Selector選擇器
selector = Selector.open();
// 註冊事件,監聽客戶端連線請求
channel.register(selector, SelectionKey.OP_ACCEPT);
final int timeout = 1000;//超時timeout毫秒
while (true) {
if (selector.select(timeout) == 0) {//無論是否有事件發生,selector每隔timeout被喚醒一次
continue;
}
Set selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {// 接收就緒,server channel成功接受到一個連線。
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);// 設定非阻塞模式
// 註冊讀操作 , 以進行下一步的讀操作
socketChannel.register(key.selector(), SelectionKey.OP_READ);
} else if (key.isConnectable()) {//連線就緒,channel成功連線另一個伺服器。
} else if (key.isReadable()) {//讀就緒,channel通道中有資料可讀。
SocketChannel socketChannel = (SocketChannel) key.channel();
//System.out.println("準備讀:");
// 讀取客戶端傳送的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int readBytes = socketChannel.read(buffer);
if (readBytes >= 0) {// 非阻塞,立刻讀取緩衝區可用位元組
buffer.flip();// 讀取buffer
Object object = ByteBufferUtils.readObject(buffer);
//System.out.println(object);
//附加引數
key.attach(object);
// 切換寫操作 , 以進行下一步的寫操作
key.interestOps(SelectionKey.OP_WRITE);
} else if (readBytes < 0) { //客戶端連線已經關閉,釋放資源
System.out.println("客戶端" + socketChannel.socket().getInetAddress()
+ "埠" + socketChannel.socket().getPort() + "斷開...");
socketChannel.close();
}
} else if (key.isValid() && key.isWritable()) {//寫就緒,channel通道等待寫資料。
SocketChannel socketChannel = (SocketChannel) key.channel();
// 計算
Integer integer = Integer.parseInt(String.valueOf(key.attachment()));
String serializable = String.valueOf(integer * 2);
// 往客戶端寫資料
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
socketChannel.write(byteBuffer);
System.out.println("客戶端伺服器:" + integer + ",響應:" + serializable);
// 切換讀操作 , 以進行下一次的介面請求,即下一次讀操作
key.interestOps(SelectionKey.OP_READ);
}
}
}
} finally {
//關閉 ServerSocketChannel
if (channel != null) {
channel.close();
}
if (selector != null) {
selector.close();
}
}
}
}
客戶端:
package io.flysium.nio.c3_selector;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* SocketChannel示例,使用Selector模式
*
* @author Sven Augustus
*/
@SuppressWarnings("unused")
public class SocketChannelTest2 {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
new Thread(new ClientRunnable("A")).start();
new Thread(new ClientRunnable("B")).start();
new Thread(new ClientRunnable("C")).start();
new Thread(new ClientRunnable("D")).start();
}
private static class ClientRunnable implements Runnable {
private final String name;
private ClientRunnable(String name) {
this.name = name;
}
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
// 開啟SocketChannel
channel = SocketChannel.open();
// 設定非阻塞模式,read的時候就不再阻塞
channel.configureBlocking(false);
// tcp連線網路
channel.connect(new InetSocketAddress("127.0.0.1", 9595));
// 建立Selector選擇器
selector = Selector.open();
// 註冊事件,監聽讀/寫操作
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
final int timeout = 1000;//超時timeout毫秒
if (channel.finishConnect()) {// 連線伺服器成功
while (true) {
if (selector.select(timeout) == 0) {
continue;
}
Set selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isValid() && key.isWritable()) {//寫就緒,channel通道等待寫資料。
TimeUnit.SECONDS.sleep(3);
SocketChannel socketChannel = (SocketChannel) key.channel();
// 往服務端寫資料
String serializable = String.valueOf(new Random().nextInt(1000));
//System.out.println("準備寫:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
socketChannel.write(byteBuffer);
//附加引數
key.attach(serializable);
// 切換讀操作 , 以進行下一次的介面請求,即下一次讀操作
key.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {//讀就緒,channel通道中有資料可讀。
SocketChannel socketChannel = (SocketChannel) key.channel();
//System.out.println("準備讀:");
// 讀取服務端傳送的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int readBytes = socketChannel.read(buffer);
if (readBytes >= 0) {// 非阻塞,立刻讀取緩衝區可用位元組
buffer.flip();// 讀取buffer
Object object = ByteBufferUtils.readObject(buffer);
//System.out.println(object);
buffer.clear(); // 復位,清空
Integer integer = Integer.parseInt(String.valueOf(key.attachment()));
System.out.println("執行緒-" + name
+ ",請求伺服器:" + integer + ",響應:" + object);
// 切換寫操作 , 以進行下一步的寫操作,即介面請求
key.interestOps(SelectionKey.OP_WRITE);
} else if (readBytes < 0) { //客戶端連線已經關閉,釋放資源
System.out.println("服務端斷開...");
}
}
}
}
} else {
System.out.println("連線失敗,伺服器拒絕服務");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
// 關閉SocketChannel
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
- 11、發散/匯聚
分散(scatter)從Channel中讀取是指在讀操作時將讀取的資料寫入多個buffer中。因此,Channel將從Channel中讀取的資料“分散(scatter)”到多個Buffer中。
聚集(gather)寫入Channel是指在寫操作時將多個buffer的資料寫入同一個Channel,因此,Channel 將多個Buffer中的資料“聚集(gather)”後傳送到Channel。
scatter / gather經常用於需要將傳輸的資料分開處理的場合,例如傳輸一個由訊息頭和訊息體組成的訊息,你可能會將訊息體和訊息頭分散到不同的buffer中,這樣你可以方便的處理訊息頭和訊息體。
(1)分散
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
buffer首先被插入到陣列,然後再將陣列作為channel.read() 的輸入引數。read()方法按照buffer在陣列中的順序將從channel中讀取的資料寫入到buffer,當一個buffer被寫滿後,channel緊接著向另一個buffer中寫。分散不適用於動態訊息的處理。
(2)聚集
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
//write data into buffers
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
buffers陣列是write()方法的入參,write()方法會按照buffer在陣列中的順序,將資料寫入到channel,注意只有position和limit之間的資料才會被寫入。因此,如果一個buffer的容量為128byte,但是僅僅包含100byte的資料,那麼這100byte的資料將被寫入到channel中。聚集適用於動態訊息的處理。
更多Demo:https://git.oschina.net/svenaugustus/MyJavaIOLab
本文只針對NIO的知識總結,其他IO總結姊妹篇(IO)請參見: + Java標準I/O流程式設計一覽筆錄: https://my.oschina.net/langxSpirit/blog/830620