Reactor server 伺服器模式的初步瞭解
本文針對Reactor模式從四個方面進行了闡述,首先簡單介紹了Reactor模式是什麼;其次,闡述了為什麼使用Reactor模式;再次,針對實際生活的應用場景,分析了在什麼場景下使用Reactor模式;最後,著重分析講解了如何使用Reactor模式,以及程式碼示例。
1、What:Reactor模式是什麼?
反應器設計模式(Reactor pattern)是一種為處理併發服務請求,並將請求提交到一個或者多個服務處理程式的事件設計模式。當客戶端請求抵達後,服務處理程式使用多路分配策略,由一個非阻塞的執行緒來接收所有的請求,然後派發這些請求至相關的工作執行緒進行處理。 Reactor模式主要包含下面幾部分內容。
初始事件分發器(Initialization Dispatcher):用於管理Event Handler,定義註冊、移除EventHandler等。它還作為Reactor模式的入口呼叫Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,當阻塞等待返回時,根據事件發生的Handle將其分發給對應的Event Handler處理,即回撥EventHandler中的handle_event()方法
同步(多路)事件分離器(Synchronous Event Demultiplexer):無限迴圈等待新事件的到來,一旦發現有新的事件到來,就會通知初始事件分發器去調取特定的事件處理器。
系統處理程式(Handles):作業系統中的控制代碼,是對資源在作業系統層面上的一種抽象,它可以是開啟的檔案、一個連線(Socket)、Timer等。由於Reactor模式一般使用在網路程式設計中,因而這裡一般指Socket Handle,即一個網路連線(Connection,在Java NIO中的Channel)。這個Channel註冊到Synchronous Event Demultiplexer中,以監聽Handle中發生的事件,對ServerSocketChannnel可以是CONNECT事件,對SocketChannel可以是READ、WRITE、CLOSE事件等。
事件處理器(Event Handler): 定義事件處理方法,以供Initialization Dispatcher回撥使用。
對於Reactor模式,可以將其看做由兩部分組成,一部分是由Boss組成,另一部分是由worker組成。Boss就像老闆一樣,主要是拉活兒、談專案,一旦Boss接到活兒了,就下發給下面的work去處理。也可以看做是專案經理和程式設計師之間的關係。
2、Why:為什麼使用Reactor模式?
Part A:
對於一個事件驅動的分散式日誌登入服務系統,如下圖1所示。
客戶端應用通過日誌服務來錄入它們當前狀態和記錄,這些狀態可記錄可能包含了錯誤通知資訊、斷點除錯資訊等。日誌記錄被髮送到一箇中央日伺服器上,該伺服器可以處理日誌和連線使用者請求。客戶端想要記錄日誌資訊,首先必須傳送一個連線請求給伺服器。伺服器通過一個“處理工廠”來監聽客戶端對應的地址資訊,以等待這些連線請求的到來。當一個連線請求到來時,“處理工廠”就建立一個handle,其代表了連線的端點,用來建立客戶端和伺服器之間的連線。當handle收到來自客戶端的請求連線時,就會返回給伺服器。一旦客戶端連線成功,它們就可以同時傳送日誌記錄到伺服器。
Part B:
或許最有效的方法來開發一個併發日誌系統是使用多執行緒,這樣可以同時處多個理客戶端請求,如下圖2所示。
然而,多執行緒實現這樣的分散式日誌系統可能會面臨下面的問題:
可用性:伺服器必須能夠處理傳入請求即使是等待其他請求到達的。特別是,一個伺服器不能無限期地處理任何單一來源的事件而排斥其他事件源。因為這可能大大延遲響應其他客戶的時間。
效率:一個伺服器應該做到延遲最小化、吞吐量最大化,避免不必要地使用CPU。多執行緒可能會導致糟糕的效能由於上下文切換、同步和資料移動。
程式設計簡潔:伺服器的設計上應該簡化使用合適的併發策略。多執行緒可能需要複雜的併發控制方案。
可移植性:多執行緒不是可用在所有作業系統平臺。
適應性:整合新的或改進服務,如改變訊息格式或新增伺服器端快取,應該承擔最小的現有程式碼的修改和維護成本。例如,實現新應用程式服務應該不需要修改通用事件多路分解和排程機制。
Part C:
針對上面的問題,可以整合同步多路分解事件並分發相應的事件處理程式來處理相應的事件。對於每一個應用程式所提供的服務,引入一個單獨的事件處理器處理某些型別的事件。所有事件處理程式實現了相同的介面。事件處理程式註冊一個初始排程程式,它使用一個同步事件訊號分離器等待事件發生。當事件發生時,同步事件訊號分離器通知初始排程器,它同步告知事件處理程式去關聯對應的事件。事件處理程式然後分派事件到實現了所請求服務的方法中。
上述日誌系統的Reactor模式類圖如下所示:
客戶端連線到日誌伺服器所經過的一系列步驟如下圖所示:
日誌伺服器記錄日誌所經過的一系列步驟如下圖所示:
3、Where:什麼場景下使用Reactor模式?
對於高併發系統,常會使用Reactor模式,其代替了常用的多執行緒處理方式,節省系統的資源,提高系統的吞吐量。下面用比較直觀的形式來介紹這種模式的使用場景。
以餐廳為例,每一個人就餐就是一個事件,顧客會先看下選單,然後點餐,處理這些就餐事件需要服務人員。就像一個網路服務會有很多的請求,伺服器會收到每個請求,然後指派工作執行緒去處理一樣。
在多執行緒處理方式下:
一個人來就餐,一個服務員去服務,然後客人會看選單,點菜。 服務員將選單給後廚。
二個人來就餐,二個服務員去服務……
五個人來就餐,五個服務員去服務……
這類似多執行緒的處理方式,一個事件到來,就會有一個執行緒為其服務。很顯然這種方式在人少的情況下會有很好的使用者體驗,每個客人都感覺自己享有了最好的服務。如果這家餐廳一直這樣同一時間最多來5個客人,這家餐廳是可以很好的服務下去的。
由於這家店的服務好,吃飯的人多了起來。同一時間會來10個客人,老闆很開心,但是隻有5個服務員,這樣就不能一對一服務了,有些客人就不能馬上享有服務員為其服務了。老闆為了掙錢,不得不又請了5個服務員。現在又好了,每位顧客都享受最好最快的待遇了。
越來越多的人對這家餐廳滿意,客源又多了,同時來吃飯的人到了20人,老闆高興但又高興不起來了,再請服務員吧,佔地方不說,還要開工錢,再請人就掙不到到錢了。
怎麼辦呢?老闆想了想,10個服務員對付20個客人也是能對付過來的,服務員勤快點就好了,伺候完一個客人馬上伺候另外一個,還是來得及的。綜合考慮了一下,老闆決定就使用10個服務人員的執行緒池!
但是這樣又有一個比較嚴重的缺點:如果正在接受服務員服務的客人點菜很慢,其他的客人可能就要等好長時間了。有些脾氣火爆的客人可能就等不了走人了。
這樣,我麼那就引入了Reactor模式,那麼,Reactor模式是如何處理這個問題呢?
老闆後來發現,客人點菜比較慢,大部服務員都在等著客人點菜,其實幹的活不是太多。老闆之所以能當老闆當然有點不一樣的地方,終於發現了一個新的方法,那就是:當客人點菜的時候,服務員就可以去招呼其他客人了,等客人點好了菜,直接招呼一聲“服務員”,馬上就有個服務員過去服務。在用了這個新方法後,老闆進行了一次裁員,只留了一個服務員!這就是用單個執行緒來做多執行緒的事。實際的餐館都是用的Reactor模式在服務。
4、How:如何使用Reactor模式?
在網路服務和分散式物件中,對於網路中的某一個請求處理,我們比較關注的內容大致為:讀取請求( Read request)、 解碼請求(Decode request)、處理服務(Process service)、 編碼答覆(Encode reply)、 傳送答覆(Send reply)。但是每一步對系統的開銷和效率又不盡相同
A、Classic Service Design
對於傳統的服務設計,每一個到來的請求,系統都會分配一個執行緒去處理,這樣看似合乎情理,但是,當系統請求量瞬間暴增時,會直接把系統拖垮。因為在高併發情況下,系統建立的執行緒數量是有限的。傳統系統設計如下圖所示:
傳統的服務程式碼實現如下所示:
class Server implements Runnable {
public void run() {
try {
//建立服務端連線
ServerSocket ss = new ServerSocket(PORT);
//不停建立執行緒處理新的請求
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) {
/* ... */ }
}
//處理請求的handler
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) {
/* ... */ }
}
private byte[] process(byte[] cmd) {
/* ... */ }
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
顯然,傳統的一對一的執行緒處理無法滿足新需求的變化。對此,考慮到了執行緒池的使用,這樣就使得執行緒可以被複用,大大降低建立執行緒和銷燬執行緒的時間。然而,執行緒池並不能很好滿足高併發執行緒的需求,當海量請求到來時,執行緒池中的工作執行緒達到飽和狀態,這時可能就導致請求被拋棄,無法完成客戶端的請求。對此,考慮到將一次完整的請求切分成幾個小的任務,每一個小任務都是非阻塞的;對於讀寫操作,使用NIO對其進行讀寫;不同的任務將被分配到與想關聯的處理器上進行處理,每個處理器都是通過非同步回撥機制實現。這樣就大大提供系統吞吐量,減少響應時間。這就是下面將要介紹的Reactor模式。
B、Basic Reactor Design
單執行緒版的Reactor模式如下圖所示。對於客戶端的所以請求,都又一個專門的執行緒去進行處理,這個執行緒無線迴圈去監聽是否又客戶的請求到來,一旦收到客戶端的請求,就將其分發給響應的處理器進行處理。
Reactor 1: Setup
在Reactor模式中,我們需要進行一些基本設定,首先需要建立一個Selector和一個ServerSocketChannel ,將監聽的埠繫結到Channel中,還需要設定Channel為非阻塞,並在Selector上註冊自己感興趣的時事件,可以是連線事件,也可以是讀寫事件。程式碼如下所示:
//定義reactor,其中包括Selector和ServerSocketChannel
//將ServerSocketChannel和事件型別繫結到Seletor上,設定 serverSocket為非阻塞
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/*
* Alternatively, use explicit SPI provider: SelectorProvider p =
* SelectorProvider.provider(); selector = p.openSelector();
* serverSocket = p.openServerSocketChannel();
*/
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
下面這段程式碼可以看做是boss執行緒,它負責接收請求並安排給對應的handle處理。可以看出,只要當前執行緒不中斷就會一直監聽,其中selector.select()是阻塞的,一旦又請求到來時,就會從selector中獲取到對應的SelectionKey ,然後將其下發給後續處理程式(工作執行緒)進行處理。
// class Reactor continued
//無限迴圈等待網路請求的到來
//其中selector.select();會阻塞直到有繫結到selector的請求型別對應的請求到來,一旦收到事件,處理分發到對應的handler,並將這個事件移除
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) {
/* ... */ }
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
Acceptor也是一個執行緒,在其run方法中,通過判斷serverSocket.accept()方法來獲取SocketChannel,只要SocketChannel 不為空,則建立一個handler進行相應處理。
// class Reactor continued
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) {
/* ... */ }
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
Reactor 4: Handler setup
從下方程式碼可看出,一個handler就是一個執行緒,其中的SocketChannel 被設定成非阻塞。預設在Selector上註冊了讀事件並繫結到SocketChannel 上。
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
/* ... */ }
boolean outputIsComplete() {
/* ... */ }
void process() {
/* ... */ }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
針對不同的請求事件進行處理,程式碼實現如下所示:
// class Handler continued
//具體的請求處理,可能是讀事件、寫事件等
public void run() {
try {
if (state == READING)
read();
else if (state == SENDING)
send();
} catch (IOException ex) {
/* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
C、Worker Thread Pools for Reactor
考慮到工作執行緒的複用,將工作執行緒設計為執行緒池。工作執行緒使用執行緒池實現如下圖所示。
在handler中使用執行緒池來處理任務。程式碼實現如下所示:
//這裡將具體的業務處理執行緒設定執行緒池,提供執行緒複用
class Handler implements Runnable {
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
//從執行緒池中指派執行緒去完成工作
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() {
processAndHandOff();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
5、總結
本文簡單闡述了Reactor模型,從Reactor模式是什麼到Reactor模式的使用,以及使用Reactor模式的好處。我想如果你瞭解Netty或者vertx,你一定對Reactor模式有所瞭解。其中,Netty就是基於Reactor模式搭建的,其是一個非同步事件驅動的網路應用框架,感興趣的同學可以去看看。希望本文對你有所幫助。