[紙上談兵]Java IO詳解(三)NIO
一、概述
Java NIO 一種基於通道和緩衝區的 I/O 方式,它可以使用Native函式庫直接分配堆外記憶體,然後通過一個儲存在 Java 堆的 DirectByteBuffer 物件作為這塊記憶體的引用進行操作,避免了在Java堆和Native堆中來回複製資料。
Java NIO是一種同步非阻塞的IO模型.同步是指執行緒不斷輪詢IO事件是否就緒,非阻塞是指執行緒在等待IO的時候,可以同時做其他任務.
(注意:這個地方的同步非阻塞與我們前說的同步阻塞階段可以一一對應).
二、Java NIO三大核心
1. Buffer(緩衝區)
為什麼說NIO是基於緩衝區的IO方式呢?因為,當一個連結建立完成後,IO的資料未必會馬上到達,為了當資料到達時能夠正確完成IO操作,在BIO(阻塞IO)中,等待IO的執行緒必須被阻塞,以全天候地執行IO操作。為了解決這種IO方式低效的問題,引入了緩衝區的概念,當資料到達時,可以預先被寫入緩衝區,再由緩衝區交給執行緒,因此執行緒無需阻塞地等待IO。
Buffer是一個物件,包含一些要寫入或者讀出的資料.
在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的;在寫入資料時,也是寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作.
緩衝區實際上是一個數組,並提供了對資料結構化訪問以及維護讀寫位置等資訊。
具體的快取區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的介面:Buffer.
要注入Buffer是堆外記憶體,非堆記憶體。
Buffer的方法相對來講比較複雜,後面會專門再開一篇部落格講解Buffer中相關的文獻
那buffer與channel是如何配合的呢?
當然是資料讀取或寫入時先往buffer中讀或寫,可以參考下圖(此圖並非我畫的):
2. Channel(通道)
此與要注意通道與流的不同。通道是雙向的,流是單向的。雙向是指同一個Channel既可以進行讀,也可以進行寫;而Stream只能進行單向操作,一個Stream只能進行讀或者寫;
當執行SocketChannel.write(Buffer),便將一個 buffer 寫到了一個通道中。
可以結合Buffer上面的圖看一下。
常用通道:
1)FileChannel 可以從檔案讀或者向檔案寫入資料
2)SocketChanel 以TCP來向網路連線的兩端讀寫資料
3)ServerSocketChannel 監聽客戶端發起的TCP連線,併為每個TCP連線建立一個新的SocketChannel來進行資料讀寫
4)DatagramChannel 以UDP協議來向網路連線的兩端讀寫資料
3. Selector(選擇器)
通道和緩衝區的機制,使得執行緒無需阻塞地等待IO事件的就緒,但是總是要有人來監管這些IO事件。這個工作就交給了selector來完成,這就是所謂的同步
Java NIO的選擇器(Selector)允許一個單獨的執行緒來監視多個輸入通道(Channel),你可以註冊多個通道使用一個選擇器,然後使用一個單獨的執行緒來“選擇”通道:這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的執行緒很容易來管理多個通道。
要使用Selector,得向Selector註冊Channel,然後呼叫它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒,這就是所說的輪詢。一旦這個方法返回,執行緒就可以處理這些事件
Selector中註冊的感興趣事件有:
OP_ACCEPT
OP_CONNECT
OP_READ
OP_WRITE
三、例項
這個例項是抄的,我稍有改動,一定要手寫才能加深理解
package com.cbird.io.aio;
import javax.xml.bind.SchemaOutputResolver;
import javax.xml.ws.Holder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* <p>TODO</p>
* <p>
* <PRE>
* <BR> 修改記錄
* <BR>-----------------------------------------------
* <BR> 修改日期 修改人 修改內容
* </PRE>
*
* @author cuiyh9
* @version 1.0
* @Date Created in 2018年07月24日 12:00
* @since 1.0
*/
public class NioDemo {
/**
* startServer()與startClient()每次只啟動一個即可。
* @author cuiyuhui
* @created
* @param
* @return
*/
public static void main(String[] args) {
try {
// startServer();
// startClient();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void startServer() throws Exception {
Server.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("伺服器啟動---");
}
public static void startClient() throws Exception {
Clent.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("客戶端啟動---");
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
System.out.println("input line:" + line);
Clent.sendMsg(line);
}
}
public static class Server {
private static int DEFAULT_PORT = 12345;
private static ServerHandler serverHandler;
public static void start() {
start(DEFAULT_PORT);
}
public static synchronized void start(int port) {
if (serverHandler != null) {
serverHandler.stop();
}
serverHandler = new ServerHandler(port);
new Thread(serverHandler, "NioServer").start();
}
}
public static class ServerHandler implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean started;
public ServerHandler(int port) {
try {
//定義選擇器
selector = Selector.open();
//定義服務端channel
serverSocketChannel = ServerSocketChannel.open();
//如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
serverSocketChannel.configureBlocking(false);
// 繫結埠 同時將backlog設為1024. backlog值的含義後面會專講
serverSocketChannel.bind(new InetSocketAddress(port), 1024);
// 監聽客戶端連線請求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
started = true;
System.out.println(Thread.currentThread().getName() + " 服務啟動");
} catch (Exception e) {
e.printStackTrace();
}
}
public void stop() {
started = false;
}
@Override
public void run() {
while (started) {
try {
// 無論是否有讀寫事件發生,selector每隔1s被喚醒一次
this.selector.select(1000);
Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
System.out.println("key:" + key);
it.remove();
handleInput(key);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws Exception {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
// 通過ServerSocketChannel的accept建立SocketChannel例項
// 完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
SocketChannel socketChannel = serverSocketChannel.accept();
// 設定為非阻塞的
socketChannel.configureBlocking(false);
// 註冊為讀
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 讀訊息
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel)key.channel();
// 建立ByteBuffer,並開闢一個1M的緩衝區
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 讀取請求碼流,返回讀取到的位元組數
int readBytes = socketChannel.read(buffer);
if (readBytes > 0) {
// 將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作. TODO
buffer.flip();
// 根據緩衝區可讀位元組數建立位元組陣列
byte[] bytes = new byte[buffer.remaining()];
// 將緩衝區可讀位元組陣列複製到新建的陣列中
buffer.get(bytes);
String result = new String(bytes ,"UTF-8");
System.out.println("request param:" + result);
result = "RESPONSE:" + result;
doWrite(socketChannel, result);
} else {
key.cancel();
socketChannel.close();
}
}
}
}
private void doWrite(SocketChannel socketChannel, String result) throws Exception {
byte[] bytes = result.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
// 將位元組陣列複製到緩衝區
byteBuffer.put(bytes);
// flip操作。 後面詳細講解這個
byteBuffer.flip();
// 傳送緩衝區的位元組陣列
socketChannel.write(byteBuffer);
}
}
public static class Clent {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static ClientHandler clientHandler;
public static void start() {
start(DEFAULT_HOST, DEFAULT_PORT);
}
public static synchronized void start(String ip, int port) {
if (clientHandler == null) {
clientHandler = new ClientHandler(ip, port);
} else {
System.out.println("client error");
return ;
}
new Thread(clientHandler, "Client").start();
}
public static boolean sendMsg(String msg) throws Exception {
if ("q".equals(msg)) {
System.out.println("stop");
System.exit(1);
return false;
}
clientHandler.sendMsg(msg);
return true;
}
}
public static class ClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;
public ClientHandler(String ip, int port) {
this.host = ip;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
started = true;
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while (started) {
try {
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws Exception {
if (key.isValid()) {
SocketChannel socketChannel = (SocketChannel)key.channel();
if (key.isConnectable()) {
if(socketChannel.finishConnect()) {
} else {
System.out.println("client exit");
System.exit(1);
}
}
if (key.isReadable()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String result = new String(bytes, "UTF-8");
System.out.println("Server Response:" + result);
} else {
key.cancel();
socketChannel.close();
}
}
}
}
private void doWrite(SocketChannel socketChannel, String request) throws Exception {
byte[] bytes = request.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
private void doConnect() throws Exception {
if (socketChannel.connect(new InetSocketAddress(host, port))) {
System.out.println("client connect success");
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("client register");
}
}
private void sendMsg(String msg) throws Exception {
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("sendMsg after register: " + msg);
doWrite(socketChannel, msg);
}
}
}
四、 引數backlog詳解
參考:https://www.jianshu.com/p/e6f2036621f4
這個引數出現在了下面程式碼中
// 繫結埠 同時將backlog設為1024. TODO backlog作用
serverSocketChannel.bind(new InetSocketAddress(port), 1024);
這個引數是底層tcp socket的引數。如果想要了解這個引數,首先我們需瞭解tcp的三次握手。
1、client傳送SYN到server,將狀態修改為SYN_SEND,如果server收到請求,則將狀態修改為SYN_RCVD,並把該請求放到syns queue佇列中。
2、server回覆SYN+ACK給client,如果client收到請求,則將狀態修改為ESTABLISHED,併發送ACK給server。
3、server收到ACK,將狀態修改為ESTABLISHED,並把該請求從syns queue中放到accept queue。
在linux系統核心中維護了兩個佇列:syns queue和accept queue。
syns queue
用於儲存半連線狀態的請求,其大小通過/proc/sys/net/ipv4/tcp_max_syn_backlog指定,一般預設值是512,不過這個設定有效的前提是系統的syncookies功能被禁用。網際網路常見的TCP SYN FLOOD惡意DOS攻擊方式就是建立大量的半連線狀態的請求,然後丟棄,導致syns queue不能儲存其它正常的請求。
accept queue
用於儲存全連線狀態的請求,其大小通過/proc/sys/net/core/somaxconn指定,在使用listen函式時,核心會根據傳入的backlog引數與系統引數somaxconn,取二者的較小值。
backlog引數歷史上被定義為上面兩個佇列的大小之和.Berkely實現中的backlog值為上面兩佇列之和再乘以1.5.根據不同linux版本不同,這個值定義可能不同。我現在只要理解這個值為syns queue和accept queue佇列之合就可以了。
如果當客戶端SYN到達的時候佇列已滿,TCP將會忽略後續到達的SYN,但是不會給客戶端傳送RST資訊,因為此時允許客戶端重傳SYN分節,如果返回錯誤資訊,那麼客戶端將無法分清到底是服務端對應埠上沒有相應應用程式還是服務端對應埠上佇列已滿這兩種情況
說明:
RST: TCP報頭中的標誌位,表示連線重置
參考: https://www.cnblogs.com/JohnABC/p/6323046.html
這個值我簡單粗暴的理解為一個埠上可以等待連線的最大數值。
五、Reactor IO模型
Java NIO可以進一步優化
種優化方式是:將Selector進一步分解為Reactor,將不同的感興趣事件分開,每一個Reactor只負責一種感興趣的事件。這樣做的好處是:1、分離阻塞級別,減少了輪詢的時間;2、執行緒無需遍歷set以找到自己感興趣的事件,因為得到的set中僅包含自己感興趣的事件。
詳細參考 下一篇博額
參考:https://www.cnblogs.com/geason/p/5774096.html
http://www.jasongj.com/java/nio_reactor/
參考(都是我自認為很不錯的資料):
https://www.cnblogs.com/geason/p/5774096.html
https://www.cnblogs.com/dolphin0520/p/3919162.html
https://blog.csdn.net/anxpp/article/details/51512200