1. 程式人生 > >Java IO模型&NIO

Java IO模型&NIO

Java IO模型&NIO

楔子

概述

本文從主要討論一下幾個方面的內容:
- 可擴充套件的網路服務
- 事件驅動處理
- Reactor(反應堆)模式
基礎版
多執行緒版
其他變體版
- Java NIO API 參考

網路服務

網路服務一般都涉及到Web服務,分散式物件等,但大多數服務端都有相同的基礎步驟,如解析讀請求(Read Request),請求解碼,請求處理,響應編碼,傳送響應資料。但實際上,在具體的每個步驟上面所帶來的效能消耗卻每每不同,例如XML解析,檔案傳輸,Web頁面生成及渲染,或者其他計數服務這些不同的處理往往會有不同的效能消耗。

經典的服務設計

這裡寫圖片描述
每個Handler都可以在各自的執行緒裡執行所有的操作。

經典的SocketServer迴圈阻塞

public class Server implements Runnable {

    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
                // 或單執行緒,或執行緒池
} catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte
[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } } 注:示例程式碼中,大多數異常處理都已省略

如你可見,這樣的模式雖簡單清晰,但卻缺乏彈性伸縮的能力,當客戶端併發量增加後,服務端的執行緒數量會不斷增加,最終會耗盡系統資源。即使是中間採用執行緒池,仍然不能承受大量的併發請求。所以,我們需要新的方案,需要新的模式。但,當務之急是先明確一個目標。

可伸縮目標

  • 在面臨持續增長的負載(客戶請求)壓力時,可優雅的降級
  • 可通過增加物理資源(CPU,記憶體,磁碟,頻寬)來持續改進
  • 仍需滿足可用性和效能目標:潛在的短期內目標,高峰期需求指標,服務質量可調

面對上述的目標,分而治之通常是實現可伸縮,可擴充套件目標的最好的方式。

分而治之

分而治之就是把處理過程折分成明確,職責單一的任務,每個任務採用非阻塞的方式來執行一個操作。 當任務狀態是啟用時,才開始執行。例如,一個IO事件通常是作為一種觸發條件,感知到該事件才開始作相應的read -> decode -> compute -> encode -> send.

這種思想是Java NIO 提供的一種基礎的機制。NIO 支援一種非阻塞的讀寫方式,還有通過感知到的IO事件來分發給相關的任務。

事件驅動設計

這種模式通常比其他的方式更有效,因為它用到的資源會偏少,不用為每個Client分配單獨的執行緒。除此之外,所需開銷會減少,因為減少了執行緒上下文切換,也減少了鎖競爭的場景。但排程相應的請求會偏慢,因為需要手動的為事件繫結具體的執行操作。

好事多磨礪。這種模式雖然好處多多,但對於編寫程式卻不是一件容易的事。因為需要處處考慮把程式碼寫成非阻塞的行為,還需記錄並跟蹤服務的狀態

背景知識:AWT 事件

這裡寫圖片描述

IO事件驅動模式採用了類似的思想,但使用了不同的設計來實現。

Reactor 模式

Reactor對IO事件的相應是轉發到適配的處理器(Handler)上,這類似於AWT的執行緒。由Handler來執行一些非阻塞的行為。Reactor模式需要繫結Handler到具體的事件上,關於這種模式的詳解可以去看這本書:《面向模式的軟體架構:卷2》- Pattern-Oriented Software Architecture, Volume 2 (POSA2)。

Reactor基礎模式

這裡寫圖片描述
此外單執行緒版本,由acceptor接收請求並轉發到執行緒中,由該執行緒來完成請求的處理。

Java NIO 支援

Channels

Channel是一種通道,可以連線到檔案,套接字(Socket),用於支援非阻塞的讀功能。

Buffers

Buffer是一種類似於陣列的物件,可以被Channel直接讀寫。

Selectors

Selecttor是一種選擇器,或叫多路複用選擇器。用於在多個Channel直接感知IO事件,並作相應的隔離區分,然後負責把事件封裝成對應的SelectionKey。

SelectionKeys

SelectionKey會維護事件的狀態,並與事件繫結,通過SelectionKey來完成IO的讀寫。

Reactor 模式實踐

第一步:初始化

public class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());

        /*
            可選方式:使用明確的SPI提供者方式
            SelectorProvider provider = SelectorProvider.provider();
            Selector s  = provider.openSelector();
            ServerSocketChannel channel = provider.openServerSocketChannel();
        */
    }
   }

第二步:迴圈分發

    public void run() { // 一般在新執行緒中作React
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next());
                selected.clear();
            }
        } catch (Exception e) {

        }
    }
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null)
            r.run();
    }

第三步:接收者

class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            }catch(IOException ex) { /* ... */ }
        }
 }

如圖示:
這裡寫圖片描述

第四步: Handler設定

final class Handler implements Runnable {
        private static final int MAXIN = 1024;
        private static final int MAXOUT = 1024;
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() { /* ... */ }

        boolean outputIsComplete() { /* ... */ }

        void process() { /* ... */ }
  }

第五步:請求處理

 public void run() {
            try {
                if (state == READING) read();
                else if (state == SENDING) send();
            } catch (IOException ex) { /* ... */ }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }

還有一種:狀態Handler

就是借用GoF的狀態物件模式,為每種狀態都附帶一個Handler,甚至根據狀態重新繫結新的Handler。

    class Handler { 
        public void run() { // initial state is reader
            socket.read(input);
            if (inputIsComplete()) {
                process();
                sk.attach(new Sender());
                sk.interest(SelectionKey.OP_WRITE);
                sk.selector().wakeup();
            }
        }
        class Sender implements Runnable {
            public void run(){ // ...
                socket.write(output);
                if (outputIsComplete()) sk.cancel();
            }
        }
    }

多執行緒版本Reactor模型設計

首先,為了可擴充套件,可策略性的新增執行緒來適應多處理器的計算機。為了複用執行緒,需要工作者執行緒池。這種模式Reactor必須響應快速,在第一時間觸發相應的Handler。還允許劃分非IO的處理給其他執行緒執行。

除此之外,還可再擴充套件。當單一的Reactor執行緒池模式達到飽和時,還能擴充套件成多個Reactor,用以均衡負載CPU、IO速率。
使用執行緒池可以有助於調整和控制執行緒數量,一般數量會少於客戶連線數。下圖是基於執行緒池的模型:
這裡寫圖片描述

執行緒池Handler

    class Handler implements Runnable {

        // uses util.concurrent thread pool
        static Executor pool = new ThreadPoolExecutor()
        static final int PROCESSING = 3;

        synchronized void read() { 
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                pool.execute(new Processer());
            }
        }
        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interest(SelectionKey.OP_WRITE);
        }
        class Processer implements Runnable {
            public void run() { processAndHandOff(); }
        }

        public void run() {}
    }

任務協作

直接傳遞(Handsoff)

投遞任務到執行緒池裡後,然後執行該任務,完成後,又呼叫下一個任務。這樣雖然快速,但卻很生硬,不自然。

回撥(Callback)

回撥Handler的分發器(Dispatcher),然後執行狀態變更,重新繫結Handler。這種類似GoF中的中介者模式。

佇列(Queue)

任務直接投遞到佇列中

非同步結果(Future)

非同步執行任務,利用Wait/Notify機制來獲取執行結果。

使用執行緒池PooledExecutor

首先這必須是一個可調整的工作者執行緒池,由方法簽名為execute(Runnable r), 需要控制的引數有:
任務佇列的型別、最大執行緒數量、最小執行緒數量、預熱與按需分配執行緒數量,執行緒Keep-Alive時間,飽和策略(阻塞,丟棄,生產者執行等)。

多Reactor執行緒池模型

這種模型主要用於靜態或動態的協調控制CPU、IO速率,由一個主接收者分發Selector到其他的Reactor。

Selector[] selectors; // also create threads
    int next = 0;
    class Acceptor { // ...
        public synchronized void run() { ...
            Socket connection = serverSocket.accept();
            if (connection != null)
                new Handler(selectors[next], connection);
            if (++next == selectors.length) next = 0;
        }
}

模型圖如示:
這裡寫圖片描述

使用NIO特性

NIO支援每個Reactor使用多個Selector,用以繫結不同的Handler到不同的事件中,但需要做好同步協作。還支援自動的檔案到網路或網路到檔案的複製傳輸,記憶體檔案對映,直接記憶體分配。

NIO API 參考

Buffer
ByteBuffer (CharBuffer, LongBuffer, etc not shown.)
Channel
SelectableChannel
SocketChannel
ServerSocketChannel
FileChannel
Selector
SelectionKey

下面對部分API作簡單的介紹

Buffer

abstract class Buffer {
    int capacity();
    int position();
    Buffer position(int newPosition);
    int limit();
    Buffer limit(int newLimit);
    Buffer mark();
    Buffer reset();
    Buffer clear();
    Buffer flip();
    Buffer rewind();
    int remaining();
    boolean hasRemaining();
    boolean isReadOnly();
}

很多方法都是見名之意,Buffer內部維護有四個狀態變數:mark、position、limit、capacity. 其中mark表示標記的位置,而其餘三者之間的關係是:position < limit < capacity. 如圖:
這裡寫圖片描述
這三個變數一起可以跟蹤緩衝區的狀態和它所包含的資料。

Channel

interface Channel {
    boolean isOpen();
    void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
    int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
    int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    int write(ByteBuffer[] srcs) throws IOException;
}

Channel可看作IO操作的聯結器。其子類來實現不同的行為,例如用於讀,用於寫的Channel, 還有聚合、分散功能的Channel。

Selector

abstract class Selector {
    static Selector open() throws IOException;
    Set keys();
    Set selectedKeys();
    int selectNow() throws IOException;
    int select(long timeout) throws IOException;
    int select() throws IOException;
    void wakeup();
    void close() throws IOException;
}

Selector是一種多路複用選擇器,Selector註冊好的Channel是由SelectionKey來表示。