1. 程式人生 > >java網路程式設計(三)----同步非阻塞nio及reactor模型

java網路程式設計(三)----同步非阻塞nio及reactor模型

很多剛接觸NIO的人,第一眼看到的就是Java相對晦澀的API,比如:Channel,Selector,Socket什麼的;然後就是一坨上百行的程式碼來演示NIO的服務端Demo,所以這裡我們人性化地簡單介紹一下。

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。
新增的著兩種通道都支援阻塞和非阻塞兩種模式。阻塞模式使用就像傳統中的支援一樣,比較簡單,但是效能和可靠性都不好;非阻塞模式正好與之相反。
對於低負載、低併發的應用程式,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網路)應用,應使用NIO的非阻塞模式來開發。

java nio基礎實現

  1. 緩衝區 Buffer
    Buffer是一個物件,包含一些要寫入或者讀出的資料。
    在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的;在寫入資料時,也是寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。
    緩衝區實際上是一個數組,並提供了對資料結構化訪問以及維護讀寫位置等資訊。
    具體的快取區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的介面:Buffer。

  2. 通道 Channel
    我們對資料的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。
    底層的作業系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的對映底層作業系統的API。
    Channel主要分兩大類:

    • SelectableChannel:使用者網路讀寫
    • FileChannel:用於檔案操作

    後面程式碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

  3. 多路複用器 Selector
    Selector是Java NIO 程式設計的基礎。
    Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

    一個Selector可以同時輪詢多個Channel,只需要一個執行緒負責Selector的輪詢,就可以接入成千上萬的客戶端。

nio使用

回憶BIO模型,之所以需要多執行緒,是因為在進行I/O操作的時候,一是沒有辦法知道到底能不能寫、能不能讀,只能”傻等”,即使通過各種估算,算出來作業系統沒有能力進行讀寫,也沒法在socket.read()和socket.write()函式中返回,這兩個函式無法進行有效的中斷。所以除了多開執行緒另起爐灶,沒有好的辦法利用CPU。

NIO的讀寫函式可以立刻返回,這就給了我們不開執行緒利用CPU的最好機會:如果一個連線不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連線(channel)繼續進行讀寫。

思考一下在Socket網路通訊中有什麼事件:

  1. 在伺服器端我們需要接收客戶端連線,這裡就有一個接收“Accept”事件;
  2. 而客戶端連線伺服器,連線有一個“Connect”事件
  3. 各自進行讀操作時,有一個“read”事件
  4. 各自進行寫操作時,有一個“write”事件

而上面我們說到連線被抽象為了Channel,這時我們就可以在多路複用器Selector上註冊通道和事件。

  1. 在伺服器端:ServerSocketChannel.register(Selector, SelectionKey.OP_ACCEPT);
    將ServerSocketChannel註冊在Selector並繫結SelectionKey.OP_ACCEPT事件;
  2. 呼叫多路複用器Selector.select();
  3. 如果channel已經發生了接收客戶端的事件,那麼就能被多路複用器select到,然後獲取到和客戶端連線的Socket,再將這個socket的讀寫事件註冊到Selector上
  4. 同樣可以客戶端也是將其他事件註冊到Selector,然後事件被觸發後就可以被select()函式找到
  5. 最後程式不斷輪詢selector,根據select到的不同事件型別呼叫對應的handler進行處理。

select()函式呼叫的是系統底層函式,在Linux 2.6之前是select、poll,2.6之後是epoll,Windows是IOCP。

虛擬碼如下:

   interface ChannelHandler{
      void channelReadable(Channel channel);
      void channelWritable(Channel channel);
   }
   class Channel{
     Socket socket;
     Event event;//讀,寫或者連線
   }

   //IO執行緒主迴圈:
   class IoThread extends Thread{
   public void run(){
   Channel channel;
   //選擇就緒的事件和對應的連線
   while(channel=Selector.select()){
      if(channel.event==accept){
          //觸發了accept事件的是新連線,
          //我們需要為這個新連線註冊讀寫事件
         registerNewChannelHandler(channel);
      }
      if(channel.event==write){
          //如果可以寫,則執行寫事件
         getChannelHandler(channel).channelWritable(channel);
      }
      if(channel.event==read){
      //如果可以讀,則執行讀事件
          getChannelHandler(channel).channelReadable(channel);
      }
    }
   }
   //所有channel的對應事件處理器
   Map<Channel,ChannelHandler> handlerMap;
  }

注意,select是阻塞的,無論是通過作業系統的通知(epoll)還是不停的輪詢(select,poll),這個函式是阻塞的。所以你可以放心大膽地在一個while(true)裡面呼叫這個函式而不用擔心CPU空轉。同時我們也可以通過設定週期的方式呼叫Selector.select(1000)(每一秒select一次)。

實現nio的完整程式碼在最後

nio與bio對比

所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函式,分為等待系統可讀和真正的讀;同理,寫函式分為等待網絡卡可以寫和真正的寫。

需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”,而且這個過程非常快,屬於memory copy,頻寬通常在1GB/s級別以上,可以理解為基本不耗時。

下圖是幾種常見I/O模型的對比:

各類IO對比

java的nio便是第二或者第三種形式:

  • 第二種:在一個while迴圈裡面不斷每隔一秒檢查selector裡面是否有事件被觸發Selector.select(1000)
  • 第三種:Selector.select()會檢查是是否有就緒事件,沒有的話會阻塞直到有就緒事件函式才會返回。

Reactor

這裡寫圖片描述

圖解:
1. Handle:在網路程式設計中,這裡一般指Socket Handle,即一個網路連線(Connection,在Java NIO中的Channel)。這個Channel註冊到Synchronous Event Demultiplexer中,以監聽Handle中發生的事件,對ServerSocketChannnel可以是CONNECT事件,對SocketChannel可以是READ、WRITE、CLOSE事件等。
2. Synchronous Event Demultiplexer:多路複用器,阻塞等待一系列的Handle中的事件到來,如果阻塞等待返回,即表示在返回的Handle中可以不阻塞的執行返回的事件型別。這個模組一般使用作業系統的select來實現。在Java NIO中用Selector來封裝,當Selector.select()返回時,可以呼叫Selector的selectedKeys()方法獲取Set,一個SelectionKey表達一個有事件發生的Channel以及該Channel上的事件型別。
3. Initiation Dispatcher:Reactor模式的主要模組,事件排程器,通常被稱為reactor,用於管理Event Handler,即EventHandler的容器,用以註冊、移除EventHandler等;另外,它還作為Reactor模式的入口呼叫,在這個模組裡面呼叫Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,當阻塞等待返回時,根據事件發生的Handle將其分發給對應的Event Handler處理,即回撥EventHandler中的handle_event()方法。
4. 定義事件處理方法:handle_event(),以供InitiationDispatcher回撥使用。

需要注意的是:在java nio中我們註冊了建立連線相關的“Accept”“Connect”事件後,再進行讀寫操作要將讀寫事件再註冊到多路複用器,下次讀寫事件發生時才會被select到。這就是傳說中的多路複用IO。

另外Reactor翻譯為“反應”器,名字中”反應“的由來:

“反應”即“倒置”,“控制逆轉”

具體事件處理程式不呼叫反應器,而是由反應器分配一個具體事件處理程式,具體事件處理程式對某個指定的事件發生做出反應;這種控制逆轉又稱為“好萊塢法則”(不要呼叫我,讓我來呼叫你)

nio實現程式碼

一、server端程式碼

public class Server {
    private static synchronized void start(int port){
        ServerHandle serverHandle = new ServerHandle(port);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args){
        start(12345);
    }
}

server處理類ServerHandle:

class ServerHandle implements Runnable {
    private Selector m_Selector;
    private ServerSocketChannel m_ServerChannel;
    private volatile boolean m_Started;
    public ServerHandle(int vPort) {
        try {
            m_Selector = Selector.open();
            //伺服器監聽通道
            ServerSocketChannel tServerChannel = ServerSocketChannel.open();
            //如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
            tServerChannel.configureBlocking(false);
            tServerChannel.bind(new InetSocketAddress(vPort), 1024);
            //註冊伺服器通道到selector,監聽ACCEPT事件
            tServerChannel.register(m_Selector, SelectionKey.OP_ACCEPT);
            m_Started = true;
            System.out.println("伺服器已啟動,埠號:" + vPort);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void stop() {
        m_Started = false;
    }

    @Override
    public void run() {
        while (m_Started) {
            try {
                //無論是否有讀寫事件發生,selector在1s後被喚醒一次
                m_Selector.select(1000);
                //或者通過下面這種方式:
                //會阻塞直到selector裡面有就緒事件
                //m_Selector.select();
                Set<SelectionKey> tKeys = m_Selector.selectedKeys();
                Iterator<SelectionKey> tIterator = tKeys.iterator();
                SelectionKey tKey;
                //輪詢
                while (tIterator.hasNext()) {
                    tKey = tIterator.next();
                    tIterator.remove();
                    try {
                        handleInput(tKey);
                    } catch (IOException e) {
                        if (null != tKey) {
                            tKey.cancel();
                            if (null == tKey.channel()) {
                                tKey.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
        //完成後關閉select
        //關閉Selector會自動關閉裡面的資源
        if (null != m_Selector) {
            try {
                m_Selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey vKey) throws IOException {
        if (vKey.isValid()) {
            if (vKey.isAcceptable()) {
                ServerSocketChannel tServerChannel = (ServerSocketChannel) vKey.channel();
                SocketChannel tSocketChannel = tServerChannel.accept();
                tSocketChannel.configureBlocking(false);
                //拿到新連線註冊讀事件
                tSocketChannel.register(m_Selector, SelectionKey.OP_READ);
            }
        }
        //讀訊息
        if (vKey.isReadable()) {
            SocketChannel tSocketChannel = (SocketChannel) vKey.channel();
            //得用buffer
            ByteBuffer tBuffer = ByteBuffer.allocate(1024);
            int tReadBytes = tSocketChannel.read(tBuffer);
            if (0 < tReadBytes) {
                tBuffer.flip();
                byte[] tBytes = new byte[tBuffer.remaining()];
                tBuffer.get(tBytes);
                String tExpression = new String(tBytes, "UTF-8");
                System.out.println("伺服器收到訊息:" + tExpression);
                String tResult = "這句話是伺服器發過來的";
                doWrite(tSocketChannel, tResult);
            }
            //讀取不到訊息關閉資源
            else if (0 > tReadBytes) {
               //即使key被cancel了他的channel還在的
                vKey.cancel();
                tSocketChannel.close();
            }
        }
    }

//傳送資料
    private void doWrite(SocketChannel vChannel, String vResponse) throws IOException {
        byte[] tBytes = vResponse.getBytes("UTF-8");
        ByteBuffer tWriteBuffer = ByteBuffer.allocate(tBytes.length);
        tWriteBuffer.put(tBytes);
        tWriteBuffer.flip();
        vChannel.write(tWriteBuffer);
    }
}

二、客戶端程式碼:

public class Client {
    private static String DEFAULT_HOST = "127.0.0.1";
    private static int DEFAULT_PORT = 12345;
    private static ClientHandle clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(null != clientHandle) clientHandle.stop();
        clientHandle = new ClientHandle(ip,port);
        new Thread(clientHandle,"Server").start();
    }
    //向伺服器傳送訊息
    public static boolean sendMsg(String msg) throws Exception{
        clientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {
        start();
        while(Client.sendMsg(new Scanner(System.in).nextLine()));
    }
}

客戶端處理類ServerHandle:

class ClientHandle implements Runnable {
    private String m_Host;
    private int m_Port;
    private Selector m_Selector;
    private SocketChannel m_SocketChannel;
    private volatile boolean m_Started;

    ClientHandle(String vIp, int vPort) {
        m_Host = vIp;
        m_Port = vPort;

        try {
            m_Selector = Selector.open();
            m_SocketChannel = SocketChannel.open();
            m_SocketChannel.configureBlocking(false);
            m_Started = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void stop() {
        m_Started = false;
    }


    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            System.out.println("客戶端連線失敗");
        }
        while (m_Started) {
            try {
                m_Selector.select(1000);
                Set<SelectionKey> tKeys = m_Selector.selectedKeys();
                Iterator<SelectionKey> tIterator = tKeys.iterator();
                SelectionKey tKey;
                while (tIterator.hasNext()) {
                    tKey = tIterator.next();
                    tIterator.remove();
                    try {
                        handleInput(tKey);
                    } catch (IOException e) {
                        if (null != tKey) {
                            tKey.cancel();
                            if (null != tKey.channel()) {
                                tKey.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
        //關閉selector會關閉它裡面的所有資源
        if (null != m_Selector) {
            try {
                m_Selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private void handleInput(SelectionKey vKey) throws IOException {
        if (vKey.isValid()) {
            SocketChannel socketChannel = (SocketChannel) vKey.channel();
            //如果是連線事件
            if (vKey.isConnectable()) {
                if (socketChannel.finishConnect())
                    System.out.println("客戶端連線上伺服器端");
                else System.exit(1);
            }
            //讀訊息
            if (vKey.isReadable()) {
                //建立ByteBuffer,並開闢一個1M的緩衝區
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //讀取請求碼流,返回讀取到的位元組數
                int readBytes = socketChannel.read(buffer);
                //讀取到位元組,對位元組進行編解碼
                if (0 < readBytes) {
                    //將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作
                    buffer.flip();
                    //根據緩衝區可讀位元組數建立位元組陣列
                    byte[] bytes = new byte[buffer.remaining()];
                    //將緩衝區可讀位元組陣列複製到新建的陣列中
                    buffer.get(bytes);
                    String result = new String(bytes, "UTF-8");
                    System.out.println("客戶端收到訊息:" + result);
                }
                //鏈路已經關閉,釋放資源
                else if (0 > readBytes) {
                    vKey.cancel();
                    socketChannel.close();
                }
            }

        }

    }

    private void doWrite(SocketChannel channel, String request) throws IOException {
        //將訊息編碼為位元組陣列
        byte[] bytes = request.getBytes();
        //根據陣列容量建立ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //將位元組陣列複製到緩衝區
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //傳送緩衝區的位元組陣列
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException {
        //客戶端請求連線上伺服器,並註冊連線事件
        m_SocketChannel.connect(new InetSocketAddress(m_Host, m_Port));
        m_SocketChannel.register(m_Selector, SelectionKey.OP_CONNECT);

    }

    void sendMsg(String msg) throws Exception {
        //傳送了資料的話就註冊讀事件
        m_SocketChannel.register(m_Selector, SelectionKey.OP_READ);
        doWrite(m_SocketChannel, msg);
    }
}