Java NIO怎麼理解通道和非阻塞?
nio引入了buffer、channel、selector等概念。
通道相當於之前的I/O流。
“通道”太抽象了。java解釋不清的東西只能看它底層是怎麼解釋的——作業系統的I/O控制,通道控制方式?
I/O裝置:CPU——通道——裝置控制器——I/O裝置
(通道和裝置控制器的關係是多對多,裝置控制器和I/O裝置的關係也是多對多。)
1.CPU在執行使用者程式時遇到I/O請求,根據使用者的I/O請求生成通道程式(也可以是事先編好的)。放到記憶體中,並把該通道程式首地址放入CAW中。
2.CPU執行“啟動I/O”指令,啟動通道工作。
3.通道接收“啟動I/O”指令訊號,從CAW(記錄下一條通道指令存放的地址)中取出通道程式首地址,並根據此地址取出通道程式的第一條指令,放入CCW(記錄正在執行的通道指令)中;同時向CPU發回答訊號,通知“啟動I/O”指令完成完畢,CPU可繼續執行。
4.與此同時,通道開始執行通道程式,進行物理I/O操作。當執行完一條指令後,如果還有下一條指令則繼續執行;否則表示傳輸完成,同時自行停止,通知CPU轉去處理通道結束事件,並從CCW中得到有關通道狀態。
如此一來,主處理器只要發出一個I/O操作命令,剩下的工作完全由通道負責。I/O操作結束後,I/O通道會發出一箇中斷請求,表示相應操作已完成。
通道控制方式是對資料塊進行處理的,並非位元組。
I/O分兩段:1.資料從I/O裝置到核心緩衝區。2.資料從核心緩衝區到應用緩衝區
I/O型別:
1.非同步I/O不會產生阻塞,程式不會等待I/O完成,繼續執行程式碼,等I/O完成了再執行一個什麼回撥函式,程式碼執行效率高。很容易聯想到ajax。這個一般用於I/O操作不影響之後的程式碼執行。
2.阻塞I/O,程式發起I/O操作後,程序阻塞,CPU轉而執行其他程序,I/O的兩個步驟完成後,向CPU傳送中斷訊號,程序就緒,等待執行。
3.非阻塞I/O並非都不阻塞,其實是第一步不阻塞,第二部阻塞。程式發起I/O操作後,程序一直檢查第一步是否完成,CPU一直在迴圈詢問,完成後,程序阻塞直到完成第二步。明白了!這個是“站著茅坑不拉屎”,CPU利用率最低的。邏輯和作業系統的程式直接控制方式一樣。
阻塞不阻塞描述的是發生I/O時當前執行緒的狀態。
以上是作業系統的I/O,那麼java的nio又是怎樣的呢?
個人覺得是模仿了通道控制方式。
先看看nio的示例程式碼:
服務端TestReadServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TestReadServer {
/*標識數字*/
private int flag = 0;
/*緩衝區大小*/
private int BLOCK = 1024*1024*10;
/*接受資料緩衝區*/
private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*傳送資料緩衝區*/
private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
private Selector selector;
public TestReadServer(int port) throws IOException {
// 開啟伺服器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 伺服器配置為非阻塞
serverSocketChannel.configureBlocking(false);
// 檢索與此通道關聯的伺服器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
// 進行服務的繫結
serverSocket.bind(new InetSocketAddress(port));
// 通過open()方法找到Selector
selector = Selector.open();
// 註冊到selector,等待連線
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start----"+port+":");
}
// 監聽
private void listen() throws IOException {
while (true) {
// 選擇一組鍵,並且相應的通道已經開啟
selector.select();
// 返回此選擇器的已選擇鍵集。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
}
}
// 處理請求
private void handleKey(SelectionKey selectionKey) throws IOException {
// 接受請求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count=0;
// 測試此鍵的通道是否已準備好接受新的套接字連線。
if (selectionKey.isAcceptable()) {
// 返回為之建立此鍵的通道。
server = (ServerSocketChannel) selectionKey.channel();
// 接受到此通道套接字的連線。
// 此方法返回的套接字通道(如果有)將處於阻塞模式。
client = server.accept();
// 配置為非阻塞
client.configureBlocking(false);
// 註冊到selector,等待連線
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 返回為之建立此鍵的通道。
client = (SocketChannel) selectionKey.channel();
//將緩衝區清空以備下次讀取
receivebuffer.clear();
//讀取伺服器傳送來的資料到緩衝區中
System.out.println(System.currentTimeMillis());
count = client.read(receivebuffer);
System.out.println(System.currentTimeMillis() + "~"+count);
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
int port = 1234;
TestReadServer server = new TestReadServer(port);
server.listen();
}
}
客戶端TestReadClient.javaimport java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TestReadClient {
/*標識數字*/
private static int flag = 0;
/*緩衝區大小*/
private static int BLOCK = 1024*1024*10;
/*接受資料緩衝區*/
private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*傳送資料緩衝區*/
private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
/*伺服器端地址*/
private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(
"localhost", 1234);
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
// 開啟socket通道
SocketChannel socketChannel = SocketChannel.open();
// 設定為非阻塞方式
socketChannel.configureBlocking(false);
// 開啟選擇器
Selector selector = Selector.open();
// 註冊連線服務端socket動作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 連線
socketChannel.connect(SERVER_ADDRESS);
// 分配緩衝區大小記憶體
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count=0;
while (true) {
//選擇一組鍵,其相應的通道已為 I/O 操作準備就緒。
//此方法執行處於阻塞模式的選擇操作。
selector.select();
//返回此選擇器的已選擇鍵集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判斷此通道上是否正在進行連線操作。
// 完成套接字通道的連線過程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("完成連線!");
sendbuffer.clear();
BufferedInputStream br = new BufferedInputStream(new FileInputStream(new File("D:\\BigData.zip")));
byte[] b = new byte[BLOCK];
br.read(b);
sendbuffer.put(b);
sendbuffer.flip();
System.out.println(System.currentTimeMillis());
client.write(sendbuffer);
System.out.println(System.currentTimeMillis());
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
//將緩衝區清空以備下次讀取
receivebuffer.clear();
//讀取伺服器傳送來的資料到緩衝區中
count=client.read(receivebuffer);
if(count>0){
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("客戶端接受伺服器端資料--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
}
}
selectionKeys.clear();
}
}
}
例子是TestReadClient向TestReadServer傳送一個本地檔案。TestReadServer收到後每次列印讀取到的位元組數。
如何體現非同步I/O?
看看TestReadClient中的:
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判斷此通道上是否正在進行連線操作。
// 完成套接字通道的連線過程。
if (client.isConnectionPending()) {
client.finishConnect();
如果沒有client.finishConnect();這句等待完成socket連線,可能會報異常:java.nio.channels.NotYetConnectedException
非同步的才不會管你有沒有連線成功,都會執行下面的程式碼。這裡需要人為的干預。
如果要證明是java的nio單獨使用非阻塞I/O,真沒辦法!!!阻塞非阻塞要檢視程序。。。
不過還有種說法,叫非同步非阻塞。上面那段,是用非同步方式建立連線,程序當然沒有被阻塞。使用了finishConnect()這是人為將程式中止,等待連線建立完成(是模仿阻塞將當前程序阻塞掉,還是模仿非阻塞不斷輪詢訪問,不重要了反正是程式卡住沒往下執行)。
所以,建立連線的過程用非同步非阻塞I/O可以解釋的通。那read/write的過程呢?
根據上面例子的列印結果,可以知道這個過程是同步的,沒執行完是不會執行下面的程式碼的。至於底下是使用阻塞I/O還是非阻塞I/O,對於應用級程式來說不重要了。
阻塞還是非阻塞,對於正常的開發(創立連線,從連線中讀寫資料)並沒有多少的提升,操作過程都類似。
那NIO憑什麼成為高效能架構的基礎,比起IO,效能優越在哪裡,接著猜。。。
java nio有意模仿作業系統的通道控制方式,那他的底層是不是就是直接使用作業系統的通道?
通道中的資料是以塊為單位的,之前的流是以位元組為單位的,同樣的資料流操作外設的次數較多。程式碼中channel都是針對ByteBuffer物件進行read/write的,而ByteBuffer又是ByteBuffer.allocate(BLOCK);這樣建立的,是一個連續的塊空間。
那ByteBuffer是不是也是模擬作業系統的快取?
快取在io也有,如BufferedInputStream。CPU和外設的速度差很多,快取為了提高CPU使用率,等外設將資料讀入快取後,CPU再統一操作,不用外設讀一次,CPU操作一次,CPU的效率會被拉下來。。。