Mina、Netty、Twisted一起學(十):執行緒模型
要想開發一個高效能的TCP伺服器,熟悉所使用框架的執行緒模型非常重要。MINA、Netty、Twisted本身都是高效能的網路框架,如果再搭配上高效率的程式碼,才能實現一個高大上的伺服器。但是如果不瞭解它們的執行緒模型,就很難寫出高效能的程式碼。框架本身效率再高,程式寫的太差,那麼伺服器整體的效能也不會太高。就像一個電腦,CPU再好,記憶體小硬碟慢散熱差,整體的效能也不會太高。
玩過Android開發的同學會知道,在Android應用中有一個非常重要執行緒:UI執行緒(即主執行緒)。UI執行緒是負責一個Android的介面顯示以及和使用者互動。Activity的一些方法,例如onCreate、onStop、onDestroy都是執行在UI執行緒中的。但是在編寫Activity程式碼的時候有一點需要非常注意,就是絕對不能把阻塞的或者耗時的任務寫在這些方法中,如果寫在這些方法中,則會阻塞UI執行緒,導致使用者操作的介面反應遲鈍,體驗很差。所以在Android開發中,耗時或者阻塞的任務會另外開執行緒去做。
同樣在MINA、Netty、Twisted中,也有一個非常重要的執行緒:IO執行緒。
傳統的BIO實現的TCP伺服器,特別對於TCP長連線,通常都要為每個連線開啟一個執行緒,執行緒也是作業系統的一種資源,所以很難實現高效能高併發。而非同步IO實現的TCP伺服器,由於IO操作都是非同步的,可以用一個執行緒或者少量執行緒來處理大量連線的IO操作,所以只需要少量的IO執行緒就可以實現高併發的伺服器。
在網路程式設計過程中,通常有一些業務邏輯是比較耗時、阻塞的,例如資料庫操作,如果網路不好,加上資料庫效能差,SQL不夠優化,資料量大,一條SQL可能會執行很久。由於IO執行緒本身數量就不多,通常只有一個或幾個,而如果這種耗時阻塞的程式碼在IO執行緒中執行的話,IO執行緒的其他事情,例如網路read和write,就無法進行了,會影響IO效能以及整個伺服器的效能。
所以,無論是使用MINA、Netty、Twisted,如果有耗時的任務,就絕對不能在IO執行緒中執行,而是要另外開啟執行緒來處理。
MINA:
在MINA中,有三種非常重要的執行緒:Acceptor thread、Connector thread、I/O processor thread。
下面是官方文件的介紹:
In MINA, there are three kinds of I/O worker threads in the NIO socket implementation.
Acceptor thread accepts incoming connections, and forwards the connection to the I/O processor thread for read and write operations.
Each SocketAcceptor creates one acceptor thread. You can't configure the number of the acceptor threads.
Connector thread
Each SocketConnector creates one connector thread. You can't configure the number of the connector threads, either.
I/O processor thread performs the actual read and write operation until the connection is closed.
Each SocketAcceptor or SocketConnector creates its own I/O processor thread(s). You can configure the number of the I/O processor threads. The default maximum number of the I/O processor threads is the number of CPU cores + 1.
Acceptor thread:
這個執行緒用於TCP伺服器接收新的連線,並將連線分配到I/O processor thread,由I/O processor thread來處理IO操作。每個NioSocketAcceptor建立一個Acceptor thread,執行緒數量不可配置。
Connector thread:
用於處理TCP客戶端連線到伺服器,並將連線分配到I/O processor thread,由I/O processor thread來處理IO操作。每個NioSocketConnector建立一個Connector thread,執行緒數量不可配置。
I/O processor thread:
用於處理TCP連線的I/O操作,如read、write。I/O processor thread的執行緒數量可通過NioSocketAcceptor或NioSocketConnector構造方法來配置,預設是CPU核心數+1。
由於本文主要介紹TCP伺服器的執行緒模型,所以就沒有Connector thread什麼事了。下面說下Acceptor thread和I/O processor thread處理TCP連線的流程:
MINA的TCP伺服器包含一個Acceptor thread和多個I/O processor thread,當有新的客戶端連線到伺服器,首先會由Acceptor thread獲取到這個連線,同時將這個連線分配給多個I/O processor thread中的一個執行緒,當客戶端傳送資料給伺服器,對應的I/O processor thread負責讀取這個資料,並執行IoFilterChain中的IoFilter以及IoHandle。
由於I/O processor thread本身數量有限,通常就那麼幾個,但是又要處理成千上萬個連線的IO操作,包括read、write、協議的編碼解碼、各種Filter以及IoHandle中的業務邏輯,特別是業務邏輯,比如IoHandle的messageReceived,如果有耗時、阻塞的任務,例如查詢資料庫,那麼就會阻塞I/O processor thread,導致無法及時處理其他IO事件,伺服器效能下降。
針對這個問題,MINA中提供了一個ExecutorFilter,用於將需要執行很長時間的會阻塞I/O processor thread的業務邏輯放到另外的執行緒中,這樣就不會阻塞I/O processor thread,不會影響IO操作。ExecutorFilter中包含一個執行緒池,預設是OrderedThreadPoolExecutor,這個執行緒池保證同一個連線的多個事件按順序依次執行,另外還可以使用UnorderedThreadPoolExecutor,它不會保證同一連線的事件的執行順序,並且可能會併發執行。二者之間可以根據需要來選擇。
public class TcpServer {
public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor(4); // 配置I/O processor thread執行緒數量
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); // 將TcpServerHandle中的業務邏輯拿到ExecutorFilter的執行緒池中執行
acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}
}
class TcpServerHandle extends IoHandlerAdapter {
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
// 假設這裡有個變態的SQL要執行3秒
Thread.sleep(3000);
}
}
Netty:
Netty的TCP伺服器啟動時,會建立兩個NioEventLoopGroup,一個boss,一個worker:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup實際上是一個執行緒組,可以通過構造方法設定執行緒數量,預設為CPU核心數*2。boss用於伺服器接收新的TCP連線,boss執行緒接收到新的連線後將連線註冊到worker執行緒。worker執行緒用於處理IO操作,例如read、write。
Netty中的boss執行緒類似於MINA的Acceptor thread,work執行緒和MINA的I/O processor thread類似。不同的一點是MINA的Acceptor thread是單個執行緒,而Netty的boss是一個執行緒組。實際上Netty的ServerBootstrap可以監聽多個埠號,如果只監聽一個埠號,那麼只需要一個boss執行緒即可,推薦將bossGroup的執行緒數量設定成1。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
當有新的TCP客戶端連線到伺服器,將由boss執行緒來接收連線,然後將連線註冊到worker執行緒,當客戶端傳送資料到伺服器,worker執行緒負責接收資料,並執行ChannelPipeline中的ChannelHandler。
和MINA的I/O processor thread 類似,Netty的worker執行緒本身數量不多,而且要實時處理IO事件,如果有耗時的業務邏輯阻塞住worker執行緒,例如在channelRead中執行一個耗時的資料庫查詢,會導致IO操作無法進行,伺服器整體效能就會下降。
在Netty 3中,存在一個ExecutionHandler,它是ChannelHandler的一個實現類,用於處理耗時的業務邏輯,類似於MINA的ExecutorFilter,但是在Netty 4中被刪除了。所以這裡不再介紹ExecutionHandler。
Netty 4中可以使用EventExecutorGroup來處理耗時的業務邏輯:
public class TcpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 伺服器監聽一個埠號,boss執行緒數建議設定成1
EventLoopGroup workerGroup = new NioEventLoopGroup(4); // worker執行緒數設定成4
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
// 建立一個16個執行緒的執行緒組來處理耗時的業務邏輯
private EventExecutorGroup group = new DefaultEventExecutorGroup(16);
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(80));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
// 將TcpServerHandler中的業務邏輯放到EventExecutorGroup執行緒組中執行
pipeline.addLast(group, new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class TcpServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
// 假設這裡有個變態的SQL要執行3秒
Thread.sleep(3000);
}
}
Twisted:
Twisted的執行緒模型是最簡單粗暴的:單執行緒,即reactor執行緒。也就是,所有的IO操作、編碼解碼、業務邏輯等都是在一個執行緒中執行。實際上,即使是單執行緒,其效能也是非常高的,可以同時處理大量的連線。在單執行緒的環境下程式設計,不需要考慮執行緒安全的問題。不過,單執行緒帶來一個問題,就是耗時的業務邏輯,如果執行在reactor執行緒中,那麼其他事情,例如網路IO,就要等到reactor執行緒空閒時才能繼續做,會影響到伺服器的效能。
下面的程式碼,通過reactor.callInThread將耗時的業務邏輯放到單獨的執行緒池中執行,而不在reactor執行緒中執行。這樣就不會影響到reactor執行緒的網路IO了。可以通過reactor.suggestThreadPoolSize設定這個執行緒池的執行緒數量。
# -*- coding:utf-8 –*-
import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor
# 耗時、阻塞的業務邏輯
def logic(data):
print data
time.sleep(3) # 假設這裡有個變態的SQL要執行3秒
class TcpServerHandle(Protocol):
def dataReceived(self, data):
reactor.callInThread(logic, data) # 線上程池中執行logic(data)耗時任務,不在reactor執行緒中執行
reactor.suggestThreadPoolSize(8) # 設定執行緒池的執行緒數量為8
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
由於Twisted的reactor的單執行緒設計,它的很多程式碼都不是執行緒安全的。所以在非reactor執行緒中執行的程式碼需要注意執行緒安全問題。例如transport.write就不是執行緒安全的。不過在非reactor執行緒中可以呼叫reactor.callFromThread方法,這個方法功能和callInThread相反,將一個函式從別的執行緒放到reactor執行緒中執行。不過還是要注意,reactor.callFromThread呼叫的函式由於執行在reactor執行緒中,如果執行耗時,同樣會阻塞reactor執行緒,影響IO。
# -*- coding:utf-8 –*-
import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor
# 非執行緒安全的程式碼
def notThreadSafe():
print "notThreadSafe"
# 耗時、阻塞的業務邏輯
def logic(data):
print data
time.sleep(3) # 假設這裡有個變態的SQL要執行3秒
reactor.callFromThread(notThreadSafe) # 在reactor執行緒中執行notThreadSafe()
class TcpServerHandle(Protocol):
def dataReceived(self, data):
reactor.callInThread(logic, data) # 線上程池中執行logic(data)耗時任務,不在reactor執行緒中執行
reactor.suggestThreadPoolSize(8) # 設定執行緒池的執行緒數量為8
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
此外,twisted.internet.threads中提供了許多很方便的函式。例如threads.deferToThread用於將一個耗時任務放線上程池中執行,與reactor.callInThread不同的是,它的返回值是Deferred型別,可以通過添加回調函式,處理耗時任務完成後的結果(返回值)。
# -*- coding:utf-8 –*-
import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor, threads
# 耗時、阻塞的業務邏輯
def logic(data):
print data
time.sleep(3) # 假設這裡有個變態的SQL要執行3秒
return "success"
# 回撥函式
def logicSuccess(result):
# result即為logic函式的返回值,即"success"
print result
class TcpServerHandle(Protocol):
def dataReceived(self, data):
d = threads.deferToThread(logic, data) # 將耗時的業務邏輯logic(data)放到執行緒池中執行,deferToThread返回值型別是Deferred
d.addCallback(logicSuccess) # 添加回調函式
reactor.suggestThreadPoolSize(8) # 設定執行緒池的執行緒數量為8
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()