Java異步非阻塞IO NIO使用與代碼分析
阿新 • • 發佈:2018-02-10
package mes 127.0.0.1 back 之一 write throwable private 建立 [TOC]
Java異步非阻塞IO NIO使用與代碼分析
TimeServer程序的NIO實現完整代碼
TimeServer程序來自書本《Netty權威指南》,nio的代碼確實有些難懂(這也是後面需要使用Netty的原因之一),不過我對代碼加了註釋,這樣一來對nio的概念及基本的使用都會有一個非常清晰的認識:
服務端程序
TimeServer.java:
package cn.xpleaf.nio; public class TimeServer { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (Exception e) { // 采用默認值 } } new Thread(new MultiplexerTimeServer(port)).start(); } }
MultiplexerTimeServer.java:
package cn.xpleaf.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.sql.Date; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路復用器,綁定監聽端口 */ public MultiplexerTimeServer(int port) { try { // 創建多路復用器Selector selector = Selector.open(); // 創建ServerSocketChannel,它相當於是所有客戶端連接的父管道 servChannel = ServerSocketChannel.open(); // 將ServerSocketChannel設置為異步非阻塞 servChannel.configureBlocking(false); // 綁定偵聽端口,backlog為1024,表示serverchannel容納的最大的客戶端數量為1024(個人查找資料得出的結果,不一定準確) servChannel.socket().bind(new InetSocketAddress(port), 1024); // 將ServerSocketChannel註冊到selector上,並監聽SelectionKey.OP_ACCEPT操作位 servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { // timeout - 如果為正,則在等待某個通道準備就緒時最多阻塞 timeout 毫秒;如果為零,則無限期地阻塞;必須為非負數(API文檔) // 休眠時間為1s,無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次 selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { // 獲取key值,通過對key進行操作,可以獲取到其所對應的註冊到selector上的channel // 最初是只有一個ServerSocketChannel所對應的key,也就是前面所創建的servChannel,它相當於是所有客戶端連接的父管道 // nio的服務端就是通過它來創建與客戶端的連接的,因為目前的代碼就只有它監聽了SelectionKey.OP_ACCEPT操作位 key = it.next(); // 同時把該key值從selectedKeys集合中移除 it.remove(); // 處理該key值 try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { // TODO Auto-generated catch block t.printStackTrace(); } } // 多路復用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重復釋放資源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 對key進行處理 * * @param key * @throws IOException */ public void handleInput(SelectionKey key) throws IOException { // 處理新接入的請求消息 if (key.isValid()) { // 連接建立時 if (key.isAcceptable()) { // 接收新的連接 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); // 設置SocketChannel為異步非阻塞 sc.configureBlocking(false); // 註冊新的連接到多路復用器selector中,監聽SelectionKey.OP_READ操作位 sc.register(selector, SelectionKey.OP_READ); } // 讀數據 if (key.isReadable()) { // 通過key獲取到其註冊在Selector上的channel SocketChannel sc = (SocketChannel) key.channel(); // 分配一個新的字節緩沖區,大小為1024KB,即1MB ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 由於前面已經將該SocketChannel設置為異步非阻塞模式,因此它的read是非阻塞的 // 返回值為讀取到的字節數 // 返回值不同,意義不同: /** * 大小0:讀到了字節,對字節進行編解碼 等於0:沒有讀取到字節,屬於正常場景,忽略 為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源 */ int readBytes = sc.read(readBuffer); if (readBytes > 0) { // 讀取到字節,進行解碼操作 // 將緩沖區當前的limit設置為position,position設置為0,用於後續對緩沖區的讀取操作(我想這是API中定義的吧) readBuffer.flip(); // 根據緩沖區可讀的字節個數創建字節數組 byte[] bytes = new byte[readBuffer.remaining()]; // 將緩沖區可讀的字節數組復制到新創建的字節數組中 readBuffer.get(bytes); // 將字節數組以utf-8方式轉換為字符串 String body = new String(bytes, "utf-8"); System.out.println("The time server receive order : " + body); // 解析客戶端發送的指令,同時構造返回結果 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; // 將應答消息異步發送給客戶端 doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else { ; // 讀到0字節忽略 } } } } /** * 將應答消息異步發送給客戶端 * * @param channel * @param response * @throws IOException */ public void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { // 將字符串編碼為字節數組 byte[] bytes = response.getBytes(); // 根據字節數組的容量創建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 將字節數組復制到緩沖區 writeBuffer.put(bytes); // flip操作 writeBuffer.flip(); // 將緩沖區的字節數組發送出去 channel.write(writeBuffer); /** * 註意這裏並沒有處理半包問題,《Netty權威指南》中的說明如下(P35) * 需要指出的是,由於SocketChannel是異步非阻塞的,它並不保證一次能夠把需要發送的字節數組發送完,此時會出現半包問題。 * 我們需要註冊寫操作,不斷輪詢Selector將沒有發送完的ByteBuffer發送完畢,然後可以通過ByteBuffer的hasRemain()方法 * 判斷消息是否發送完成。此處僅僅是個簡單的入門級例程,沒有演示如何處理“寫半包”場景,後續的章節會有詳細說明。 */ } } }
客戶端程序
TimeClient.java:
package cn.xpleaf.nio; public class TimeClient { public static void main(String[] args) { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (Exception e) { // 采用默認值 } } new Thread(new TimeClientHandle("127.0.0.1", port)).start(); } }
TimeClientHandle.java:
package cn.xpleaf.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
/**
* 初始化多路復用器,設置連接的服務端地址和端口
*
* @param host
* @param port
*/
public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
// 創建多路復用器Selector
selector = Selector.open();
// 創建SocketChannel,用來連接服務端
socketChannel = SocketChannel.open();
// 將SocketChannel設置為異步非阻塞
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
// 先嘗試直接連接
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
// 當然這裏也可以將上面的直接連接代碼註釋,然後使用下面這兩行代碼
// 只是需要註意的是,如果一開始沒有嘗試連接,那麽即使後來註冊偵聽連接也是沒有意義的
// 此時沒有發送連接請求,服務端根本就不會響應
// socketChannel.connect(new InetSocketAddress(host, port));
// socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (!stop) {
try {
// timeout - 如果為正,則在等待某個通道準備就緒時最多阻塞 timeout 毫秒;如果為零,則無限期地阻塞;必須為非負數(API文檔)
// 休眠時間為1s,無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次
selector.select(1000);
// 獲取所有就緒的channel的key
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
// 獲取key值,通過對key進行操作,可以獲取到其所對應的註冊到selector上的channel
// 最初是只有一個ServerSocketChannel所對應的key,也就是前面所創建的servChannel,它相當於是所有客戶端連接的父管道
// nio的服務端就是通過它來創建與客戶端的連接的,因為目前的代碼就只有它監聽了SelectionKey.OP_ACCEPT操作位
key = it.next();
// 同時把該key值從selectedKeys集合中移除
it.remove();
// 處理該key值
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路復用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重復釋放資源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 對key進行處理
*
* @param key
* @throws IOException
*/
public void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 通過key獲取到SocketChannel
SocketChannel sc = (SocketChannel) key.channel();
// isConnectable是判斷是否處於連接狀態
// 如果是,說明服務端已經返回ACK應答消息,後面就需要對連接結果進行判斷
if (key.isConnectable()) {
// 對連接結果進行判斷
if (sc.finishConnect()) {
// 註冊SocketChannel到多路復用器selector上,並監聽SelectionKey.OP_READ操作位
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else {
// 連接失敗,進程退出
System.exit(1);
}
}
// 讀數據
if (key.isReadable()) {
// 分配一個新的字節緩沖區,大小為1024KB,即1MB
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 由於前面已經將該SocketChannel設置為異步非阻塞模式,因此它的read是非阻塞的
// 返回值為讀取到的字節數
// 返回值不同,意義不同:
/**
* 大小0:讀到了字節,對字節進行編解碼 等於0:沒有讀取到字節,屬於正常場景,忽略 為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源
*/
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
// 讀取到字節,進行解碼操作
// 將緩沖區當前的limit設置為position,position設置為0,用於後續對緩沖區的讀取操作(我想這是API中定義的吧)
readBuffer.flip();
// 根據緩沖區可讀的字節個數創建字節數組
byte[] bytes = new byte[readBuffer.remaining()];
// 將緩沖區可讀的字節數組復制到新創建的字節數組中
readBuffer.get(bytes);
// 將字節數組以utf-8方式轉換為字符串
String body = new String(bytes, "utf-8");
System.out.println("Now : " + body);
} else if (readBytes < 0) {
// 對端鏈路關閉
key.cancel();
sc.close();
} else {
; // 讀到0字節忽略
}
}
}
}
/**
* 連接到服務端
*
* @throws IOException
*/
private void doConnect() throws IOException {
// 如果直接連接成功,則註冊到多路復用器上,發送請求消息,讀應答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
/**
* 寫操作
*
* @throws IOException
*/
private void doWrite(SocketChannel sc) throws IOException {
// 將字符串編碼為字節數組
byte[] req = "QUERY TIME ORDER".getBytes();
// 根據字節數組的容量創建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
// 將字節數組復制到緩沖區
writeBuffer.put(req);
// flip操作
writeBuffer.flip();
// 將緩沖區的字節數組發送出去
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Send order 2 server succeesd.");
}
// 也是沒有處理"半包寫"的問題,可以查看服務端程序的代碼註釋說明
}
}
程序測試
服務端執行:
The time server is start in port : 8080
客戶端執行:
Send order 2 server succeesd.
Now : 2018-02-10
此時再查看服務端的輸出結果:
The time server is start in port : 8080
The time server receive order : QUERY TIME ORDER
Java異步非阻塞IO NIO使用與代碼分析