Netty(七):EventLoop學習前導——Reactor模式
瞭解Netty的人多少都會知道Netty的高效能的一個原因就是它是基於事件驅動的,而這一事件的原型就是Reactor模式。
所以在學習EventLoop前,很有必要先搞懂Reactor模式。
本文目錄:
- 傳統的伺服器設計
- Basic Reactor(單執行緒模式)
- MultiThreadReactor(多執行緒模式)
- 主從多執行緒模型
傳統的伺服器設計模式:
先來簡單的介紹下傳統的伺服器設計模式。
看從圖例瞭解:
傳統的伺服器設計模式是基於IO實現的。伺服器在等待連線,及IO準備就緒前都會被阻塞。
程式碼示例如下:
class Server implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); } catch (IOException ex) { /* ... */ } }static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); }catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } }
傳統的伺服器模式的優勢在於實現簡便,相對於NIO的伺服器,它的程式碼量更少,更直接。但它最大的缺點就是IO阻塞導致執行效率低下。
Reactor模式:
Reactor模式是利用NIO的多路複用而設計的一種基於事件驅動的伺服器模式。主要的設計目的是通過分而治之的思想讓伺服器實現可擴容的目標。
Basic Reactor(單執行緒版本):
Basic Reactor是Reactor模式最基礎的版本,可以說是定義了整個Reactor模式的大骨架,其他複雜的版本也是在此基礎上演變而來。
深入瞭解Basic Reactor是掌握Reactor模式的基本,因此我們會用最多的內容去理解Basic Reactor。
無論是Reactor模式的哪些變化,基本上都離不開下列三種角色:
Reactor(反應堆):伺服器啟動的主入口
Acceptor(接收器):主要負責處理IO連線事件
Handler(處理器):負責處理IO讀寫以及業務邏輯處理等
先結合圖例來了解下Reactor:
圖中已經明顯畫出了Reactor和Acceptor的角色,而未畫出的Handler部分就是黃色圓圈的部分(read,decode, compute, encode, send 構成了一個Handler的基本職能)
在通過程式碼來分析下:
1 package com.insaneXs.netty.reactor.basic; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 /** 14 * @Author: insaneXs 15 * @Description: 16 * @Date: Create at 2018-12-19 17 */ 18 public class Reactor implements Runnable{ 19 20 final Selector selector; 21 22 final ServerSocketChannel serverSocket; 23 24 Reactor(int port) throws Exception{ 25 26 //建立ServerSocketChannel,繫結埠,設定為非阻塞,選擇器上註冊ACCEPT事件 27 selector = Selector.open(); 28 serverSocket = ServerSocketChannel.open(); 29 30 serverSocket.bind(new InetSocketAddress(port)); 31 serverSocket.configureBlocking(false); 32 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); 33 34 sk.attach(new Acceptor()); 35 } 36 37 @Override 38 public void run() { 39 try { 40 while (!Thread.interrupted()) { 41 //阻塞,直到註冊的事件發生 42 selector.select(); 43 Set selected = selector.selectedKeys(); 44 Iterator it = selected.iterator(); 45 while (it.hasNext()){ 46 //任務派發 47 dispatch((SelectionKey)(it.next())); 48 } 49 selected.clear(); 50 } 51 } catch (IOException ex) { 52 ex.printStackTrace(); 53 } 54 55 } 56 57 void dispatch(SelectionKey k) { 58 //通過將不同的附件繫結到SelectionKey上,實現dispatch統一派發Acceptor和Handler的邏輯 59 Runnable r = (Runnable)(k.attachment()); 60 if (r != null) 61 r.run(); 62 } 63 64 class Acceptor implements Runnable{ 65 @Override 66 public void run() { 67 try { 68 //ACCEPT負責接收連結 69 SocketChannel sc = serverSocket.accept(); 70 if(sc != null) 71 new Handler(selector, sc); 72 } catch (IOException e) { 73 e.printStackTrace(); 74 } 75 } 76 } 77 78 class Handler implements Runnable{ 79 final SocketChannel socket; 80 81 final SelectionKey sk; 82 83 ByteBuffer input = ByteBuffer.allocate(1024); 84 ByteBuffer output = ByteBuffer.allocate(1024); 85 86 static final int READING = 0, SENDING = 1; 87 int state = READING; 88 89 Handler(Selector sel, SocketChannel c) throws IOException{ 90 socket = c; 91 c.configureBlocking(false); 92 // Optionally try first read now 93 //返回了新的SelectionKey,將Handler新增為SelectionKey的附件,先註冊READ事件 94 sk = socket.register(sel, 0); 95 sk.attach(this); 96 sk.interestOps(SelectionKey.OP_READ); 97 sel.wakeup(); 98 } 99 100 boolean inputIsComplete() { 101 return true; 102 } 103 boolean outputIsComplete() { 104 return true; 105 } 106 void process() { 107 //DO SOME THING 108 } 109 110 @Override 111 public void run() { 112 try { 113 if (state == READING) read(); 114 else if (state == SENDING) send(); 115 } catch (IOException ex) { 116 ex.printStackTrace(); 117 } 118 } 119 120 void read() throws IOException { 121 socket.read(input); 122 if (inputIsComplete()) { 123 process(); 124 state = SENDING; 125 // Normally also do first write now 126 sk.interestOps(SelectionKey.OP_WRITE); 127 } 128 } 129 130 void send() throws IOException { 131 socket.write(output); 132 if (outputIsComplete()) sk.cancel(); 133 } 134 135 } 136 }
瞭解完Reactor中的角色分工,再看程式碼其實並不複雜。程式碼關鍵的部分也都加上了註釋。
每個角色的業務處理邏輯都是以run方法為入口,
Reactor中run方法處理的主要邏輯就是監聽NIO的多路複用,並通過dispatch方法分發任務。
Acceptor中run方法處理的主要邏輯就是接收連線,併為處理讀寫做準備。
Handler中run方法處理的主要邏輯就是讀寫和業務邏輯的處理。
有幾點值得注意的:
第一,這段程式碼最關鍵的地方就是在Reactor進行任務分發時,利用SelectionKey的Attach新增附件的方法實現了用同一入口分發給Acceptor和Handler(這是設計的比較巧妙的部分)。
第二,無論是哪個角色都實現了Runnable,這也保證了即使是其他多執行緒版本,只需要修改部分程式碼,而不用動整個Reactor模式的骨架。
第三,我們可以看到上面的程式碼都是直接呼叫run方法,而不是通過Thread.start方法來執行,說明Basic Reactor的處理過程確實是單執行緒下的。
另外提到一點就是Handler的建構函式中先是register的0,然後再設定SelectionKey的interestOps為OP_READ。這點在之前的Netty原始碼分析中,我們也瞭解到,Netty正是這樣的過程。
將程式碼轉換成時序圖,加深對程式碼的印象:
Basic Reactor優點與不足:
優點:利用了NIO的特性,可以僅用一條執行緒處理多個通道的連線處理。相較於傳統的伺服器模式,這樣對資源的消耗更少。
不足:我們可以看到不僅IO的部分由Reactor的執行緒處理,連業務處理的邏輯同樣是放在Reactor的執行緒中處理,這樣可能就會導致Reactor執行緒積累越來越多的請求,導致效率下降。
MultiThreads版本的Reactor模型,正是為了解決上述的問題。
同樣先通過圖例來了解這個模式下,各個角色的關係:
這個圖和Basic Reactor的區別是什麼?我們又該如何理解呢?
我們可以看到之前的Handler處理的角色被一分為二,read,send(也就是IO的讀寫)和Basic Reactor中的模式不變,但是decode,compute,encode(也就是業務處理的邏輯)被拆出來,提交給ThreadPool執行。
新的Reactor模式對比Basic Reactor,其他程式碼不變,只是我們修改了Handler,增加了一個新的角色,叫做Processor,作為負責處理業務邏輯的單元:
1 public class ThreadPooledHandler implements Runnable{ 2 final SocketChannel socket; 3 final SelectionKey sk; 4 ByteBuffer input = ByteBuffer.allocate(1024); 5 ByteBuffer output = ByteBuffer.allocate(1024); 6 static final int READING = 0, SENDING = 1; 7 static final int PROCESSING = 3; 8 int state = READING; 9 10 // uses util.concurrent thread pool 11 static ExecutorService pool = Executors.newFixedThreadPool(4); 12 13 ThreadPooledHandler(Selector sel, SocketChannel c) throws IOException { 14 socket = c; 15 c.configureBlocking(false); 16 // Optionally try first read now 17 //返回了新的SelectionKey,將Handler新增為SelectionKey的附件,先註冊READ事件 18 sk = socket.register(sel, 0); 19 sk.attach(this); 20 sk.interestOps(SelectionKey.OP_READ); 21 sel.wakeup(); 22 } 23 24 boolean inputIsComplete() { 25 return true; 26 } 27 boolean outputIsComplete() { 28 return true; 29 } 30 31 void process() { 32 //DO SOME THING 33 } 34 35 @Override 36 public void run() { 37 try { 38 if (state == READING) read(); 39 else if (state == SENDING) send(); 40 } catch (IOException ex) { 41 ex.printStackTrace(); 42 } 43 } 44 45 synchronized void read() throws IOException { // ... 46 socket.read(input); 47 if (inputIsComplete()) { 48 state = PROCESSING; 49 pool.execute(new Processer()); 50 } 51 } 52 53 void send() throws IOException { 54 socket.write(output); 55 if (outputIsComplete()) sk.cancel(); 56 } 57 synchronized void processAndHandOff() { 58 process(); 59 state = SENDING; // or rebind attachment 60 sk.interestOps(SelectionKey.OP_WRITE); 61 } 62 63 //增加Processer角色,處理業務邏輯 64 class Processer implements Runnable { 65 public void run() { processAndHandOff(); } 66 } 67 68 }
為了方便看出變化,我將兩個版本的程式碼放在一起,做了對比圖:
最大的區別就是原先Handler中process方法被交給了Processer執行,並且在執行時,是提交給執行緒池去執行。而Handler負責的IO讀寫邏輯仍然在Reactor的執行緒中執行(只是非網路IO的業務邏輯部分在新的執行緒中執行)。
相對於BasicReactor,這個版本的Reactor能更好的利用現代多核CPU的效能。讓一條執行緒負責處理IO,而其他執行緒執行業務邏輯。多路複用上監聽的阻塞,並不會阻塞業務邏輯的執行。
主從複合的Reactor模型
多執行緒的Reactor模型處理能力已經非常的高效,但是IO的連線過程仍然可能是個耗時的過程(比如SSL認證)。因此引出了一個新的變化——主從複合的Reactor模型。
先看圖例:
和上一個版本比較,這個版本的Reactor區別主要是將Reactor拆分一個MainReactor(負責處理Accept事件)和多個SubReactor(負責處理IO讀寫事件)。
而MainReactor和SubReactor的關聯只要是通過Acceptor。
我們知道Reactor和Selector的關係是一對一的關係。通常一個Reactor由一條獨立的執行緒執行。該執行緒在Reactor關聯的Selector是監聽事件。
因此這個模式下,當Accept在為連線進來的SocketChannel繫結Selector時,不再是繫結到MainReactor對應的Selector中,而是繫結到其他Reactor對應的Selector上(對應其他執行緒)。
這也因此讓MainReactor只負責執行ACCEPT,而SubReactor負責IO讀寫。也使得ACCEPT上費時的操作將不會影響IO讀寫和業務邏輯處理。
貼上程式碼:
增加SubReactor:
1 public class SubReactor implements Runnable{ 2 private final Selector selector; 3 4 public SubReactor() throws IOException { 5 selector = Selector.open(); 6 } 7 8 @Override 9 public void run() { 10 while(!Thread.interrupted()){ 11 try { 12 selector.select(); 13 Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 14 while(iter.hasNext()){ 15 SelectionKey sk = iter.next(); 16 ((Runnable)sk.attachment()).run(); 17 } 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 } 22 } 23 24 public Selector getSelector(){ 25 return selector; 26 } 27 }
SubReactor的程式碼和Basic Reactor中Reactor的程式碼很相似,因為不處理連線部分,所以沒有ServerSocketChannel和繫結監聽埠的操作。
接下來看MainReactor和Acceptor的程式碼:
1 package com.insaneXs.netty.reactor.multiple; 2 3 import com.insaneXs.netty.reactor.threadpooled.ThreadPooledHandler; 4 5 import java.io.IOException; 6 import java.net.InetSocketAddress; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Iterator; 12 import java.util.Set; 13 14 /** 15 * @Author: insaneXs 16 * @Description: 17 * @Date: Create at 2018-12-21 18 */ 19 public class MainReactor implements Runnable{ 20 final Selector selector; 21 22 final ServerSocketChannel serverSocket; 23 24 private final static int SUB_REACTOR_COUNT = 3; 25 26 private final Selector[] selectors = new Selector[SUB_REACTOR_COUNT]; 27 28 MainReactor(int port) throws Exception{ 29 30 //建立ServerSocketChannel,繫結埠,設定為非阻塞,選擇器上註冊ACCEPT事件 31 selector = Selector.open(); 32 serverSocket = ServerSocketChannel.open(); 33 34 serverSocket.bind(new InetSocketAddress(port)); 35 serverSocket.configureBlocking(false); 36 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); 37 38 for(int i=0; i<selectors.length; i++){ 39 40 //建立SUB-REACTOR,並儲存對應的Selector物件 41 SubReactor subReactor = new SubReactor(); 42 selectors[i] = subReactor.getSelector(); 43 //為SUB-REACTOR啟動獨立的執行緒 44 new Thread(subReactor).start(); 45 } 46 47 sk.attach(new Acceptor()); 48 } 49 50 @Override 51 public void run() { 52 try { 53 while (!Thread.interrupted()) { 54 //阻塞,直到註冊的事件發生 55 selector.select(); 56 Set selected = selector.selectedKeys(); 57 Iterator it = selected.iterator(); 58 while (it.hasNext()){ 59 //任務派發 60 dispatch((SelectionKey)(it.next())); 61 } 62 selected.clear(); 63 } 64 } catch (IOException ex) { 65 ex.printStackTrace(); 66 } 67 68 } 69 70 void dispatch(SelectionKey k) { 71 //通過將不同的附件繫結到SelectionKey上,實現dispatch統一派發Acceptor和Handler的邏輯 72 Runnable r = (Runnable)(k.attachment()); 73 if (r != null) 74 r.run(); 75 } 76 77 class Acceptor implements Runnable{ 78 private int idx = 0; 79 @Override 80 public void run() { 81 try { 82 //ACCEPT負責接收連結 83 SocketChannel sc = serverSocket.accept(); 84 if(sc != null)//將SocketChannel與SubReactor的Selector均勻繫結 85 new ThreadPooledHandler(selectors[idx], sc); 86 87 idx++; 88 if(idx == SUB_REACTOR_COUNT) 89 idx = 0; 90 } catch (IOException e) { 91 e.printStackTrace(); 92 } 93 } 94 } 95 96 97 98 }
做個對比圖,比較下和之前的版本的差異:
MainReactor:
區別主要在MainReactor內部儲存了一些SubReactor,在MainReactor被建立時,同時建立了幾個SubReactor。並且建立執行緒獨立的執行SubReactor。
再看看Acceptor:
二者Acceptor的區別就是當把Handler提交給執行緒池時,非主從複合結構的版本仍然是用一個Selector。而主從複合結構的Handler在處理時,用的多路複用器是SubReactor中的。因此分離出了ACCEPT和IO讀寫。
本文參考:Scalable IO in Java
本文程式碼:Github