1. 程式人生 > 程式設計 >Java NIO詳解

Java NIO詳解

前言

本篇主要講解Java中的IO機制和網路通訊中處理高併發的NIO

分為兩塊:
第一塊講解多執行緒下的IO機制
第二塊講解如何在IO機制下優化CPU資源的浪費(New IO)

Echo伺服器

單執行緒下的socket機制就不用我介紹了,不懂得可以去查閱下資料
那麼多執行緒下,如果進行套接字的使用呢?
我們使用最簡單的echo伺服器來幫助大家理解

首先,來看下多執行緒下服務端和客戶端的工作流程圖:

clipboard.png

可以看到,多個客戶端同時向服務端傳送請求
服務端做出的措施是開啟多個執行緒來匹配相對應的客戶端
並且每個執行緒去獨自完成他們的客戶端請求

原理講完了我們來看下是如何實現的
在這裡我寫了一個簡單的伺服器
用到了執行緒池的技術來建立執行緒(具體程式碼作用我已經加了註釋):

public class MyServer {
        private static ExecutorService executorService = Executors.newCachedThreadPool();       //建立一個執行緒池
        private static class HandleMsg implements Runnable{         //一旦有新的客戶端請求,建立這個執行緒進行處理
        Socket client;          //建立一個客戶端
        public HandleMsg(Socket client)
{ //構造傳參繫結 this.client = client; } @Override public void run() { BufferedReader bufferedReader = null; //建立字元快取輸入流 PrintWriter printWriter = null; //建立字元寫入流 try { bufferedReader = new BufferedReader(new
InputStreamReader(client.getInputStream())); //獲取客戶端的輸入流 printWriter = new PrintWriter(client.getOutputStream(),true); //獲取客戶端的輸出流,true是隨時重新整理 String inputLine = null; long a = System.currentTimeMillis(); while ((inputLine = bufferedReader.readLine())!=null){ printWriter.println(inputLine); } long b = System.currentTimeMillis(); System.out.println("此執行緒花費了:"+(b-a)+"秒!"); } catch (IOException e) { e.printStackTrace(); }finally { try { bufferedReader.close(); printWriter.close(); client.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) throws IOException { //服務端的主執行緒是用來迴圈監聽客戶端請求 ServerSocket server = new ServerSocket(8686); //建立一個服務端且埠為8686 Socket client = null; while (true){ //迴圈監聽 client = server.accept(); //服務端監聽到一個客戶端請求 System.out.println(client.getRemoteSocketAddress()+"地址的客戶端連線成功!"); executorService.submit(new HandleMsg(client)); //將該客戶端請求通過執行緒池放入HandlMsg執行緒中進行處理 } } }複製程式碼

上述程式碼中我們使用一個類編寫了一個簡單的echo伺服器
在主執行緒中用死迴圈來開啟埠監聽

簡單客戶端

有了伺服器,我們就可以對其進行訪問,並且傳送一些字串資料
伺服器的功能是返回這些字串,並且打印出執行緒佔用時間

下面來寫個簡單的客戶端來響應服務端:

public class MyClient {
    public static void main(String[] args) throws IOException {
        Socket client = null;
        PrintWriter printWriter = null;
        BufferedReader bufferedReader = null;
        try {
            client = new Socket();
            client.connect(new InetSocketAddress("localhost",8686));
            printWriter = new PrintWriter(client.getOutputStream(),true);
            printWriter.println("hello");
            printWriter.flush();

            bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));            //讀取伺服器返回的資訊並進行輸出
            System.out.println("來自伺服器的資訊是:"+bufferedReader.readLine());
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            printWriter.close();
            bufferedReader.close();
            client.close();
        }
    }
}複製程式碼

程式碼中,我們用字元流傳送了一個hello字串過去,如果程式碼沒問題
伺服器會返回一個hello資料,並且打印出我們設定的日誌資訊

echo伺服器結果展示

我們來執行:
1.開啟server,開啟迴圈監聽:

clipboard.png

2.開啟一個客戶端:

clipboard.png

可以看到客戶端打印出了返回結果

3.檢視服務端日誌:

clipboard.png

很好,一個簡單的多執行緒套接字程式設計就實現了

但是試想一下:
如果一個客戶端請求中,在IO寫入到服務端過程中加入Sleep,
使每個請求佔用服務端執行緒10秒
然後有大量的客戶端請求,每個請求都佔用那麼長時間
那麼服務端的並能能力就會大幅度下降
這並不是因為服務端有多少繁重的任務,而僅僅是因為服務執行緒在等待IO(因為accept,read,write都是阻塞式的)
讓高速執行的CPU去等待及其低效的網路IO是非常不合算的行為

這時候該怎麼辦?

NIO

New IO成功的解決了上述問題,它是怎樣解決的呢?
IO處理客戶端請求的最小單位是執行緒
而NIO使用了比執行緒還小一級的單位:通道(Channel)
可以說,NIO中只需要一個執行緒就能完成所有接收,讀,寫等操作

要學習NIO,首先要理解它的三大核心
Selector,選擇器
Buffer,緩衝區
Channel,通道

博主不才,畫了張醜圖給大家加深下印象 ^ . ^

clipboard.png

再給一張TCP下的NIO工作流程圖(好難畫的線條...)

clipboard.png

大家大致看懂就行,我們一步步來

Buffer

首先要知道什麼是Buffer
在NIO中資料互動不再像IO機制那樣使用流
而是使用Buffer(緩衝區)

博主覺得圖才是最容易理解的
所以...

clipboard.png

可以看出Buffer在整個工作流程中的位置

buffer實際上是一個容器,一個連續陣列,它通過幾個變數來儲存這個資料的當前位置狀態:
1.capacity:容量,緩衝區能容納元素的數量
2.position:當前位置,是緩衝區中下一次發生讀取和寫入操作的索引,當前位置通過大多數讀寫操作向前推進
3.limit:界限,是緩衝區中最後一個有效位置之後下一個位置的索引
如圖:

clipboard.png

幾個常用方法:

.flip()        //將limit設定為position,然後position重置為0,返回對緩衝區的引用
.clear()        //清空呼叫緩衝區並返回對緩衝區的引用複製程式碼

來點實際點的,上面圖中的具體程式碼如下:

1.首先給Buffer分配空間,以位元組為單位

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);複製程式碼

建立一個ByteBuffer物件並且指定記憶體大小

2.向Buffer中寫入資料:

1).資料從ChannelBufferchannel.read(byteBuffer);
2).資料從ClientBufferbyteBuffer.put(...);複製程式碼

3.從Buffer中讀取資料:

1).資料從BufferChannelchannel.write(byteBuffer);
2).資料從BufferServerbyteBuffer.get(...);複製程式碼

Selector

選擇器是NIO的核心,它是channel的管理者
通過執行select()阻塞方法,監聽是否有channel準備好
一旦有資料可讀,此方法的返回值是SelectionKey的數量

所以服務端通常會死迴圈執行select()方法,直到有channl準備就緒,然後開始工作
每個channel都會和Selector繫結一個事件,然後生成一個SelectionKey的物件
需要注意的是:
channel和Selector繫結時,channel必須是非阻塞模式
而FileChannel不能切換到非阻塞模式,因為它不是套接字通道,所以FileChannel不能和Selector繫結事件

在NIO中一共有四種事件:
1.SelectionKey.OP_CONNECT:連線事件
2.SelectionKey.OP_ACCEPT:接收事件
3.SelectionKey.OP_READ:讀事件
4.SelectionKey.OP_WRITE:寫事件

Channel

共有四種通道:
FileChannel:作用於IO檔案流
DatagramChannel:作用於UDP協議
SocketChannel:作用於TCP協議
ServerSocketChannel:作用於TCP協議

本篇文章通過常用的TCP協議來講解NIO

我們以ServerSocketChannel為例:

開啟一個ServerSocketChannel通道

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();複製程式碼

關閉ServerSocketChannel通道:

serverSocketChannel.close();複製程式碼

迴圈監聽SocketChannel:

while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    clientChannel.configureBlocking(false);
}複製程式碼

clientChannel.configureBlocking(false);語句是將此通道設定為非阻塞,也就是非同步
自由控制阻塞或非阻塞便是NIO的特性之一

SelectionKey

SelectionKey是通道和選擇器互動的核心元件
比如在SocketChannel上繫結一個Selector,並註冊為連線事件:

SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress(port));
clientChannel.register(selector,SelectionKey.OP_CONNECT);複製程式碼

核心在register()方法,它返回一個SelectionKey物件
來檢測channel事件是那種事件可以使用以下方法:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();複製程式碼

服務端便是通過這些方法 在輪詢中執行相對應操作

當然通過Channel與Selector繫結的key也可以反過來拿到他們

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();複製程式碼

在Channel上註冊事件時,我們也可以順帶繫結一個Buffer:

clientChannel.register(key.selector(),SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));複製程式碼

或者繫結一個Object:

selectionKey.attach(Object);
Object anthorObj = selectionKey.attachment();複製程式碼

NIO的TCP服務端

講了這麼多,都是理論
我們來看下最簡單也是最核心的程式碼(加那麼多註釋很不優雅,但方便大家看懂):

package cn.blog.test.NioTest;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;


public class MyNioServer {
    private Selector selector;          //建立一個選擇器
    private final static int port = 8686;
    private final static int BUF_SIZE = 10240;

    private void initServer() throws IOException {
        //建立通道管理器物件selector
        this.selector=Selector.open();

        //建立一個通道物件channel
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);       //將通道設定為非阻塞
        channel.socket().bind(new InetSocketAddress(port));       //將通道繫結在8686埠

        //將上述的通道管理器和通道繫結,併為該通道註冊OP_ACCEPT事件
        //註冊事件後,當該事件到達時,selector.select()會返回(一個key),如果該事件沒到達selector.select()會一直阻塞
        SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT);

        while (true){       //輪詢
            selector.select();          //這是一個阻塞方法,一直等待直到有資料可讀,返回值是key的數量(可以有多個)
            Set keys = selector.selectedKeys();         //如果channel有資料了,將生成的key訪入keys集合中
            Iterator iterator = keys.iterator();        //得到這個keys集合的迭代器
            while (iterator.hasNext()){             //使用迭代器遍歷集合
                SelectionKey key = (SelectionKey) iterator.next();       //得到集合中的一個key例項
                iterator.remove();          //拿到當前key例項之後記得在迭代器中將這個元素刪除,非常重要,否則會出錯
                if (key.isAcceptable()){         //判斷當前key所代表的channel是否在Acceptable狀態,如果是就進行接收
                    doAccept(key);
                }else if (key.isReadable()){
                    doRead(key);
                }else if (key.isWritable() && key.isValid()){
                    doWrite(key);
                }else if (key.isConnectable()){
                    System.out.println("連線成功!");
                }
            }
        }
    }

    public void doAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        System.out.println("ServerSocketChannel正在迴圈監聽");
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(key.selector(),SelectionKey.OP_READ);
    }

    public void doRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
        long bytesRead = clientChannel.read(byteBuffer);
        while (bytesRead>0){
            byteBuffer.flip();
            byte[] data = byteBuffer.array();
            String info = new String(data).trim();
            System.out.println("從客戶端傳送過來的訊息是:"+info);
            byteBuffer.clear();
            bytesRead = clientChannel.read(byteBuffer);
        }
        if (bytesRead==-1){
            clientChannel.close();
        }
    }

    public void doWrite(SelectionKey key) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
        byteBuffer.flip();
        SocketChannel clientChannel = (SocketChannel) key.channel();
        while (byteBuffer.hasRemaining()){
            clientChannel.write(byteBuffer);
        }
        byteBuffer.compact();
    }

    public static void main(String[] args) throws IOException {
        MyNioServer myNioServer = new MyNioServer();
        myNioServer.initServer();
    }
}
複製程式碼

我列印了監聽channel,告訴大家ServerSocketChannel是在什麼時候開始執行的
如果配合NIO客戶端的debug,就能很清楚的發現,進入select()輪詢前
雖然已經有了ACCEPT事件的KEY,但select()預設並不會去呼叫
而是要等待有其它感興趣事件被select()捕獲之後,才會去呼叫ACCEPT的SelectionKey
這時候ServerSocketChannel才開始進行迴圈監聽

也就是說一個Selector中,始終保持著ServerSocketChannel的執行
serverChannel.accept();真正做到了非同步(在initServer方法中的channel.configureBlocking(false);)
如果沒有接受到connect,會返回一個null
如果成功連線了一個SocketChannel,則此SocketChannel會註冊寫入(READ)事件
並且設定為非同步

NIO的TCP客戶端

有服務端必定有客戶端
其實如果能完全理解了服務端
客戶端的程式碼大同小異

package cn.blog.test.NioTest;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MyNioClient {
    private Selector selector;          //建立一個選擇器
    private final static int port = 8686;
    private final static int BUF_SIZE = 10240;
    private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);

    private void  initClient() throws IOException {
        this.selector = Selector.open();
        SocketChannel clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.connect(new InetSocketAddress(port));
        clientChannel.register(selector,SelectionKey.OP_CONNECT);
        while (true){
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isConnectable()){
                    doConnect(key);
                }else if (key.isReadable()){
                    doRead(key);
                }
            }
        }
    }

    public void doConnect(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        if (clientChannel.isConnectionPending()){
            clientChannel.finishConnect();
        }
        clientChannel.configureBlocking(false);
        String info = "服務端你好!!";
        byteBuffer.clear();
        byteBuffer.put(info.getBytes("UTF-8"));
        byteBuffer.flip();
        clientChannel.write(byteBuffer);
        //clientChannel.register(key.selector(),SelectionKey.OP_READ);
        clientChannel.close();
    }

    public void doRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        clientChannel.read(byteBuffer);
        byte[] data = byteBuffer.array();
        String msg = new String(data).trim();
        System.out.println("服務端傳送訊息:"+msg);
        clientChannel.close();
        key.selector().close();
    }

    public static void main(String[] args) throws IOException {
        MyNioClient myNioClient = new MyNioClient();
        myNioClient.initClient();
    }
}
複製程式碼

輸出結果

這裡我開啟一個服務端,兩個客戶端:

clipboard.png

接下來,你可以試下同時開啟一千個客戶端,只要你的CPU夠給力,服務端就不可能因為阻塞而降低效能


介紹其他各種I/O對比

屬性\模型 阻塞BIO 非阻塞NIO 非同步AIO
blocking 阻塞並同步 非阻塞但同步 非阻塞並非同步
執行緒數(server:client) 1:1 1:N 0:N
複雜度 簡單 較複雜 複雜
吞吐量

具體使用得依據業務的實際應用場景和效能需求而定,如果客戶端很少,併發量不大,那麼完全可以選擇BIO,不過得加入執行緒池管理;相反要求併發較高的話,就應該採用NIO框架了。

同步、非同步、阻塞、非阻塞概念

1. 同步,就是我呼叫一個功能,該功能沒有結束前,我死等結果。
2. 非同步,就是我呼叫一個功能,不需要知道該功能結果,該功能有結果後通知我(回撥通知)。
3. 阻塞,就是呼叫我(函式),我(函式)沒有接收完資料或者沒有得到結果之前,我不會返回。
4. 非阻塞,就是呼叫我(函式),我(函式)立即返回,通過select通知呼叫者。

簡單總結 

BIO、NIO、AIO概念認知:

  • Java BIO : 同步並阻塞,伺服器實現模式為一個連線一個執行緒,即客戶端有連線請求時伺服器端就需要啟動一個執行緒進行處理,如果這個連線不做任何事情會造成不必要的執行緒開銷,當然可以通過執行緒池機制改善。
  • Java NIO : 同步非阻塞,伺服器實現模式為一個請求一個執行緒,即客戶端傳送的連線請求都會註冊到多路複用器上,多路複用器輪詢到連線有I/O請求時才啟動一個執行緒進行處理。
  • Java AIO(NIO.2) : 非同步非阻塞,伺服器實現模式為一個有效請求一個執行緒,客戶端的I/O請求都是由OS先完成了再通知伺服器應用去啟動執行緒進行處理。

BIO、NIO、AIO適用場景分析:

  • BIO方式適用於連線數目比較小且固定的架構,這種方式對伺服器資源要求比較高,併發侷限於應用中,JDK1.4以前的唯一選擇,但程式直觀簡單易理解。
  • NIO方式適用於連線數目多且連線比較短(輕操作)的架構,比如聊天伺服器,併發侷限於應用中,程式設計比較複雜,JDK1.4開始支援。
  • AIO方式使用於連線數目多且連線比較長(重操作)的架構,比如相簿伺服器,充分呼叫OS參與併發操作,程式設計比較複雜,JDK7開始支援。