Java IO模型&NIO
Java IO模型&NIO
楔子
概述
本文從主要討論一下幾個方面的內容:
- 可擴充套件的網路服務
- 事件驅動處理
- Reactor(反應堆)模式
基礎版
多執行緒版
其他變體版
- Java NIO API 參考
網路服務
網路服務一般都涉及到Web服務,分散式物件等,但大多數服務端都有相同的基礎步驟,如解析讀請求(Read Request),請求解碼,請求處理,響應編碼,傳送響應資料。但實際上,在具體的每個步驟上面所帶來的效能消耗卻每每不同,例如XML解析,檔案傳輸,Web頁面生成及渲染,或者其他計數服務這些不同的處理往往會有不同的效能消耗。
經典的服務設計
每個Handler都可以在各自的執行緒裡執行所有的操作。
經典的SocketServer迴圈阻塞
public class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// 或單執行緒,或執行緒池
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte [] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
注:示例程式碼中,大多數異常處理都已省略
如你可見,這樣的模式雖簡單清晰,但卻缺乏彈性伸縮的能力,當客戶端併發量增加後,服務端的執行緒數量會不斷增加,最終會耗盡系統資源。即使是中間採用執行緒池,仍然不能承受大量的併發請求。所以,我們需要新的方案,需要新的模式。但,當務之急是先明確一個目標。
可伸縮目標
- 在面臨持續增長的負載(客戶請求)壓力時,可優雅的降級
- 可通過增加物理資源(CPU,記憶體,磁碟,頻寬)來持續改進
- 仍需滿足可用性和效能目標:潛在的短期內目標,高峰期需求指標,服務質量可調
面對上述的目標,分而治之通常是實現可伸縮,可擴充套件目標的最好的方式。
分而治之
分而治之就是把處理過程折分成明確,職責單一的任務,每個任務採用非阻塞的方式來執行一個操作。 當任務狀態是啟用時,才開始執行。例如,一個IO事件通常是作為一種觸發條件,感知到該事件才開始作相應的read -> decode -> compute -> encode -> send.
這種思想是Java NIO 提供的一種基礎的機制。NIO 支援一種非阻塞的讀寫方式,還有通過感知到的IO事件來分發給相關的任務。
事件驅動設計
這種模式通常比其他的方式更有效,因為它用到的資源會偏少,不用為每個Client分配單獨的執行緒。除此之外,所需開銷會減少,因為減少了執行緒上下文切換,也減少了鎖競爭的場景。但排程相應的請求會偏慢,因為需要手動的為事件繫結具體的執行操作。
好事多磨礪。這種模式雖然好處多多,但對於編寫程式卻不是一件容易的事。因為需要處處考慮把程式碼寫成非阻塞的行為,還需記錄並跟蹤服務的狀態
背景知識:AWT 事件
IO事件驅動模式採用了類似的思想,但使用了不同的設計來實現。
Reactor 模式
Reactor對IO事件的相應是轉發到適配的處理器(Handler)上,這類似於AWT的執行緒。由Handler來執行一些非阻塞的行為。Reactor模式需要繫結Handler到具體的事件上,關於這種模式的詳解可以去看這本書:《面向模式的軟體架構:卷2》- Pattern-Oriented Software Architecture, Volume 2 (POSA2)。
Reactor基礎模式
此外單執行緒版本,由acceptor接收請求並轉發到執行緒中,由該執行緒來完成請求的處理。
Java NIO 支援
Channels
Channel是一種通道,可以連線到檔案,套接字(Socket),用於支援非阻塞的讀功能。
Buffers
Buffer是一種類似於陣列的物件,可以被Channel直接讀寫。
Selectors
Selecttor是一種選擇器,或叫多路複用選擇器。用於在多個Channel直接感知IO事件,並作相應的隔離區分,然後負責把事件封裝成對應的SelectionKey。
SelectionKeys
SelectionKey會維護事件的狀態,並與事件繫結,通過SelectionKey來完成IO的讀寫。
Reactor 模式實踐
第一步:初始化
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(
new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
/*
可選方式:使用明確的SPI提供者方式
SelectorProvider provider = SelectorProvider.provider();
Selector s = provider.openSelector();
ServerSocketChannel channel = provider.openServerSocketChannel();
*/
}
}
第二步:迴圈分發
public void run() { // 一般在新執行緒中作React
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey) (it.next());
selected.clear();
}
} catch (Exception e) {
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
第三步:接收者
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}catch(IOException ex) { /* ... */ }
}
}
如圖示:
第四步: Handler設定
final class Handler implements Runnable {
private static final int MAXIN = 1024;
private static final int MAXOUT = 1024;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
}
第五步:請求處理
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
還有一種:狀態Handler
就是借用GoF的狀態物件模式,為每種狀態都附帶一個Handler,甚至根據狀態重新繫結新的Handler。
class Handler {
public void run() { // initial state is reader
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
}
多執行緒版本Reactor模型設計
首先,為了可擴充套件,可策略性的新增執行緒來適應多處理器的計算機。為了複用執行緒,需要工作者執行緒池。這種模式Reactor必須響應快速,在第一時間觸發相應的Handler。還允許劃分非IO的處理給其他執行緒執行。
除此之外,還可再擴充套件。當單一的Reactor執行緒池模式達到飽和時,還能擴充套件成多個Reactor,用以均衡負載CPU、IO速率。
使用執行緒池可以有助於調整和控制執行緒數量,一般數量會少於客戶連線數。下圖是基於執行緒池的模型:
執行緒池Handler
class Handler implements Runnable {
// uses util.concurrent thread pool
static Executor pool = new ThreadPoolExecutor()
static final int PROCESSING = 3;
synchronized void read() {
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
public void run() {}
}
任務協作
直接傳遞(Handsoff)
投遞任務到執行緒池裡後,然後執行該任務,完成後,又呼叫下一個任務。這樣雖然快速,但卻很生硬,不自然。
回撥(Callback)
回撥Handler的分發器(Dispatcher),然後執行狀態變更,重新繫結Handler。這種類似GoF中的中介者模式。
佇列(Queue)
任務直接投遞到佇列中
非同步結果(Future)
非同步執行任務,利用Wait/Notify機制來獲取執行結果。
使用執行緒池PooledExecutor
首先這必須是一個可調整的工作者執行緒池,由方法簽名為execute(Runnable r)
, 需要控制的引數有:
任務佇列的型別、最大執行緒數量、最小執行緒數量、預熱與按需分配執行緒數量,執行緒Keep-Alive時間,飽和策略(阻塞,丟棄,生產者執行等)。
多Reactor執行緒池模型
這種模型主要用於靜態或動態的協調控制CPU、IO速率,由一個主接收者分發Selector到其他的Reactor。
Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
}
}
模型圖如示:
使用NIO特性
NIO支援每個Reactor使用多個Selector,用以繫結不同的Handler到不同的事件中,但需要做好同步協作。還支援自動的檔案到網路或網路到檔案的複製傳輸,記憶體檔案對映,直接記憶體分配。
NIO API 參考
Buffer
ByteBuffer (CharBuffer, LongBuffer, etc not shown.)
Channel
SelectableChannel
SocketChannel
ServerSocketChannel
FileChannel
Selector
SelectionKey
下面對部分API作簡單的介紹
Buffer
abstract class Buffer {
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
int remaining();
boolean hasRemaining();
boolean isReadOnly();
}
很多方法都是見名之意,Buffer內部維護有四個狀態變數:mark、position、limit、capacity. 其中mark表示標記的位置,而其餘三者之間的關係是:position < limit < capacity. 如圖:
這三個變數一起可以跟蹤緩衝區的狀態和它所包含的資料。
Channel
interface Channel {
boolean isOpen();
void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}
Channel可看作IO操作的聯結器。其子類來實現不同的行為,例如用於讀,用於寫的Channel, 還有聚合、分散功能的Channel。
Selector
abstract class Selector {
static Selector open() throws IOException;
Set keys();
Set selectedKeys();
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
void wakeup();
void close() throws IOException;
}
Selector是一種多路複用選擇器,Selector註冊好的Channel是由SelectionKey來表示。