Java NIO 通道 Channel
Channel 是 NIO 的核心概念,它表示一個開啟的連線,這個連線可以連線到 I/O 裝置(例如:磁碟檔案,Socket)或者一個支援 I/O 訪問的應用程式。Java NIO 使用緩衝區和通道來進行資料傳輸。
一個通道在建立的時候被開啟,可以呼叫 isOpen() 來判斷一個通道是否是開啟狀態。關閉通道使用 close() 方法,一個通道一旦被關閉,將不能被重新開啟。
1. 基於緩衝區 Buffer 和通道 Channel 的資料互動
應用程式可以通過與 I/O 裝置建立通道來實現對 I/O 裝置的讀寫操作,操作的資料通過緩衝區 Buffer 來進行互動。
從 I/O 裝置讀取資料時:
1)應用程式呼叫通道 Channel 的 read() 方法;
2)通道往緩衝區 Buffer 中填入 I/O 裝置中的資料,填充完成之後返回;
3)應用程式從緩衝區 Buffer 中獲取資料。
往 I/O 裝置寫資料時:
1)應用程式往緩衝區 Buffer 中填入要寫到 I/O 裝置中的資料;
2)呼叫通道 Channel 的 write() 方法,通道將資料傳輸至 I/O 裝置。
2. NIO 中主要的通道型別與操作
這裡僅討論磁碟檔案和網路套接字的 I/O 通道,在整個 NIO 的學習中,直接記憶體對映相關內容一般指的是磁碟檔案 I/O,而 I/O 多路複用模型和選擇器則一般指網路I/O。磁碟檔案通道為 FileChannel,網路套接字通道有 TCP 相關的 SocketChannel,ServerSocketChannel 和 UDP 相關的 DatagramChannel。
2.1 FileChannel
檔案通道可以連線一個檔案,然後對檔案進行讀,寫,對映到直接記憶體。使用檔案通道操作檔案的一般流程為:
1)獲取通道。檔案通道通過 FileChannel 的靜態方法 open() 來獲取,獲取時需要指定檔案路徑和檔案開啟方式。
FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);// 獲取檔案通道
2)建立位元組緩衝區。檔案相關的位元組緩衝區有兩種,一種是基於堆的 HeapByteBuffer,另一種是基於檔案對映,放在堆外記憶體中的 MappedByteBuffer。這裡使用前者,後者相關內容可以參考:Java NIO 中的 MappedByteBuffer。
ByteBuffer buf = ByteBuffer.allocate(10); // 分配位元組快取
3)讀寫操作。
讀取資料。一般需要一個迴圈結構來讀取資料,讀取資料時需要注意切換 ByteBuffer 的讀寫模式。
while (channel.read(buf) != -1){ // 讀取通道中的資料,並寫入到 buf 中
buf.flip(); // 快取區切換到讀模式
while (buf.position() < buf.limit()){ // 讀取 buf 中的資料
text.append((char)buf.get());
}
buf.clear(); // 清空 buffer,快取區切換到寫模式
}
寫入資料。
for (int i = 0; i < text.length(); i++) {
buf.put((byte)text.charAt(i)); // 填充緩衝區,需要將 2 位元組的 char 強轉為 1 自己的 byte
if (buf.position() == buf.limit() || i == text.length() - 1) { // 快取區已滿或者已經遍歷到最後一個字元
buf.flip(); // 將緩衝區由寫模式置為讀模式
channel.write(buf); // 將緩衝區的資料寫到通道
buf.clear(); // 清空快取區,將緩衝區置為寫模式,下次才能使用
}
}
4)將資料刷出到物理磁碟。FileChannel 的 force(boolean metaData) 方法可以確保對檔案的操作能夠更新到磁碟。metaData 為 true 表示不僅要刷出資料,還要刷入檔案的元資料,如:修改時間。
channel.force(false);
5)關閉通道。
channel.close();
下面給出一個檔案通道的具體示例。示例中 writeText() 將字串寫入到檔案當中,然後 readText() 再將內容讀出來。這裡為了簡單起見,示例程式碼中字串只能包含 ASCII 字元,而不能包含中文字或其它特殊字元;否則會亂碼。
public class FileChannelReadWrite {
public static void main(String[] args) throws IOException {
String fileName = "data.txt";
String text = "Hello, welcome to Robothy's blog.";
writeText(fileName, text);
System.out.println(readText(fileName));
}
static String readText(String fileName) throws IOException {
FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);// 獲取檔案通道
ByteBuffer buf = ByteBuffer.allocate(10); // 分配位元組快取
StringBuilder text = new StringBuilder();
while (channel.read(buf) != -1){ // 讀取通道中的資料,並寫入到 buf 中
buf.flip(); // 快取區切換到讀模式
while (buf.position() < buf.limit()){ // 讀取 buf 中的資料
text.append((char)buf.get());
}
buf.clear(); // 清空 buffer,快取區切換到寫模式
}
channel.close(); // 關閉通道
return text.toString();
}
static void writeText(String fileName, String text) throws IOException {
// 獲取檔案通道
FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
ByteBuffer buf = ByteBuffer.allocate(10); // 建立位元組緩衝區
for (int i = 0; i < text.length(); i++) {
buf.put((byte)text.charAt(i)); // 填充緩衝區,需要將 2 位元組的 char 強轉為 1 自己的 byte
if (buf.position() == buf.limit() || i == text.length() - 1) { // 快取區已滿或者已經遍歷到最後一個字元
buf.flip(); // 將緩衝區由寫模式置為讀模式
channel.write(buf); // 將緩衝區的資料寫到通道
buf.clear(); // 清空快取區,將緩衝區置為寫模式,下次才能使用
}
}
channel.force(false); // 將資料刷出到磁碟,不刷出文件元資料
channel.close(); // 關閉通道
}
}
關於 FileChannel 的更多詳細用法:Java NIO 檔案通道 FileChannel 用法。
2.2 SocketChannel
SocketChannel 負責 TCP 套接字的連線和資料傳輸,客戶端和服務端都需要用到。SocketChannel 是執行緒安全的,支援多執行緒訪問。
SocketChannel 阻塞連線和非阻塞連線。對於阻塞連線,讀取資料時會阻塞,直到有資料過來或者連線被關閉;對於非阻塞連線,呼叫 read() 方法時無論是否有資料都會立即返回。可以呼叫 configureBlocking(boolean block) 來配置為阻塞通道或非阻塞通道。
SocketChannel 可以由服務端或者客戶端發起關閉。假設客戶端在寫資料時,服務端關閉了連線,客戶端 write() 方法會丟擲 AsynchronousCloseException;假設客戶端在讀取資料時,服務端關閉了連線,read() 方法會立即返回 -1
,此時緩衝區中沒有內容。
TCP 客戶端使用 SocketChannel 與服務端進行互動的流程為:
1)開啟通道,連線到服務端。
SocketChannel channel = SocketChannel.open(); // 開啟通道,此時還沒有開啟 TCP 連線
channel.connect(new InetSocketAddress("localhost", 9090)); // 連線到服務端
這兩句也可以合併起來寫。
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
2)分配緩衝區。
ByteBuffer buf = ByteBuffer.allocate(10); // 分配一個 10 位元組的緩衝區,不實用,容量太小
3)配置是否為阻塞方式。(預設為阻塞方式)
channel.configureBlocking(false); // 配置通道為非阻塞模式
如果配置了非阻塞模式,還需要呼叫 SocketChannel.finishConnect() 方法確保連線已經完成。
while (!channel.finishConnect()){// 不斷檢查是否完成了連線
Thread.sleep(10);
}
4)與服務端進行資料互動。
5)關閉連線。
在關閉連線時,如果客戶端是寫資料的一方,完成寫入之後應該先呼叫一下 SocketChannel.shutdownOutput() ,此時讀的一端能夠檢測到 read() 返回的 -1。然後呼叫 clser() 方法關閉通道。
channel.shutdownOutput(); // 關閉 TCP 輸出,此時客戶端會發送 -1 給服務端
channel.close(); // 關閉通道
服務端在客戶端由連線過來時會建立一個 SocketChannel,不需要手動建立,後續步驟和客戶端一樣。下面有完整的示例。
2.3 ServerSocketChannel
ServerSocketChannel 負責監聽連線,服務端使用,在監聽到 TCP 連線時會產生一個 SocketChannel 與客戶端進行連線和資料互動。一般為了支援併發,服務端在產生 SocketChannel 之後可以通道例項放到一個佇列中,用一個執行緒池去處理佇列中的通道。不過這種方式並不能支援高併發,要支援高併發應該使用選擇器。
1)開啟一個 ServerSocketChannel 通道, 繫結埠。
ServerSocketChannel server = ServerSocketChannel.open(); // 開啟通道
2)繫結埠
server.bind(new InetSocketAddress(9090)); // 繫結埠
3)阻塞等待連線到來。有新連線時會建立一個 SocketChannel 通道,服務端可以通過這個通道與連線過來的客戶端進行通訊。等待連線到來的程式碼一般放在一個迴圈結構中。
SocketChannel client = server.accept(); // 阻塞,直到有連線過來
4)通過 SocketChannel 與客戶端進行資料互動
5)關閉 SocketChannel
client.close();
2.4 基於套接字通道的 TCP 通訊完整示例
使用者在客戶端控制檯資料要傳送的內容,服務端接收內容並列印在控制檯。客戶端輸入 "Bye" 之後,斷開與服務端的連線。
TCP 客戶端程式碼:
public class SocketChannelWrite {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel channel = SocketChannel.open(); // 開啟通道,此時還沒有開啟 TCP 連線
channel.connect(new InetSocketAddress("localhost", 9090)); // 連線到服務端
ByteBuffer buf = ByteBuffer.allocate(10); // 分配一個 10 位元組的緩衝區,不實用,容量太小
Scanner scanner = new Scanner(System.in); // 掃描控制檯輸入
scanner.useDelimiter("\n");
while(true){
String msg = scanner.next() + "\r\n"; // 讀取控制檯輸入的訊息,再拼接上換行符
for(int i=0; i<msg.length(); i++){ // 逐個字元遍歷輸入的內容
buf.put((byte)msg.charAt(i)); // 將字元逐個放入緩衝區
if(buf.position() == buf.limit() || i == msg.length()-1){ // 緩衝區已滿或者
buf.flip(); // 緩衝區切換到讀模式
channel.write(buf); // 往通道寫入資料
buf.clear(); // 清空緩衝區,緩衝區切換到寫入模式
}
}
if("Bye\r\n".equals(msg)){
channel.shutdownOutput(); // 關閉 TCP 輸出,此時客戶端會發送 -1 給服務端
channel.close(); // 關閉通道
break;
}
}
}
}
TCP 服務端程式碼:
public class ServerSocketChannelRead {
public static void main(String[] args) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open(); // 開啟通道
server.bind(new InetSocketAddress(9090)); // 繫結埠
ByteBuffer buff = ByteBuffer.allocate(10); // 為了程式碼演示,只分配容量為 10 位元組的緩衝區
while (true) {
SocketChannel client = server.accept(); // 阻塞,直到有連線過來
System.out.println("Client connected.");
while (true) { // 迴圈讀取客戶端傳送過來的資料
if(client.read(buff) == -1){ // 客戶端關閉了輸出之後,阻塞的 client.read(buf) 會立即返回 -1,此時 buf 中沒有內容
client.close(); // 關閉通道
System.out.println("Client closed the connection.");
break;
}
buff.flip(); // 切換到讀模式
while (buff.position() < buff.limit()) {
System.out.print((char) buff.get()); // 一個字元一個字元打印出來
}
buff.clear(); // 切換到寫模式
}
}
}
}
2.5 DatagramChannel
資料報通道 DatagramChannel 表示 UDP 通道。UDP 是無連線協議,在收發資料時不需要進行連線。與 FileChannel 和 SocketChannel 使用 read()/write() 不同,DatagramChannel 通常使用 receive()/send() 來收發資料。receive() 在接收資料之後會返回傳送方的地址,send() 方法在傳送資料的時候需要指定接收方的地址。
DatagramChannel 支援阻塞模式和非阻塞模式。非阻塞模式時,receive(ByteBuffer dst) 方法會立即返回,如果有資料,則會返回傳送方的地址;如果沒有資料,則返回 null。類似地,非阻塞模式下 send(ByteBuffer src, SocketAddress) 也會立即返回,返回的結果為傳送的位元組數。
DatagramChannel 作為客戶端操作流程:
1)開啟通道
DatagramChannel channel = DatagramChannel.open();
2)配置阻塞模式
channel.configureBlocking(false); // 非阻塞模式
3)分配緩衝區
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配 1024 位元組的緩衝區
4)資料互動
資料報通道 DatagramChannel 通過 receive()/send() 方法來進行資料的互動。需要注意的是,傳送資料時,每次最多傳送一個 UDP 資料報的大小(理論上是 65535-8 位元組);因此,當緩衝區過大時,需要考慮多次傳送。傳送資料的時候需要指定地址。
另外,DatagramChannel 指定了 connect(SocketAddress remote) 方法,傳入通訊對方的地址。如果呼叫了此方法,則該通道只能和指定的地址進行資料互動,即使 send() 指定了其它的地址也沒有。事實上,DatagramChannel 提供了 read()/write() 方法,這兩個方法只有在 connect 指定了地址的情況下才能夠使用,否則資料將被丟棄。
SocketAddress address = channel.receive(buf);
channel.send(buf, address);
5)關閉通道
channel.close();
DatagramChannel 作為服務端操作流程:
1)開啟通道
與客戶端開啟通道的方式一樣。
2)繫結要監聽的埠
channel.bind(new InetSocketAddress(9090)); // 繫結要監聽的埠
3)配置阻塞模式
4)分配緩衝區
5)接收客戶端傳送過來的資料
下面提供基於 DatagramChannel 進行 UDP 通訊的完整示例程式碼。
2.6 基於 DatagrapChannel 的 UDP 通訊例項
服務端接收客戶端傳送過來的資料報,然後列印其內容,再向客戶端傳送一條訊息,表示接收到的訊息的大小。
public class DatagramChannelRead {
public static void main(String[] args) throws IOException {
DatagramChannel channel = DatagramChannel.open(); // 開啟通道
channel.bind(new InetSocketAddress(9090)); // 繫結要監聽的埠
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配緩衝區
while (true){
SocketAddress address = channel.receive(buf); // 接收資料,獲取傳送方地址
buf.flip(); // 緩衝區切換為讀模式
int len = buf.limit(); // 獲取 buff 中資料的長度
System.out.println("Client -> " + new String(buf.array(), 0, len, StandardCharsets.UTF_8)); // 列印 buf 中的內容
buf.clear(); // 清空緩衝區,切換到寫模式
buf.put(String.format("Received %4d bytes.", len).getBytes()); // 將要返回給傳送端的訊息填入緩衝區
buf.flip();
channel.send(buf, address); // send 一次性最多隻能傳送 65535 - 8 位元組的資料,如果 buf 很大的話需要用一個迴圈去傳送。
buf.clear();
}
客戶端有2個執行緒, sender 執行緒接收使用者在控制檯輸入的內容,接收一行輸入的內容就傳送給服務端;receiver 執行緒接收服務端返回的訊息並列印在控制檯。當用戶輸入 "Bye" 時,客戶端退出。
public class DatagramChannelWrite {
public static void main(String[] args) throws IOException, InterruptedException {
DatagramChannel channel = DatagramChannel.open(); // 開啟通道
InetSocketAddress serverAddress = new InetSocketAddress("localhost", 9090); // 宣告服務端的地址
channel.configureBlocking(false); // 非阻塞模式
// 用於接收服務端傳送過來的訊息
Thread receiver = new Thread(()->{
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配 1024 位元組的緩衝區
while(!Thread.currentThread().isInterrupted()){ // 檢查中斷標誌,如果被中斷,則結束執行緒
try {
while (null == channel.receive(buf)) { // 迴圈接收資料
Thread.sleep(10); // 沒有訊息則 sleep 10ms
}
buf.flip();
System.out.println("Server -> " + new String(buf.array(), 0, buf.limit()));
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread sender = new Thread(()->{
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
if(msg.equals("Bye")) {
receiver.interrupt();
break;
}
buf.put(msg.getBytes(StandardCharsets.UTF_8));
buf.flip();
channel.send(buf, serverAddress);
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
});
sender.start(); // 啟動 sender 執行緒
receiver.start(); // 啟動 receiver執行緒
receiver.join(); // 等待 receiver
channel.close(); // 關閉通道
}
}
3. 小結
1)Java NIO 中的通道結合緩衝區,提供了一種與流不一樣的操作模式。通道是應用程式到 I/O 裝置的一個開啟的連線,應用程式可以往通道中寫入資料或者從通道中讀取資料。
2)NIO 中主要的通道有四種,磁碟檔案 I/O 相關的 FileChannel,網路 I/O 相關的 SocketChannel, ServerSocketChannel 和 DatagramChannel。其中檔案相關的通道只能以阻塞的方式進行 I/O 操作,而網路相關通道則可以通過阻塞方式和非阻塞方式進行通訊。
以上是關於通道的一些基本概念和內容,就這些內容上來看,NIO 相對於普通的 I/O 並沒有太大的優勢(非阻塞網路 I/O除外);普通 I/O 流中的 BufferInputStream, BufferedOutputSteram 能夠起到幾乎一樣的效果。事實上,基於記憶體對映技術的直接記憶體快取提供了比普通 I/O 更加高效的訪問磁碟檔案方式;而 NIO 為網路 I/O 提供了非阻塞訪問模型的介面,配合選擇器 Selector,極大提高了 Java 程式所能夠支援的併發數。