Java-RPC:2)NIO入門初窺
- Channels and Buffers(通道和緩衝區):標準的IO基於位元組流和字元流進行操作的,而NIO是基於通道(Channel)和緩衝區(Buffer)進行操作,資料總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中,Channel同時支援讀和寫。
- Asynchronous IO(非同步IO):Java NIO可以讓你非同步的使用IO,例如:當執行緒從通道讀取資料到緩衝區時,執行緒還是可以進行其他事情。當資料被寫入到緩衝區時,執行緒可以繼續處理它。從緩衝區寫入通道也類似。
- Selectors(選擇器):Java NIO引入了選擇器的概念,選擇器用於監聽多個通道的事件(比如:連線開啟,資料到達)。因此,單個的執行緒可以監聽多個數據通道。
Channel對應以前的流,Buffer不是什麼新東西,Selector是因為nio可以使用非同步的非堵塞模式才加入的東西。以前的流總是堵塞的,一個執行緒只要對它進行操作,其它操作就會被堵塞,也就相當於水管沒有閥門,你伸手接水的時候,不管水到了沒有,你就都只能耗在接水(流)上。nio的Channel的加入,相當於增加了水龍頭(有閥門),雖然一個時刻也只能接一個水管的水,但依賴輪換策略,在水量不大的時候,各個水管裡流出來的水,都可以得到妥善接納,這個關鍵之處就是增加了一個接水工,也就是Selector,他負責協調,也就是看哪根水管有水了的話,在當前水管的水接到一定程度的時候,就切換一下:臨時關上當前水龍頭,試著開啟另一個水龍頭(看看有沒有水)。當其他人需要用水的時候,不是直接去接水,而是事前提了一個水桶給接水工,這個水桶就是Buffer。也就是,其他人雖然也可能要等,但不會在現場等,而是回家等,可以做其它事去,水接滿了,接水工會通知他們。這其實也是非常接近當前社會分工細化的現實,也是統分利用現有資源達到併發效果的一種很經濟的手段,而不是動不動就來個並行處理,雖然那樣是最簡單的,但也是最浪費資源的方式。
傳統NIO的示意圖:
1.selector向核心註冊了很多的channel,如果客戶端有請求,selector通過輪詢檢視key中的channel有活躍的東西,然後把活躍的key選擇出來,看是否可以進行連線。
2.若可以連線,則selector再向核心註冊一個accept事件,等待其連線成功,此時服務端繼續做其他的事。
3.待到到客戶端與channel真正連通後,selector繼續向核心註冊一個read事件,然後又繼續做自己之前的事。當資料真正發過來的時,再選擇出有資料的channel進行讀取。
以客戶端從伺服器端獲取網路時間為例
如下是伺服器的實現:
伺服器端的Main方法import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路複用器、繫結監聽埠 * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請求訊息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0位元組,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length<0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
客服端的實現:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
// 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doConnect() throws IOException {
// 如果直接連線成功,則註冊到多路複用器上,傳送請求訊息,讀應答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 判斷是否連線成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);// 連線失敗,程序退出
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
this.stop = true;
} else if (readBytes < 0) {
// 對端鏈路關閉
key.cancel();
sc.close();
} else
; // 讀到0位元組,忽略
}
}
}
}
客戶端的Main方法
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
.start();
}
}