1. 程式人生 > >NIO學習--Reactor模型

NIO學習--Reactor模型

通過之前的Unix的IO模型介紹,想必也瞭解到了5種IO模型。java的NIO是屬於同步非阻塞IO,關於IO多路複用,java沒有相應的IO模型,但有相應的程式設計模式,Reactor 就是基於NIO中實現多路複用的一種模式。本文將從以下幾點闡述Reactor模式:

  1. reactor 是什麼

  2. 為何要用,能解決什麼問題

  3. 如何用,更好的方式

  4. 其他事件處理模式

一、Reactor 是什麼

關於reactor 是什麼,我們先從wiki上看下:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

從上述文字中我們可以看出以下關鍵點 :

  1. 事件驅動(event handling)

  2. 可以處理一個或多個輸入源(one or more inputs)

  3. 通過Service Handler同步的將輸入事件(Event)採用多路複用分發給相應的Request Handler(多個)處理

自POSA2 中的關於Reactor Pattern 介紹中,我們瞭解了Reactor 的處理方式:

  1. 同步的等待多個事件源到達(採用select()實現)

  2. 將事件多路分解以及分配相應的事件服務進行處理,這個分派採用server集中處理(dispatch)

  3. 分解的事件以及對應的事件服務應用從分派服務中分離出去(handler)

關於Reactor Pattern 的OMT 類圖設計:

二、為何要用Reactor

常見的網路服務中,如果每一個客戶端都維持一個與登陸伺服器的連線。那麼伺服器將維護多個和客戶端的連線以出來和客戶端的contnect 、read、write ,特別是對於長連結的服務,有多少個c端,就需要在s端維護同等的IO連線。這對伺服器來說是一個很大的開銷。

1、BIO

比如我們採用BIO的方式來維護和客戶端的連線:

// 主執行緒維護連線
  public void run() {
      try {
          while (true) {
              Socket socket = serverSocket.accept();
              //提交執行緒池處理
              executorService.submit(new Handler(socket));           }       } catch (Exception e) {           e.printStackTrace();       }   } ​   // 處理讀寫服務   class Handler implements Runnable {       public void run() {           try {               //獲取Socket的輸入流,接收資料               BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));               String readData = buf.readLine();               while (readData != null) {                   readData = buf.readLine();                   System.out.println(readData);               }           } catch (Exception e) {               e.printStackTrace();           }       }   }

很明顯,為了避免資源耗盡,我們採用執行緒池的方式來處理讀寫服務。但是這麼做依然有很明顯的弊端:

  1. 同步阻塞IO,讀寫阻塞,執行緒等待時間過長

  2. 在制定執行緒策略的時候,只能根據CPU的數目來限定可用執行緒資源,不能根據連線併發數目來制定,也就是連線有限制。否則很難保證對客戶端請求的高效和公平。

  3. 多執行緒之間的上下文切換,造成執行緒使用效率並不高,並且不易擴充套件

  4. 狀態資料以及其他需要保持一致的資料,需要採用併發同步控制

2、NIO

那麼可以有其他方式來更好的處理麼,我們可以採用NIO來處理,NIO中支援的基本機制:

  1. 非阻塞的IO讀寫

  2. 基於IO事件進行分發任務,同時支援對多個fd的監聽

我們看下NIO 中實現相關方式:

public NIOServer(int port) throws Exception {
      selector = Selector.open();
      serverSocket = ServerSocketChannel.open();
      serverSocket.socket().bind(new InetSocketAddress(port));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  }
​
  @Override
  public void run() {
      while (!Thread.interrupted()) {
          try {
              //阻塞等待事件
              selector.select();
              // 事件列表
              Set selected = selector.selectedKeys();
              Iterator it = selected.iterator();
              while (it.hasNext()) {
                  it.remove();
                  //分發事件
                  dispatch((SelectionKey) (it.next()));
              }
          } catch (Exception e) {
​
          }
      }
  }
​
  private void dispatch(SelectionKey key) throws Exception {
      if (key.isAcceptable()) {
          register(key);//新連結建立,註冊
      } else if (key.isReadable()) {
          read(key);//讀事件處理
      } else if (key.isWritable()) {
          wirete(key);//寫事件處理
      }
  }
​
  private void register(SelectionKey key) throws Exception {
      ServerSocketChannel server = (ServerSocketChannel) key
              .channel();
      // 獲得和客戶端連線的通道
      SocketChannel channel = server.accept();
      channel.configureBlocking(false);
      //客戶端通道註冊到selector 上
      channel.register(this.selector, SelectionKey.OP_READ);
  }

我們可以看到上述的NIO例子已經差不多擁有reactor的影子了

  1. 基於事件驅動-> selector(支援對多個socketChannel的監聽)

  2. 統一的事件分派中心-> dispatch

  3. 事件處理服務-> read & write

事實上NIO已經解決了上述BIO暴露的1&2問題了,伺服器的併發客戶端有了量的提升,不再受限於一個客戶端一個執行緒來處理,而是一個執行緒可以維護多個客戶端(selector 支援對多個socketChannel 監聽)。

但這依然不是一個完善的Reactor Pattern ,首先Reactor 是一種設計模式,好的模式應該是支援更好的擴充套件性,顯然以上的並不支援,另外好的Reactor Pattern 必須有以下特點:

  1. 更少的資源利用,通常不需要一個客戶端一個執行緒

  2. 更少的開銷,更少的上下文切換以及locking

  3. 能夠跟蹤伺服器狀態

  4. 能夠管理handler 對event的繫結

那麼好的Reactor Pattern應該是怎樣的?

三、Reactor

在應用Java NIO構建Reactor Pattern中,大神 Doug Lea(讓人無限景仰的java 大神)在“Scalable IO in Java”中給了很好的闡述。我們採用大神介紹的3種Reactor 來分別介紹。

首先我們基於Reactor Pattern 處理模式中,定義以下三種角色:

  • Reactor 將I/O事件分派給對應的Handler

  • Acceptor 處理客戶端新連線,並分派請求到處理器鏈中

  • Handlers 執行非阻塞讀/寫 任務

1、單Reactor單執行緒模型

我們看程式碼的實現方式:

/**
    * 等待事件到來,分發事件處理
    */
  class Reactor implements Runnable {
​
      private Reactor() throws Exception {
​
          SelectionKey sk =
                  serverSocket.register(selector,
                          SelectionKey.OP_ACCEPT);
          // attach Acceptor 處理新連線
          sk.attach(new Acceptor());
      }
​
      public void run() {
          try {
              while (!Thread.interrupted()) {
                  selector.select();
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分發事件處理
                      dispatch((SelectionKey) (it.next()));
                  }
              }
          } catch (IOException ex) {
              //do something
          }
      }
​
      void dispatch(SelectionKey k) {
          // 若是連線事件獲取是acceptor
          // 若是IO讀寫事件獲取是handler
          Runnable runnable = (Runnable) (k.attachment());
          if (runnable != null) {
              runnable.run();
          }
      }
​
  }
  /**
    * 連線事件就緒,處理連線事件
    */
  class Acceptor implements Runnable {
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 註冊讀寫
                  new Handler(c, selector);
              }
          } catch (Exception e) {
​
          }
      }
  }
  /**
    * 處理讀寫業務邏輯
    */
  class Handler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      public Handler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          process();
          //下一步處理寫事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          process();
          //下一步處理讀事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 業務處理
        */
      public void process() {
          //do something
      }
  }

這是最基本的單Reactor單執行緒模型。其中Reactor執行緒,負責多路分離套接字,有新連線到來觸發connect 事件之後,交由Acceptor進行處理,有IO讀寫事件之後交給hanlder 處理。

Acceptor主要任務就是構建handler ,在獲取到和client相關的SocketChannel之後 ,繫結到相應的hanlder上,對應的SocketChannel有讀寫事件之後,基於racotor 分發,hanlder就可以處理了(所有的IO事件都繫結到selector上,有Reactor分發)。

該模型 適用於處理器鏈中業務處理元件能快速完成的場景。不過,這種單執行緒模型不能充分利用多核資源,所以實際使用的不多。

2、單Reactor多執行緒模型

相對於第一種單執行緒的模式來說,在處理業務邏輯,也就是獲取到IO的讀寫事件之後,交由執行緒池來處理,這樣可以減小主reactor的效能開銷,從而更專注的做事件分發工作了,從而提升整個應用的吞吐。

我們看下實現方式:

/**
    * 多執行緒處理讀寫業務邏輯
    */
  class MultiThreadHandler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      //多執行緒處理業務邏輯
      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
​
​
      public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          //任務非同步處理
          executorService.submit(() -> process());
​
          //下一步處理寫事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          //任務非同步處理
          executorService.submit(() -> process());
​
          //下一步處理讀事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 業務處理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }

3、多Reactor多執行緒模型

第三種模型比起第二種模型,是將Reactor分成兩部分,

  1. mainReactor負責監聽server socket,用來處理新連線的建立,將建立的socketChannel指定註冊給subReactor。

  2. subReactor維護自己的selector, 基於mainReactor 註冊的socketChannel多路分離IO讀寫事件,讀寫網 絡資料,對業務處理的功能,另其扔給worker執行緒池來完成。

我們看下實現方式:

/**
    * 多work 連線事件Acceptor,處理連線事件
    */
  class MultiWorkThreadAcceptor implements Runnable {
​
      // cpu執行緒數相同多work執行緒
      int workCount =Runtime.getRuntime().availableProcessors();
      SubReactor[] workThreadHandlers = new SubReactor[workCount];
      volatile int nextHandler = 0;
​
      public MultiWorkThreadAcceptor() {
          this.init();
      }
​
      public void init() {
          nextHandler = 0;
          for (int i = 0; i < workThreadHandlers.length; i++) {
              try {
                  workThreadHandlers[i] = new SubReactor();
              } catch (Exception e) {
              }
​
          }
      }
​
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 註冊讀寫
                  synchronized (c) {
                      // 順序獲取SubReactor,然後註冊channel 
                      SubReactor work = workThreadHandlers[nextHandler];
                      work.registerChannel(c);
                      nextHandler++;
                      if (nextHandler >= workThreadHandlers.length) {
                          nextHandler = 0;
                      }
                  }
              }
          } catch (Exception e) {
          }
      }
  }
  /**
    * 多work執行緒處理讀寫業務邏輯
    */
  class SubReactor implements Runnable {
      final Selector mySelector;
​
      //多執行緒處理業務邏輯
      int workCount =Runtime.getRuntime().availableProcessors();
      ExecutorService executorService = Executors.newFixedThreadPool(workCount);
​
​
      public SubReactor() throws Exception {
          // 每個SubReactor 一個selector 
          this.mySelector = SelectorProvider.provider().openSelector();
      }
​
      /**
        * 註冊chanel
        *
        * @param sc
        * @throws Exception
        */
      public void registerChannel(SocketChannel sc) throws Exception {
          sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
      }
​
      @Override
      public void run() {
          while (true) {
              try {
              //每個SubReactor 自己做事件分派處理讀寫事件
                  selector.select();
                  Set<SelectionKey> keys = selector.selectedKeys();
                  Iterator<SelectionKey> iterator = keys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      iterator.remove();
                      if (key.isReadable()) {
                          read();
                      } else if (key.isWritable()) {
                          write();
                      }
                  }
​
              } catch (Exception e) {
​
              }
          }
      }
​
      private void read() {
          //任務非同步處理
          executorService.submit(() -> process());
      }
​
      private void write() {
          //任務非同步處理
          executorService.submit(() -> process());
      }
​
      /**
        * task 業務處理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }
​

第三種模型中,我們可以看到,mainReactor 主要是用來處理網路IO 連線建立操作,通常一個執行緒就可以處理,而subReactor主要做和建立起來的socket做資料互動和事件業務處理操作,它的個數上一般是和CPU個數等同,每個subReactor一個縣城來處理。

此種模型中,每個模組的工作更加專一,耦合度更低,效能和穩定性也大量的提升,支援的可併發客戶端數量可達到上百萬級別。

關於此種模型的應用,目前有很多優秀的礦建已經在應用了,比如mina 和netty 等。上述中去掉執行緒池的第三種形式的變種,也 是Netty NIO的預設模式。下一節我們將著重講解netty的架構模式。

四、事件處理模式

在 Douglas Schmidt 的大作《POSA2》中有關於事件處理模式的介紹,其中有四種事件處理模式:

  1. Reactor  

  2. Proactor  

  3. Asynchronous Completion Token  

  4. Acceptor-Connector  

1.Proactor

本文介紹的Reactor就是其中一種,而Proactor的整體結構和reacotor的處理方式大同小異,不同的是Proactor採用的是非同步非阻塞IO的方式實現,對資料的讀寫由非同步處理,無需使用者執行緒來處理,服務程式更專注於業務事件的處理,而非IO阻塞。

2.Asynchronous Completion Token

簡單來說,ACT就是應對應用程式非同步呼叫服務操作,並處理相應的服務完成事件。從token這個字面意思,我們大概就能瞭解到,它是一種狀態的保持和傳遞。

比如,通常應用程式會有呼叫第三方服務的需求,一般是業務執行緒請求都到,需要第三方資源的時候,去同步的發起第三方請求,而為了提升應用效能,需要非同步的方式發起請求,但非同步請求的話,等資料到達之後,此時的我方應用程式的語境以及上下文資訊已經發生了變化,你沒辦法去處理。

ACT 解決的就是這個問題,採用了一個token的方式記錄非同步傳送前的資訊,傳送給接受方,接受方回覆的時候再帶上這個token,此時就能恢復業務的呼叫場景。

上圖中我們可以看到在client processing 這個階段,客戶端是可以繼續處理其他業務邏輯的,不是阻塞狀態。service 返回期間會帶上token資訊。  

3.Acceptor-Connector

Acceptor-Connector是於Reactor的結合,也可以看成是一種變種,它看起來很像上面介紹的Reactor第三種實現方式,但又有本質的不同。

Acceptor-Connector模式是將網路中對等服務的連線和初始化分開處理,使系統中的連線建立及服務一旦服務初始化後就分開解除耦合。聯結器主動地建立到遠地接受器元件的連線,並初始化服務處理器來處理在連線上交換的資料。同樣地,接受器被動地等待來自遠地聯結器的連線請求,在這樣的請求到達時建立連線,並初始化服務處理器來處理在連線上交換的資料。隨後已初始化的服務處理器執行應用特有的處理,並通過聯結器和接受器元件建立的連線來進行通訊。

這麼處理的好處是:

  1. 一般而言,用於連線建立和服務初始化的策略變動的頻度要遠小於應用服務實現和通訊協議。

  2. 容易增加新型別的服務、新的服務實現和新的通訊協議,而又不影響現有的連線建立和服務初始化軟體。比如採用IPX/SPX通訊協議或者TCP協議。

  3. 連線角色和通訊角色的去耦合,連線角色只管發起連線 vs. 接受連線。通訊角色只管資料互動。

  4. 將程式設計師與低階網路程式設計API(像socket或TLI)型別安全性的缺乏遮蔽開來。業務開發關係底層通訊

 轉載:https://my.oschina.net/u/1859679/blog/1844109

引用: