1. 程式人生 > 其它 >1.Java-IO演進之路

1.Java-IO演進之路

概念

必須明白的概念

阻塞block和非阻塞non-block

阻塞和非阻塞是程序在訪問資料的時候,資料是否準備就緒的一種處理方式,當資料沒有準備的時候。

阻塞:往往需要等待緩衝區中的資料準備好過後才處理其他的事情,否則一直等待在那裡。

非阻塞:當我們的程序訪問我們的資料緩衝區的時候,如果資料沒有準備好則直接返回,不會等待。如果資料已經 準備好,也直接返回。

同步synchronization和非同步asynchronous

同步和非同步都是基於應用程式和作業系統處理 IO 事件所採用的方式。比如同步:是應用程式要直接參與 IO 讀寫 的操作。非同步:所有的 IO 讀寫交給作業系統去處理,應用程式只需要等待通知。

同步方式在處理 IO 事件的時候,必須阻塞在某個方法上面等待我們的 IO 事件完成(阻塞 IO 事件或者通過輪詢 IO 事件的方式),對於非同步來說,所有的 IO 讀寫都交給了作業系統。這個時候,我們可以去做其他的事情,並不需要去完 成真正的 IO 操作,當操作完成 IO 後,會給我們的應用程式一個通知。

同步 : 阻塞到 IO 事件,阻塞到 read 或則 write。這個時候我們就完全不能做自己的事情。讓讀寫方法加入到線 程裡面,然後阻塞執行緒來實現,對執行緒的效能開銷比較大。

BIO 與 NIO 對比

面向流與面向緩衝

Java NIO 和 BIO 之間第一個最大的區別是,BIO 是面向流的,NIO 是面向緩衝區的。 Java BIO 面向流意味著每 次從流中讀一個或多個位元組,直至讀取所有位元組,它們沒有被快取在任何地方。此外,它不能前後移動流中的資料。 如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。 Java NIO 的緩衝導向方法略有不同。資料讀取 到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是 否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的 資料。

阻塞與非阻塞

Java BIO 的各種流是阻塞的。這意味著,當一個執行緒呼叫 read() 或 write()時,該執行緒被阻塞,直到有一些資料被 讀取,或資料完全寫入。該執行緒在此期間不能再幹任何事情了。 Java NIO 的非阻塞模式,使一個執行緒從某通道傳送請 求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取。而不是保持執行緒阻塞, 所以直至資料變的可以讀取之前,該執行緒可以繼續做其他的事情。 非阻塞寫也是如此。一個執行緒請求寫入一些資料到 某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。 執行緒通常將非阻塞 IO 的空閒時間用於在其它 通道上執行 IO 操作,所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)。

選擇器的問世

Java NIO 的選擇器(Selector)允許一個單獨的執行緒來監視多個輸入通道,你可以註冊多個通道使用一個選擇器,然 後使用一個單獨的執行緒來“選擇”通道:這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。這種選擇機制, 使得一個單獨的執行緒很容易來管理多個通道。

如何影響應用程式的設計

無論您選擇 BIO 或 NIO 工具箱,可能會影響您應用程式設計的以下幾個方面:

A.對 NIO 或 BIO 類的 API 呼叫。

B.資料處理邏輯。

C.用來處理資料的執行緒數。

[¶](#API 呼叫)1.API 呼叫

當然,使用 NIO 的 API 呼叫時看起來與使用 BIO 時有所不同,但這並不意外,因為並不是僅從一個 InputStream 逐位元組讀取,而是資料必須先讀入緩衝區再處理。

2.資料處理

(Java BIO: 從一個阻塞的流中讀資料) 而一個 NIO 的實現會有所不同,下面是一個簡單的例子:

ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);

注意第二行,從通道讀取位元組到 ByteBuffer。當這個方法呼叫返回時,你不知道你所需的所有資料是否在緩衝區內。 你所知道的是,該緩衝區包含一些位元組,這使得處理有點困難。

假設第一次 read(buffer)呼叫後,讀入緩衝區的資料只有半行,例如,“Name:An”,你能處理資料嗎?顯然不能, 需要等待,直到整行資料讀入快取,在此之前,對資料的任何處理毫無意義.

ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
while(!bufferFull(bytesRead)) {
    bytesRead = inChannel.read(buffer);
}

bufferFull()方法必須跟蹤有多少資料讀入緩衝區,並返回真或假,這取決於緩衝區是否已滿。換句話說,如果緩衝區準備好被處理,那麼表示緩衝區滿了。

bufferFull()方法掃描緩衝區,但必須保持在 bufferFull()方法被呼叫之前狀態相同。如果沒有,下一個讀入緩衝區的 資料可能無法讀到正確的位置。這是不可能的,但卻是需要注意的又一問題。

如果緩衝區已滿,它可以被處理。如果它不滿,並且在你的實際案例中有意義,你或許能處理其中的部分資料。 但是許多情況下並非如此。

3.設定處理執行緒數

NIO 可讓您只使用一個(或幾個)單執行緒管理多個通道(網路連線或檔案),但付出的代價是解析資料可能會比 從一個阻塞流中讀取資料更復雜。

如果需要管理同時開啟的成千上萬個連線,這些連線每次只是傳送少量的資料,例如聊天伺服器,實現 NIO 的服 務器可能是一個優勢。

Java NIO: 單執行緒管理多個連線

如果你有少量的連線使用非常高的頻寬,一次傳送大量的資料,也許典型的 IO 伺服器實現可能非常契合。下圖說明了 一個典型的 IO 伺服器設計.

Java BIO: 一個典型的 IO 伺服器設計- 一個連線通過一個執行緒處理。

BIO

public class BIOServer {
    ServerSocket serverSocket;
    public BIOServer(int port) {
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("BIO服務已啟動,監聽埠是:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void listen() throws IOException {
        while (true) {
            // 等待客戶端連線,阻塞方法
            // Socket資料傳送者在服務端的引用
            Socket client = serverSocket.accept();
            InputStream is = client.getInputStream();

            // 網路的客戶端把資料傳送到網絡卡,機器所得到的資料讀到JVM中
            byte[] buff = new byte[1024];

            int len = is.read(buff);

            if (len > 0) {
                String msg = new String(buff, 0, len);
                System.out.println("收到" + msg);
            }
        }
    }
    public static void main(String[] args) {
        try {
            new BIOServer(8080).listen();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class BIOClient {

    public static void main(String[] args) throws IOException {

        // 要和誰進行通訊,IP:PORT
        Socket client = new Socket("localhost", 8080);

        // 不管是客戶端還是服務端,都有可能write和read
        OutputStream os = client.getOutputStream();

        String name = UUID.randomUUID().toString();

        System.out.println("客戶端傳送資料" + name);

        os.write(name.getBytes());

        os.close();

        client.close();
    }
}

NIO

public class NIOServer {

    private int port = 8080;

    // 準備兩個東西
    // 輪詢器 Selector
    private Selector selector;
    // 緩衝區 Buffer

    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    // 初始化完畢
    public NIOServer(int port) {
        // 初始化
        try {
            this.port = port;
            ServerSocketChannel server = ServerSocketChannel.open();

            // 告訴地址 ip:port
            server.bind(new InetSocketAddress(this.port));

            // BIO升級版本NIO NIO模型預設採用阻塞
            server.configureBlocking(false);

            selector = Selector.open();

            // 開始接受訊息
            server.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void listen() {
        System.out.println("listen on:" + this.port);
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                // 不斷的輪詢
                Iterator<SelectionKey> iter = keys.iterator();
                // 同步體現在這裡,因為每次只能處理一種狀態
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    process(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 每一次輪詢就是呼叫一次process方法,而每一次呼叫,都只能幹一件事
    private void process(SelectionKey key) throws IOException {
        // 針對每種狀態給一個反應
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // 這個方法體現非阻塞,不管你資料有沒有準備好,都要給我一個狀態和反饋
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            // 當資料準備就緒的時候,將狀態改為可讀
            key = channel.register(selector, SelectionKey.OP_READ);
        }
        else if (key.isReadable()) {
            // key.channel 多路複用器中拿到客戶端的引用
            SocketChannel channel = (SocketChannel) key.channel();
            int len = channel.read(buffer);
            if (len > 0) {
                buffer.flip();
                String content = new String(buffer.array(), 0, len);
                channel.register(selector, SelectionKey.OP_WRITE);

                // 在key上攜帶一個附件,一會再寫出去
                key.attach(content);
                System.out.println("讀取內容" + content);
            }
        }
        else if (key.isWritable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            String content = (String) key.attachment();
            channel.write(ByteBuffer.wrap(("輸出:" + content).getBytes()));
            channel.close();
        }
    }

    public static void main(String[] args) {
        new NIOServer(8080).listen();
    }
}

[¶](# Java AIO 詳解) Java AIO 詳解

jdk1.7 (NIO2)才是實現真正的非同步 AIO、把 IO 讀寫操作完全交給作業系統,學習了 linux epoll 模式.

[¶](# Java AIO 基本原理) Java AIO 基本原理

服務端:AsynchronousServerSocketChannel

客服端:AsynchronousSocketChannel

使用者處理器:CompletionHandler 介面,這個介面實現應用程式向作業系統發起 IO 請求,當完成後處理具體邏輯,否則做 自己該做的事情。

“真正”的非同步IO需要作業系統更強的支援。在IO多路複用模型中,事件迴圈將檔案控制代碼的狀態事件通知給使用者執行緒, 由使用者執行緒自行讀取資料、處理資料。而在非同步IO模型中,當用戶執行緒收到通知時,資料已經被核心讀取完畢,並放 在了使用者執行緒指定的緩衝區內,核心在IO完成後通知使用者執行緒直接使用即可。非同步IO模型使用了Proactor設計模式實 現了這一機制.

public class AIOServer {

    private final int port;

    public static void main(String args[]) {
        int port = 8000;
        new AIOServer(port);
    }

    public AIOServer(int port) {
        this.port = port;
        listen();
    }

    private void listen() {
        try {
            ExecutorService executorService = Executors.newCachedThreadPool();
            AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            //開門營業
            //工作執行緒,用來偵聽回撥的,事件響應的時候需要回調
            final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup);
            server.bind(new InetSocketAddress(port));
            System.out.println("服務已啟動,監聽埠" + port);

            //準備接受資料
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
                final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                //實現completed方法來回調
                //由作業系統來觸發
                //回撥有兩個狀態,成功
                public void completed(AsynchronousSocketChannel result, Object attachment){
                    System.out.println("IO操作成功,開始獲取資料");
                    try {
                        buffer.clear();
                        result.read(buffer).get();
                        buffer.flip();
                        result.write(buffer);
                        buffer.flip();
                    } catch (Exception e) {
                        System.out.println(e.toString());
                    } finally {
                        try {
                            result.close();
                            server.accept(null, this);
                        } catch (Exception e) {
                            System.out.println(e.toString());
                        }
                    }

                    System.out.println("操作完成");
                }

                @Override
                //回撥有兩個狀態,失敗
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("IO操作是失敗: " + exc);
                }
            });

            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }
}
public class AIOClient {
    private final AsynchronousSocketChannel client;

    public AIOClient() throws Exception{
        client = AsynchronousSocketChannel.open();
    }

    public void connect(String host,int port)throws Exception{
        client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>() {
            @Override
            public void completed(Void result, Void attachment) {
                try {
                    client.write(ByteBuffer.wrap("這是一條測試資料".getBytes())).get();
                    System.out.println("已傳送至伺服器");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });
        final ByteBuffer bb = ByteBuffer.allocate(1024);
        client.read(bb, null, new CompletionHandler<Integer,Object>(){

                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("IO操作完成" + result);
                        System.out.println("獲取反饋結果" + new String(bb.array()));
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        exc.printStackTrace();
                    }
                }
        );

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException ex) {
            System.out.println(ex);
        }

    }

    public static void main(String args[])throws Exception{
        new AIOClient().connect("localhost",8000);
    }
}