1. 程式人生 > >Mina、Netty、Twisted一起學(十):執行緒模型

Mina、Netty、Twisted一起學(十):執行緒模型

要想開發一個高效能的TCP伺服器,熟悉所使用框架的執行緒模型非常重要。MINA、Netty、Twisted本身都是高效能的網路框架,如果再搭配上高效率的程式碼,才能實現一個高大上的伺服器。但是如果不瞭解它們的執行緒模型,就很難寫出高效能的程式碼。框架本身效率再高,程式寫的太差,那麼伺服器整體的效能也不會太高。就像一個電腦,CPU再好,記憶體小硬碟慢散熱差,整體的效能也不會太高。

玩過Android開發的同學會知道,在Android應用中有一個非常重要執行緒:UI執行緒(即主執行緒)。UI執行緒是負責一個Android的介面顯示以及和使用者互動。Activity的一些方法,例如onCreate、onStop、onDestroy都是執行在UI執行緒中的。但是在編寫Activity程式碼的時候有一點需要非常注意,就是絕對不能把阻塞的或者耗時的任務寫在這些方法中,如果寫在這些方法中,則會阻塞UI執行緒,導致使用者操作的介面反應遲鈍,體驗很差。所以在Android開發中,耗時或者阻塞的任務會另外開執行緒去做。

同樣在MINA、Netty、Twisted中,也有一個非常重要的執行緒:IO執行緒

傳統的BIO實現的TCP伺服器,特別對於TCP長連線,通常都要為每個連線開啟一個執行緒,執行緒也是作業系統的一種資源,所以很難實現高效能高併發。而非同步IO實現的TCP伺服器,由於IO操作都是非同步的,可以用一個執行緒或者少量執行緒來處理大量連線的IO操作,所以只需要少量的IO執行緒就可以實現高併發的伺服器。

在網路程式設計過程中,通常有一些業務邏輯是比較耗時、阻塞的,例如資料庫操作,如果網路不好,加上資料庫效能差,SQL不夠優化,資料量大,一條SQL可能會執行很久。由於IO執行緒本身數量就不多,通常只有一個或幾個,而如果這種耗時阻塞的程式碼在IO執行緒中執行的話,IO執行緒的其他事情,例如網路read和write,就無法進行了,會影響IO效能以及整個伺服器的效能。

所以,無論是使用MINA、Netty、Twisted,如果有耗時的任務,就絕對不能在IO執行緒中執行,而是要另外開啟執行緒來處理。

MINA:

在MINA中,有三種非常重要的執行緒:Acceptor thread、Connector thread、I/O processor thread。

下面是官方文件的介紹:

In MINA, there are three kinds of I/O worker threads in the NIO socket implementation.
Acceptor thread accepts incoming connections, and forwards the connection to the I/O processor thread for read and write operations.
Each SocketAcceptor creates one acceptor thread. You can't configure the number of the acceptor threads.
Connector thread

attempts connections to a remote peer, and forwards the succeeded connection to the I/O processor thread for read and write operations.
Each SocketConnector creates one connector thread. You can't configure the number of the connector threads, either.
I/O processor thread performs the actual read and write operation until the connection is closed.
Each SocketAcceptor or SocketConnector creates its own I/O processor thread(s). You can configure the number of the I/O processor threads. The default maximum number of the I/O processor threads is the number of CPU cores + 1.

Acceptor thread:

這個執行緒用於TCP伺服器接收新的連線,並將連線分配到I/O processor thread,由I/O processor thread來處理IO操作。每個NioSocketAcceptor建立一個Acceptor thread,執行緒數量不可配置。

Connector thread:

用於處理TCP客戶端連線到伺服器,並將連線分配到I/O processor thread,由I/O processor thread來處理IO操作。每個NioSocketConnector建立一個Connector thread,執行緒數量不可配置。

I/O processor thread:

用於處理TCP連線的I/O操作,如read、write。I/O processor thread的執行緒數量可通過NioSocketAcceptor或NioSocketConnector構造方法來配置,預設是CPU核心數+1。

由於本文主要介紹TCP伺服器的執行緒模型,所以就沒有Connector thread什麼事了。下面說下Acceptor thread和I/O processor thread處理TCP連線的流程:

MINA的TCP伺服器包含一個Acceptor thread和多個I/O processor thread,當有新的客戶端連線到伺服器,首先會由Acceptor thread獲取到這個連線,同時將這個連線分配給多個I/O processor thread中的一個執行緒,當客戶端傳送資料給伺服器,對應的I/O processor thread負責讀取這個資料,並執行IoFilterChain中的IoFilter以及IoHandle。

由於I/O processor thread本身數量有限,通常就那麼幾個,但是又要處理成千上萬個連線的IO操作,包括read、write、協議的編碼解碼、各種Filter以及IoHandle中的業務邏輯,特別是業務邏輯,比如IoHandle的messageReceived,如果有耗時、阻塞的任務,例如查詢資料庫,那麼就會阻塞I/O processor thread,導致無法及時處理其他IO事件,伺服器效能下降。

針對這個問題,MINA中提供了一個ExecutorFilter,用於將需要執行很長時間的會阻塞I/O processor thread的業務邏輯放到另外的執行緒中,這樣就不會阻塞I/O processor thread,不會影響IO操作。ExecutorFilter中包含一個執行緒池,預設是OrderedThreadPoolExecutor,這個執行緒池保證同一個連線的多個事件按順序依次執行,另外還可以使用UnorderedThreadPoolExecutor,它不會保證同一連線的事件的執行順序,並且可能會併發執行。二者之間可以根據需要來選擇。

public class TcpServer {

	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor(4); // 配置I/O processor thread執行緒數量
		acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
		acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); // 將TcpServerHandle中的業務邏輯拿到ExecutorFilter的執行緒池中執行
		acceptor.setHandler(new TcpServerHandle());
		acceptor.bind(new InetSocketAddress(8080));
	}

}

class TcpServerHandle extends IoHandlerAdapter {

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		
		// 假設這裡有個變態的SQL要執行3秒
		Thread.sleep(3000);
	}
}

Netty:

Netty的TCP伺服器啟動時,會建立兩個NioEventLoopGroup,一個boss,一個worker

EventLoopGroup bossGroup = new NioEventLoopGroup();  
EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup實際上是一個執行緒組,可以通過構造方法設定執行緒數量,預設為CPU核心數*2。boss用於伺服器接收新的TCP連線,boss執行緒接收到新的連線後將連線註冊到worker執行緒。worker執行緒用於處理IO操作,例如read、write。

Netty中的boss執行緒類似於MINA的Acceptor thread,work執行緒和MINA的I/O processor thread類似。不同的一點是MINA的Acceptor thread是單個執行緒,而Netty的boss是一個執行緒組。實際上Netty的ServerBootstrap可以監聽多個埠號,如果只監聽一個埠號,那麼只需要一個boss執行緒即可,推薦將bossGroup的執行緒數量設定成1。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

當有新的TCP客戶端連線到伺服器,將由boss執行緒來接收連線,然後將連線註冊到worker執行緒,當客戶端傳送資料到伺服器,worker執行緒負責接收資料,並執行ChannelPipeline中的ChannelHandler。

和MINA的I/O processor thread 類似,Netty的worker執行緒本身數量不多,而且要實時處理IO事件,如果有耗時的業務邏輯阻塞住worker執行緒,例如在channelRead中執行一個耗時的資料庫查詢,會導致IO操作無法進行,伺服器整體效能就會下降。

在Netty 3中,存在一個ExecutionHandler,它是ChannelHandler的一個實現類,用於處理耗時的業務邏輯,類似於MINA的ExecutorFilter,但是在Netty 4中被刪除了。所以這裡不再介紹ExecutionHandler。

Netty 4中可以使用EventExecutorGroup來處理耗時的業務邏輯:

public class TcpServer {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 伺服器監聽一個埠號,boss執行緒數建議設定成1
		EventLoopGroup workerGroup = new NioEventLoopGroup(4); // worker執行緒數設定成4
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						
						// 建立一個16個執行緒的執行緒組來處理耗時的業務邏輯
						private EventExecutorGroup group = new DefaultEventExecutorGroup(16);
						
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							pipeline.addLast(new LineBasedFrameDecoder(80));
							pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
							
							// 將TcpServerHandler中的業務邏輯放到EventExecutorGroup執行緒組中執行
							pipeline.addLast(group, new TcpServerHandler());
						}
					});
			ChannelFuture f = b.bind(8080).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
		
		// 假設這裡有個變態的SQL要執行3秒
		Thread.sleep(3000);

	}
}

Twisted:

Twisted的執行緒模型是最簡單粗暴的:單執行緒,即reactor執行緒。也就是,所有的IO操作、編碼解碼、業務邏輯等都是在一個執行緒中執行。實際上,即使是單執行緒,其效能也是非常高的,可以同時處理大量的連線。在單執行緒的環境下程式設計,不需要考慮執行緒安全的問題。不過,單執行緒帶來一個問題,就是耗時的業務邏輯,如果執行在reactor執行緒中,那麼其他事情,例如網路IO,就要等到reactor執行緒空閒時才能繼續做,會影響到伺服器的效能。

下面的程式碼,通過reactor.callInThread將耗時的業務邏輯放到單獨的執行緒池中執行,而不在reactor執行緒中執行。這樣就不會影響到reactor執行緒的網路IO了。可以通過reactor.suggestThreadPoolSize設定這個執行緒池的執行緒數量。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor

# 耗時、阻塞的業務邏輯
def logic(data):
    print data
    time.sleep(3) # 假設這裡有個變態的SQL要執行3秒    

class TcpServerHandle(Protocol):
    
    def dataReceived(self, data):
        reactor.callInThread(logic, data) # 線上程池中執行logic(data)耗時任務,不在reactor執行緒中執行

reactor.suggestThreadPoolSize(8) # 設定執行緒池的執行緒數量為8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

由於Twisted的reactor的單執行緒設計,它的很多程式碼都不是執行緒安全的。所以在非reactor執行緒中執行的程式碼需要注意執行緒安全問題。例如transport.write就不是執行緒安全的。不過在非reactor執行緒中可以呼叫reactor.callFromThread方法,這個方法功能和callInThread相反,將一個函式從別的執行緒放到reactor執行緒中執行。不過還是要注意,reactor.callFromThread呼叫的函式由於執行在reactor執行緒中,如果執行耗時,同樣會阻塞reactor執行緒,影響IO。
# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor

# 非執行緒安全的程式碼
def notThreadSafe():
    print "notThreadSafe"

# 耗時、阻塞的業務邏輯
def logic(data):
    print data
    time.sleep(3) # 假設這裡有個變態的SQL要執行3秒
    reactor.callFromThread(notThreadSafe) # 在reactor執行緒中執行notThreadSafe()
    

class TcpServerHandle(Protocol):
    
    def dataReceived(self, data):
        reactor.callInThread(logic, data) # 線上程池中執行logic(data)耗時任務,不在reactor執行緒中執行

reactor.suggestThreadPoolSize(8) # 設定執行緒池的執行緒數量為8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

此外,twisted.internet.threads中提供了許多很方便的函式。例如threads.deferToThread用於將一個耗時任務放線上程池中執行,與reactor.callInThread不同的是,它的返回值是Deferred型別,可以通過添加回調函式,處理耗時任務完成後的結果(返回值)。
# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor, threads

# 耗時、阻塞的業務邏輯
def logic(data):
    print data
    time.sleep(3) # 假設這裡有個變態的SQL要執行3秒
    return "success"

# 回撥函式
def logicSuccess(result):
    # result即為logic函式的返回值,即"success"
    print result

class TcpServerHandle(Protocol):
    
    def dataReceived(self, data):
        d = threads.deferToThread(logic, data) # 將耗時的業務邏輯logic(data)放到執行緒池中執行,deferToThread返回值型別是Deferred
        d.addCallback(logicSuccess) # 添加回調函式

reactor.suggestThreadPoolSize(8) # 設定執行緒池的執行緒數量為8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()


MINA、Netty、Twisted一起學系列

原始碼