1. 程式人生 > >NIO+reactor模式的網路伺服器設計方案

NIO+reactor模式的網路伺服器設計方案

NIO+reactor 模式的網路伺服器設計方案

1、前言

       在前一篇文章中,介紹了基於 BlockingIO +thread-per-connection 的方案,由於該方案為每一個連線分配一個執行緒,而執行緒裡的大部分操作都是阻塞式的,所以在高併發的情況下,會導致產生大量的執行緒,執行緒間的上下文切換會浪費大量的 CPU 時間,而且每個執行緒是需要佔用堆疊空間的,所以伺服器能分配的執行緒數量也是有限的,當客戶端的併發訪問量達到一定的數量級後,伺服器的資源就會耗盡,可伸縮性差。

根據上面的分析,要提高網路伺服器的可伸縮性,就必須解決兩個問題:

  • 服務端的執行緒數量不能按照客戶端連線數的增大而成線性增長,但又必須能夠併發的響應不斷增加的客戶端請求
  • 執行緒裡的操作邏輯不能是阻塞式的

因此, java1.4 引入了非阻塞式 NIO Non-blocking IO , 解決了問題 2 ;而採用基於非同步事件通知的 reactor 模式則可以僅僅用一個執行緒來併發的為多個連線服務,這樣就解決了問題 1

2、Reactor 模式

2.1 示例

     首先舉一個生活中的例子來比較 thread-per-connection reactor方案

某火車票售票廳,只有 1 個售票視窗工作。兩個乘客 a b 先後來購票,由於 a 先到,所以售票視窗先為 a 服務, b 只能排隊

  • thread-per-connection

    乘客 a與售票視窗開始溝通時,就相當於在客戶端(乘客 a)與服務端(售票廳)之間建立了一個 connection,服務端為每一個 connection分配一個 thread(售票視窗)。當沒有 thread可以分配時,後續的客戶端請求(乘客 b)就不能及時響應了,所以 b只能排隊。假設存在這種場景,售票視窗的服務員告訴乘客 a票價後,乘客 a準備付款時發現自己忘記了帶錢包,所以乘客 a打電話給家裡人讓他們把錢包送過來,但從 a的家步行到售票廳需要 5分鐘,於是售票視窗的服務員就一直等著(被阻塞),但又不為乘客 b服務,因為她的做事風格( thread-per-connection)是一定要為一個乘客完完整整服務完後才能接著服務下一位乘客。

   這種情況下,乘客 b 肯定會抱怨,而且 5 分鐘後, b 的後面也肯定排了很多人,售票廳發現這種情況後,就只能選擇再開啟一個售票視窗(分配一個 thread )為 b 服務,但 b 後面的人也只能排隊。之前那個視窗的服務員一直等著,又不幹活,但工資還是照樣拿,所以售票廳(服務端)的開銷很大。

  • Reactor

    服務員在等待 a 取錢包的過程中,被通知乘客 b 要求服務,所以視窗和 b 建立連線,悲劇的是 b 也沒有帶錢包,需要家裡人送來。此時服務員又被通知 a 的錢包送過來了,所以視窗接著為 a 服務,出票完成後,服務員又被通知 b 的錢包送過來了,所以接著又為 b 服務。這樣,售票廳(服務端)的開銷就小了,現在只需要一個視窗就可以搞定所有事情。

2.2 Reactor 模式的思想:分而治之 + 事件驅動

  • 分而治之:

    一個 connection裡發生的完整的網路處理過程一般分為 accept read decode compute encode send這幾步。 Reactor將每個步驟對映為一個 task,服務端端的執行緒執行的最小邏輯單元不再是一次完整的網路處理過程,而是更小的 task,且採用非阻塞的執行方式;

  • 事件驅動:

   每個 task對應一個特定的事件,當 task準備就緒時,對應的事件通知就會發出。 Reactor收到事件後,分發給綁定了對應事件的 Handler執行 task

下圖描述了單執行緒版本的 reactor 模式結構圖。

關鍵概念:

  • Reactor:負責響應事件,分發給綁定了該事件的 handler執行 task
  • Handler:綁定了某類事件,負責執行該事件對應的 task
  • Acceptor Handler 的一種,綁定了 connect 事件。它在系統啟動的時候就已經綁定了 connnect 事件,並註冊到 reactor 中。當有客戶端發起 connect 請求時, reactor 會收到 accept 事件,然後分發給 acceptor acceptor 首先會 accept 新的連線,然後新建一個 handler ,將其與該連線上得 read 事件繫結,並註冊到 reactor 中。

2.3 基於 reactor 的網路互動

  1. 客戶端連線伺服器過程

1)伺服器將綁定了 accept事件的 Acceptor註冊到 Reactor中,準備 accept新的 connection;

2)伺服器啟動 Reactor的事件迴圈處理功能(注意:該迴圈會阻塞,直到收到事件)

3)客戶端 connect伺服器

4)Reactor響應 accept事件,分發給 Acceptor Acceptor 確定建立一個新的連線。

5)Acceptor建立一個 handler專門服務該 connection後續的請求;

6)Handler繫結該 connection read事件,並將自己註冊到 Reactor

  1. 伺服器處理客戶端請求過程

1)客戶端傳送請求

2)當客戶端的請求資料到達伺服器時, Reactor響應 read事件,分發給綁定了 read事件的 handler(即上面第 6步建立的 handler

3)Handler執行 task,讀取客戶端的請求資料(此處是非阻塞讀,即如果當前操作會導致當前執行緒阻塞,當前操作會立即返回,並重復執行第 2 3步,直到客戶端的請求讀取完畢。)

4)解析客戶端請求資料

5)讀取檔案

6)Handler重新繫結 write事件

7) connection可以開始 write的時候, Reactor響應 write事件,分發給綁定了 write事件的 handler

8)    Handler 執行 task ,向客戶端傳送檔案(此處是非阻塞寫,即如果當前操作會導致當前執行緒阻塞,當前操作會立即返回,並重復執行第 7 8 步,直到檔案全部發送完畢。)

注意:上述兩個過程都是在伺服器的一個執行緒裡完成的,該執行緒響應所有客戶端的請求。譬如服務端在處理客戶端 A 的請求時,如果在第 2 read 事件還沒有就緒(或者在第 3 步讀取資料的時候發生阻塞了),則該執行緒會立即重新回到客戶端連線伺服器過程的第 2 步(即事件迴圈處理),等待新的事件通知。如果此時客戶端 B 請求連線,則該執行緒會響應 B 的連線請求,這樣就實現了一個執行緒同時為多個連線服務的效果。

3、 程式碼示例

3.1 NIO的幾個關鍵概念

  • Selector

Reactor裡的一個核心組成部分,通過呼叫 selector.select()方法,可以知道感興趣的 IO事件裡哪些已經 ready,該方法是阻塞的,直到有 IO事件 ready;通過呼叫 selector.selectedKeys()方法,可以獲取到 selectionKey物件,這些物件關聯有已經 ready IO事件。

  • SelectionKey

selector註冊一個 channel時,會產生一個該物件,譬如SelectionKey selectionKey = channel .register(selector, SelectionKey. OP_ACCEPT );它維護著 channel selector IO 事件、 Handler 之間的關係。通過呼叫 attach 方法,可以繫結一個 handler ,譬如: selectionKey.attach(new Acceptor());

  • ServerSocketChannel

類似於 ServerSocket,唯一的區別在於: ServerSocketChannel可以使用 selector,而且可以設定為非阻塞模式。

  • SocketChannel

類似於 Socket,唯一的區別在於: SocketChannel可以使用 selector,而且可以設定為非阻塞模式。

  • ByteBuffer :資料緩衝器,是 NIO 裡將資料移入移出 channel 的唯一方式

3.2 code

注:所有程式碼只用來作為原理的進一步闡述,不能用於生產環境

服務端程式碼如下(單執行緒版本)

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;


/**
 * @author jason
 * 
 */
public class NioServer implements Runnable {

	private InetAddress hostAddress;
	private int port;

	private ServerSocketChannel serverChannel;

	private Selector selector;

	public NioServer(InetAddress hostAddress, int port) throws IOException {
		this.hostAddress = hostAddress;
		this.port = port;
		// 初始化selector,繫結服務端監聽套接字、感興趣事件及對應的handler
		this.selector = initSelector();
	}

	public static void main(String[] args) {
		try {
			// 啟動伺服器
			new Thread(new NioServer(null, 9090)).start();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		while (true) {
			try {
				// 選擇事件已經ready的selectionKey,該方法是阻塞的,只有當至少存在selectionKey,或者wakeup方法被呼叫,或者當前執行緒被中斷,才會返回
				selector.select();
				// 迴圈處理每一個事件
				Iterator<SelectionKey> items = selector.selectedKeys()
						.iterator();
				while (items.hasNext()) {
					SelectionKey key = (SelectionKey) items.next();
					items.remove();

					if (!key.isValid()) {
						continue;
					}
					// 事件處理分發
					dispatch(key);
				}

			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	/**
	 * 事件處理分發
	 * 
	 * @param sk
	 *            已經ready的selectionKey
	 */
	private void dispatch(SelectionKey sk) {
		// 獲取繫結的handler
		Runnable r = (Runnable) sk.attachment();
		if (r != null) {
			r.run();
		}
	}

	/**
	 * 初始化selector,繫結服務端監聽套接字、感興趣事件及對應的handler
	 * 
	 * @return
	 * @throws IOException
	 */
	private Selector initSelector() throws IOException {
		// 建立一個selector
		Selector socketSelector = SelectorProvider.provider().openSelector();
		// 建立並開啟ServerSocketChannel
		serverChannel = ServerSocketChannel.open();
		// 設定為非阻塞
		serverChannel.configureBlocking(false);
		// 繫結埠
		serverChannel.socket().bind(new InetSocketAddress(hostAddress, port));
		// 用selector註冊套接字,並返回對應的SelectionKey,同時設定Key的interest set為監聽客戶端連線事件
		SelectionKey selectionKey = serverChannel.register(socketSelector,
				SelectionKey.OP_ACCEPT);
		// 繫結handler
		selectionKey.attach(new Acceptor());

		return socketSelector;
	}

	/**
	 * 處理OP_ACCEPT事件的handler
	 * 
	 */
	class Acceptor implements Runnable {
		@Override
		public void run() {
			try {
				accept();
			} catch (IOException e) {
				e.printStackTrace();
			}

		}

		private void accept() throws IOException {
			System.out.println("connect");
			// 建立連線
			SocketChannel socketChannel = serverChannel.accept();
			System.out.println("connected");
			// 設定為非阻塞
			socketChannel.configureBlocking(false);
			// 建立Handler,專門處理該連線後續發生的OP_READ和OP_WRITE事件
			new Handler(selector, socketChannel);
		}

	}

}
handler程式碼如下

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * @author jason
 * 
 */
final class Handler implements Runnable {
	final SocketChannel socketChannel;
	final SelectionKey key;
	static final int MAXIN = 8192, MAXOUT = 11240 * 1024;
	ByteBuffer readBuffer = ByteBuffer.allocate(MAXIN);
	ByteBuffer outBuffer = ByteBuffer.allocate(MAXOUT);
	static final int READING = 0;
	static final int SENDING = 1;
	int state = READING;

	Handler(Selector selector, SocketChannel socketChannel) throws IOException {
		this.socketChannel = socketChannel;
		// 用selector註冊套接字,並返回對應的SelectionKey,同時設定Key的interest set為監聽該連線上得read事件
		this.key = socketChannel.register(selector, SelectionKey.OP_READ);
		// 繫結handler
		this.key.attach(this);
	}

	/**
	 * 處理write
	 * 
	 * @throws IOException
	 */
	private void write() throws IOException {
		socketChannel.write(outBuffer);
		if (outBuffer.remaining() > 0) {
			return;
		}
		state = READING;
		key.interestOps(SelectionKey.OP_READ);
	}

	/**
	 * 處理read
	 * 
	 * @throws IOException
	 */
	private void read() throws IOException {
		readBuffer.clear();
		int numRead;
		try {
			// 讀取資料
			numRead = socketChannel.read(readBuffer);
		} catch (Exception e) {
			key.cancel();
			socketChannel.close();
			return;
		}

		if (numRead == -1) {
			socketChannel.close();
			key.cancel();
			return;
		}
		// 處理資料
		process(numRead);

	}

	/**
	 * 處理資料
	 * 
	 * @param numRead
	 */
	private void process(int numRead) {
		byte[] dataCopy = new byte[numRead];
		System.arraycopy(readBuffer.array(), 0, dataCopy, 0, numRead);
		System.out.println(new String(dataCopy));
		outBuffer = ByteBuffer.wrap(dataCopy);
		state = SENDING;
		// 設定Key的interest set為監聽該連線上的write事件
		key.interestOps(SelectionKey.OP_WRITE);
	}

	@Override
	public void run() {
		try {
			if (state == READING) {
				read();
			} else if (state == SENDING) {
				write();
			}

		} catch (IOException e) {
			e.printStackTrace();
		}

	}
}
客戶端程式碼如下:

package sampleNio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

/**
 * @author jason
 * 
 */
public class NioClient implements Runnable {
	private InetAddress hostAddress;
	private int port;
	private Selector selector;
	private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
	private ByteBuffer outBuffer = ByteBuffer.wrap("nice to meet you"
			.getBytes());

	public NioClient(InetAddress hostAddress, int port) throws IOException {
		this.hostAddress = hostAddress;
		this.port = port;
		initSelector();
	}

	public static void main(String[] args) {
		try {
			NioClient client = new NioClient(
					InetAddress.getByName("localhost"), 9090);
			new Thread(client).start();

		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		while (true) {
			try {
				selector.select();

				Iterator<?> selectedKeys = selector.selectedKeys().iterator();
				while (selectedKeys.hasNext()) {
					SelectionKey key = (SelectionKey) selectedKeys.next();
					selectedKeys.remove();

					if (!key.isValid()) {
						continue;
					}

					if (key.isConnectable()) {
						finishConnection(key);
					} else if (key.isReadable()) {
						read(key);
					} else if (key.isWritable()) {
						write(key);
					}

				}

			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	private void initSelector() throws IOException {
		// 建立一個selector
		selector = SelectorProvider.provider().openSelector();
		// 開啟SocketChannel
		SocketChannel socketChannel = SocketChannel.open();
		// 設定為非阻塞
		socketChannel.configureBlocking(false);
		// 連線指定IP和埠的地址
		socketChannel
				.connect(new InetSocketAddress(this.hostAddress, this.port));
		// 用selector註冊套接字,並返回對應的SelectionKey,同時設定Key的interest set為監聽服務端已建立連線的事件
		socketChannel.register(selector, SelectionKey.OP_CONNECT);
	}

	private void finishConnection(SelectionKey key) throws IOException {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		try {
			// 判斷連線是否建立成功,不成功會拋異常
			socketChannel.finishConnect();
		} catch (IOException e) {
			key.cancel();
			return;
		}
		// 設定Key的interest set為OP_WRITE事件
		key.interestOps(SelectionKey.OP_WRITE);
	}

	/**
	 * 處理read
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void read(SelectionKey key) throws IOException {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		readBuffer.clear();
		int numRead;
		try {
			numRead = socketChannel.read(readBuffer);
		} catch (Exception e) {
			key.cancel();
			socketChannel.close();
			return;
		}
		if (numRead == 1) {
			System.out.println("close connection");
			socketChannel.close();
			key.cancel();
			return;
		}
		// 處理響應
		handleResponse(socketChannel, readBuffer.array(), numRead);
	}

	/**
	 * 處理響應
	 * 
	 * @param socketChannel
	 * @param data
	 * @param numRead
	 * @throws IOException
	 */
	private void handleResponse(SocketChannel socketChannel, byte[] data,
			int numRead) throws IOException {
		byte[] rspData = new byte[numRead];
		System.arraycopy(data, 0, rspData, 0, numRead);
		System.out.println(new String(rspData));
		socketChannel.close();
		socketChannel.keyFor(selector).cancel();
	}

	/**
	 * 處理write
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void write(SelectionKey key) throws IOException {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		socketChannel.write(outBuffer);
		if (outBuffer.remaining() > 0) {
			return;
		}
		// 設定Key的interest set為OP_READ事件
		key.interestOps(SelectionKey.OP_READ);
	}

}

4、 Reactor 的其他實現方式

單執行緒版本的 Reactor 最大的優勢是:不需要做併發控制,簡化了實現。缺點是不能充分利用多核 CPU的優勢,因為只有一個執行緒,該執行緒需要執行所有的操作: accept read decode compute encode send,而其中的 decode compute encode如果很耗時,則該執行緒就不能及時的響應其他客戶端的請求。

為了解決該問題,可以採用另外兩種版本:

4.1 Worker threads

Reactor所在的執行緒只需要專心的響應客戶端的請求: accept read send。對資料的具體處理過程則交給另外的執行緒池。這樣可以提高服務端對客戶端的響應速度,但同時增加了複雜度,也沒有充分利用到多核的優勢,因為 reactor只有一個,譬如同一時刻只能 read一個客戶端的請求資料。

4.2Multiple reactor threads

採用多個 reactor ,每個 reactor 都在自己單獨的執行緒裡執行。如果是多核,則可以同時響應多個客戶端的請求。( Netty 採用的是類似這種方式,boss執行緒池就是多個mainReactor,worker執行緒池就是多個subReactor)

5、總結

本文分析了基於 NIO Reactor模式的網路伺服器設計方案,在後續的 blog中將結合 Netty進一步分析高效能網路伺服器的設計。

 本文為原創,轉載請註明出處