Tomcat 連線數與執行緒池詳解 | BIO/NIO有何不同 | 簡談Kafka中的NIO網路通訊模型
前言
在使用tomcat時,經常會遇到連線數、執行緒數之類的配置問題,要真正理解這些概念,必須先了解Tomcat的聯結器(Connector)。
在前面的文章 詳解Tomcat配置檔案server.xml 中寫到過:Connector的主要功能,是接收連線請求,建立Request和Response物件用於和請求端交換資料;然後分配執行緒讓Engine(也就是Servlet容器)來處理這個請求,並把產生的Request和Response物件傳給Engine。當Engine處理完請求後,也會通過Connector將響應返回給客戶端。
可以說,Servlet容器處理請求,是需要Connector進行排程和控制的,Connector是Tomcat處理請求的主幹,因此Connector的配置和使用對Tomcat的效能有著重要的影響。這篇文章將從Connector入手,討論一些與Connector有關的重要問題,包括NIO/BIO模式、執行緒池、連線數等。
根據協議的不同,Connector可以分為HTTP Connector、AJP Connector等,本文只討論HTTP Connector。
一、Nio、Bio、APR
1、Connector的protocol
Connector在處理HTTP請求時,會使用不同的protocol。不同的Tomcat版本支援的protocol不同,其中最典型的protocol包括BIO、NIO和APR(Tomcat7中支援這3種,Tomcat8增加了對NIO2的支援,而到了Tomcat8.5和Tomcat9.0,則去掉了對BIO的支援)。
BIO是Blocking IO,顧名思義是阻塞的IO;NIO是Non-blocking IO,則是非阻塞的IO。而APR是Apache Portable Runtime,是Apache可移植執行庫,利用本地庫可以實現高可擴充套件性、高效能;Apr是在Tomcat上執行高併發應用的首選模式,但是需要安裝apr、apr-utils、tomcat-native等包。點選檢視 Tomcat Server 配置檔案詳解。
2、如何指定protocol
Connector使用哪種protocol,可以通過元素中的protocol屬性進行指定,也可以使用預設值。
指定的protocol取值及對應的協議如下:
HTTP/1.1:預設值,使用的協議與Tomcat版本有關
org.apache.coyote.http11.Http11Protocol:BIO
org.apache.coyote.http11.Http11NioProtocol:NIO
org.apache.coyote.http11.Http11Nio2Protocol:NIO2
org.apache.coyote.http11.Http11AprProtocol:APR
如果沒有指定protocol,則使用預設值HTTP/1.1,其含義如下:在Tomcat7中,自動選取使用BIO或APR(如果找到APR需要的本地庫,則使用APR,否則使用BIO);在Tomcat8中,自動選取使用NIO或APR(如果找到APR需要的本地庫,則使用APR,否則使用NIO)。
3、BIO/NIO有何不同
無論是BIO,還是NIO,Connector處理請求的大致流程是一樣的:
在accept佇列中接收連線(當客戶端向伺服器傳送請求時,如果客戶端與OS完成三次握手建立了連線,則OS將該連線放入accept佇列);在連線中獲取請求的資料,生成request;呼叫servlet容器處理請求;返回response。為了便於後面的說明,首先明確一下連線與請求的關係:連線是TCP層面的(傳輸層),對應socket;請求是HTTP層面的(應用層),必須依賴於TCP的連線實現;一個TCP連線中可能傳輸多個HTTP請求。
在BIO實現的Connector中,處理請求的主要實體是JIoEndpoint物件。JIoEndpoint維護了Acceptor和Worker:Acceptor接收socket,然後從Worker執行緒池中找出空閒的執行緒處理socket,如果worker執行緒池沒有空閒執行緒,則Acceptor將阻塞。其中Worker是Tomcat自帶的執行緒池,如果通過配置了其他執行緒池,原理與Worker類似。
在NIO實現的Connector中,處理請求的主要實體是NIoEndpoint物件。NIoEndpoint中除了包含Acceptor和Worker外,還是用了Poller,處理流程如下圖所示(圖片來源:http://gearever.iteye.com/blog/1844203)。
Acceptor接收socket後,不是直接使用Worker中的執行緒處理請求,而是先將請求傳送給了Poller,而Poller是實現NIO的關鍵。Acceptor向Poller傳送請求通過佇列實現,使用了典型的生產者-消費者模式。在Poller中,維護了一個Selector物件;當Poller從佇列中取出socket後,註冊到該Selector中;然後通過遍歷Selector,找出其中可讀的socket,並使用Worker中的執行緒處理相應請求。與BIO類似,Worker也可以被自定義的執行緒池代替。點選檢視 Tomcat Server 配置檔案詳解。
通過上述過程可以看出,在NIoEndpoint處理請求的過程中,無論是Acceptor接收socket,還是執行緒處理請求,使用的仍然是阻塞方式;但在“讀取socket並交給Worker中的執行緒”的這個過程中,使用非阻塞的NIO實現,這是NIO模式與BIO模式的最主要區別(其他區別對效能影響較小,暫時略去不提)。而這個區別,在併發量較大的情形下可以帶來Tomcat效率的顯著提升:
目前大多數HTTP請求使用的是長連線(HTTP/1.1預設keep-alive為true),而長連線意味著,一個TCP的socket在當前請求結束後,如果沒有新的請求到來,socket不會立馬釋放,而是等timeout後再釋放。如果使用BIO,“讀取socket並交給Worker中的執行緒”這個過程是阻塞的,也就意味著在socket等待下一個請求或等待釋放的過程中,處理這個socket的工作執行緒會一直被佔用,無法釋放;因此Tomcat可以同時處理的socket數目不能超過最大執行緒數,效能受到了極大限制。而使用NIO,“讀取socket並交給Worker中的執行緒”這個過程是非阻塞的,當socket在等待下一個請求或等待釋放時,並不會佔用工作執行緒,因此Tomcat可以同時處理的socket數目遠大於最大執行緒數,併發效能大大提高。
二、3個引數:acceptCount、maxConnections、maxThreads
再回顧一下Tomcat處理請求的過程:在accept佇列中接收連線(當客戶端向伺服器傳送請求時,如果客戶端與OS完成三次握手建立了連線,則OS將該連線放入accept佇列);在連線中獲取請求的資料,生成request;呼叫servlet容器處理請求;返回response。
相對應的,Connector中的幾個引數功能如下:
1、acceptCount
accept佇列的長度;當accept佇列中連線的個數達到acceptCount時,佇列滿,進來的請求一律被拒絕。預設值是100。
2、maxConnections
Tomcat在任意時刻接收和處理的最大連線數。當Tomcat接收的連線數達到maxConnections時,Acceptor執行緒不會讀取accept佇列中的連線;這時accept佇列中的執行緒會一直阻塞著,直到Tomcat接收的連線數小於maxConnections。如果設定為-1,則連線數不受限制。
預設值與聯結器使用的協議有關:NIO的預設值是10000,APR/native的預設值是8192,而BIO的預設值為maxThreads(如果配置了Executor,則預設值是Executor的maxThreads)。
在windows下,APR/native的maxConnections值會自動調整為設定值以下最大的1024的整數倍;如設定為2000,則最大值實際是1024。
3、maxThreads
請求處理執行緒的最大數量。預設值是200(Tomcat7和8都是的)。如果該Connector綁定了Executor,這個值會被忽略,因為該Connector將使用繫結的Executor,而不是內建的執行緒池來執行任務。
maxThreads規定的是最大的執行緒數目,並不是實際running的CPU數量;實際上,maxThreads的大小比CPU核心數量要大得多。這是因為,處理請求的執行緒真正用於計算的時間可能很少,大多數時間可能在阻塞,如等待資料庫返回資料、等待硬碟讀寫資料等。因此,在某一時刻,只有少數的執行緒真正的在使用物理CPU,大多數執行緒都在等待;因此執行緒數遠大於物理核心數才是合理的。
換句話說,Tomcat通過使用比CPU核心數量多得多的執行緒數,可以使CPU忙碌起來,大大提高CPU的利用率。
4、引數設定
(1)maxThreads的設定既與應用的特點有關,也與伺服器的CPU核心數量有關。通過前面介紹可以知道,maxThreads數量應該遠大於CPU核心數量;而且CPU核心數越大,maxThreads應該越大;應用中CPU越不密集(IO越密集),maxThreads應該越大,以便能夠充分利用CPU。當然,maxThreads的值並不是越大越好,如果maxThreads過大,那麼CPU會花費大量的時間用於執行緒的切換,整體效率會降低。
(2)maxConnections的設定與Tomcat的執行模式有關。如果tomcat使用的是BIO,那麼maxConnections的值應該與maxThreads一致;如果tomcat使用的是NIO,那麼類似於Tomcat的預設值,maxConnections值應該遠大於maxThreads。
(3)通過前面的介紹可以知道,雖然tomcat同時可以處理的連線數目是maxConnections,但伺服器中可以同時接收的連線數為maxConnections+acceptCount 。acceptCount的設定,與應用在連線過高情況下希望做出什麼反應有關係。如果設定過大,後面進入的請求等待時間會很長;如果設定過小,後面進入的請求立馬返回connection refused。點選檢視 Tomcat Server 配置檔案詳解。
三、執行緒池Executor
Executor元素代表Tomcat中的執行緒池,可以由其他元件共享使用;要使用該執行緒池,元件需要通過executor屬性指定該執行緒池。
Executor是Service元素的內嵌元素。一般來說,使用執行緒池的是Connector元件;為了使Connector能使用執行緒池,Executor元素應該放在Connector前面。Executor與Connector的配置舉例如下:
Executor的主要屬性包括:
name:該執行緒池的標記
maxThreads:執行緒池中最大活躍執行緒數,預設值200(Tomcat7和8都是)
minSpareThreads:執行緒池中保持的最小執行緒數,最小值是25
maxIdleTime:執行緒空閒的最大時間,當空閒超過該值時關閉執行緒(除非執行緒數小於minSpareThreads),單位是ms,預設值60000(1分鐘)
daemon:是否後臺執行緒,預設值true
threadPriority:執行緒優先順序,預設值5
namePrefix:執行緒名字的字首,執行緒池中執行緒名字為:namePrefix+執行緒編號
四、檢視當前狀態
上面介紹了Tomcat連線數、執行緒數的概念以及如何設定,下面說明如何檢視伺服器中的連線數和執行緒數。
檢視伺服器的狀態,大致分為兩種方案:(1)使用現成的工具,(2)直接使用Linux的命令檢視。
現成的工具,如JDK自帶的jconsole工具可以方便的檢視執行緒資訊(此外還可以檢視CPU、記憶體、類、JVM基本資訊等),Tomcat自帶的manager,收費工具New Relic等。下圖是jconsole檢視執行緒資訊的介面:
下面說一下如何通過Linux命令列,檢視伺服器中的連線數和執行緒數。
1、連線數
假設Tomcat接收http請求的埠是8083,則可以使用如下語句檢視連線情況:
netstat –nat | grep 8083
結果如下所示:
可以看出,有一個連線處於listen狀態,監聽請求;除此之外,還有4個已經建立的連線(ESTABLISHED)和2個等待關閉的連線(CLOSE_WAIT)。
2、執行緒
ps命令可以檢視程序狀態,如執行如下命令:
ps –e | grep java
結果如下圖:
可以看到,只打印了一個程序的資訊;27989是執行緒id,java是指執行的java命令。這是因為啟動一個tomcat,內部所有的工作都在這一個程序裡完成,包括主執行緒、垃圾回收執行緒、Acceptor執行緒、請求處理執行緒等等。
通過如下命令,可以看到該程序內有多少個執行緒;其中,nlwp含義是number of light-weight process。
ps –o nlwp 27989
可以看到,該程序內部有73個執行緒;但是73並沒有排除處於idle狀態的執行緒。要想獲得真正在running的執行緒數量,可以通過以下語句完成:
ps -eLo pid ,stat | grep 27989 | grep running | wc -l
其中ps -eLo pid ,stat可以找出所有執行緒,並列印其所在的程序號和執行緒當前的狀態;兩個grep命令分別篩選程序號和執行緒狀態;wc統計個數。其中,ps -eLo pid ,stat | grep 27989輸出的結果如下:
圖中只截圖了部分結果;Sl表示大多數執行緒都處於空閒狀態。
=========================================
簡談Kafka中的NIO網路通訊模型
基本上已經較為詳細地將RocketMQ這款分散式訊息佇列的RPC通訊部分的協議格式、訊息編解碼、通訊方式(同步/非同步/單向)、訊息收發流程和Netty的Reactor多執行緒分離處理架構講了一遍。同時,聯想業界大名鼎鼎的另一款開源分散式訊息佇列—Kafka,具備高吞吐量和高併發的特性,其網路通訊層是如何做到訊息的高效傳輸的呢?為了解開自己心中的疑慮,就查閱了Kafka的Network通訊模組的原始碼,乘機會寫本篇文章。
本文主要通過對Kafka原始碼的分析來簡述其Reactor的多執行緒網路通訊模型和總體框架結構,同時簡要介紹Kafka網路通訊層的設計與具體實現。
一、Kafka網路通訊模型的整體框架概述
Kafka的網路通訊模型是基於NIO的Reactor多執行緒模型來設計的。這裡先引用Kafka原始碼中註釋的一段話:
An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.
相信大家看了上面的這段引文註釋後,大致可以瞭解到Kafka的網路通訊層模型,主要採用了1(1個Acceptor執行緒)+N(N個Processor執行緒)+M(M個業務處理執行緒)。下面的表格簡要的列舉了下(這裡先簡單的看下後面還會詳細說明):
執行緒數 | 執行緒名 | 執行緒具體說明 |
---|---|---|
1 | kafka-socket-acceptor_%x | Acceptor執行緒,負責監聽Client端發起的請求 |
N | kafka-network-thread_%d | Processor執行緒,負責對Socket進行讀寫 |
M | kafka-request-handler-_%d | Worker執行緒,處理具體的業務邏輯並生成Response返回 |
Kafka網路通訊層的完整框架圖如下圖所示:
剛開始看到上面的這個框架圖可能會有一些不太理解,並不要緊,這裡可以先對Kafka的網路通訊層框架結構有一個大致瞭解。本文後面會結合Kafka的部分重要原始碼來詳細闡述上面的過程。這裡可以簡單總結一下其網路通訊模型中的幾個重要概念:
(1) Acceptor:1個接收執行緒,負責監聽新的連線請求,同時註冊OPACCEPT 事件,將新的連線按照"round robin"方式交給對應的 Processor 執行緒處理;
(2) Processor:N個處理器執行緒,其中每個 Processor 都有自己的 selector,它會向 Acceptor 分配的 SocketChannel 註冊相應的 OPREAD 事件,N 的大小由“num.networker.threads”決定;
(3) KafkaRequestHandler:M個請求處理執行緒,包含線上程池—KafkaRequestHandlerPool內部,從RequestChannel的全域性請求佇列—requestQueue中獲取請求資料並交給KafkaApis處理,M的大小由“num.io.threads”決定;
(4) RequestChannel:其為Kafka服務端的請求通道,該資料結構中包含了一個全域性的請求佇列 requestQueue和多個與Processor處理器相對應的響應佇列responseQueue,提供給Processor與請求處理執行緒KafkaRequestHandler和KafkaApis交換資料的地方;
(5) NetworkClient:其底層是對 Java NIO 進行相應的封裝,位於Kafka的網路介面層。Kafka訊息生產者物件—KafkaProducer的send方法主要呼叫NetworkClient完成訊息傳送;
(6) SocketServer:其是一個NIO的服務,它同時啟動一個Acceptor接收執行緒和多個Processor處理器執行緒。提供了一種典型的Reactor多執行緒模式,將接收客戶端請求和處理請求相分離;
(7) KafkaServer:代表了一個Kafka Broker的例項;其startup方法為例項啟動的入口;
(8) KafkaApis:Kafka的業務邏輯處理Api,負責處理不同型別的請求;比如“傳送訊息”、“獲取訊息偏移量—offset”和“處理心跳請求”等;
二、Kafka網路通訊層的設計與具體實現
這一節將結合Kafka網路通訊層的原始碼來分析其設計與實現,這裡主要詳細介紹網路通訊層的幾個重要元素——SocketServer、Acceptor、Processor、RequestChannel、KafkaRequestHandler 和 KafkaApis。本文分析的原始碼部分均基於 Kafka 的 0.11.0 版本。
1、SocketServer
SocketServer是接收客戶端Socket請求連線、處理請求並返回處理結果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這裡實現的。在KafkaServer例項啟動時會呼叫其startup的初始化方法,會初始化1個 Acceptor和N個Processor執行緒(每個EndPoint都會初始化,一般來說一個Server只會設定一個埠),其實現如下:
-
def startup {
-
this.synchronized {
-
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
-
val sendBufferSize = config.socketSendBufferBytes
-
val recvBufferSize = config.socketReceiveBufferBytes
-
val brokerId = config.brokerId
-
var processorBeginIndex = 0
-
// 一個broker一般只設置一個埠
-
config.listeners.foreach { endpoint =>
-
val listenerName = endpoint.listenerName
-
val securityProtocol = endpoint.securityProtocol
-
val processorEndIndex = processorBeginIndex + numProcessorThreads
-
//N 個 processor
-
for (i <- processorBeginIndex until processorEndIndex)
-
processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
-
//1個 Acceptor
-
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
-
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
-
acceptors.put(endpoint, acceptor)
-
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start
-
acceptor.awaitStartup
-
processorBeginIndex = processorEndIndex
-
}
-
}
2、Acceptor
Acceptor是一個繼承自抽象類AbstractServerThread的執行緒類。Acceptor的主要任務是監聽並且接收客戶端的請求,同時建立資料傳輸通道—SocketChannel,然後以輪詢的方式交給一個後端的Processor執行緒處理(具體的方式是新增socketChannel至併發佇列並喚醒Processor執行緒處理)。
在該執行緒類中主要可以關注以下兩個重要的變數:
(1) nioSelector:通過NSelector.open方法建立的變數,封裝了JAVA NIO Selector的相關操作;
(2) serverChannel:用於監聽埠的服務端Socket套接字物件;
下面來看下Acceptor主要的run方法的原始碼:
-
def run {
-
//首先註冊OP_ACCEPT事件
-
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
-
startupComplete
-
try {
-
var currentProcessor = 0
-
//以輪詢方式查詢並等待關注的事件發生
-
while (isRunning) {
-
try {
-
val ready = nioSelector.select(500)
-
if (ready > 0) {
-
val keys = nioSelector.selectedKeys
-
val iter = keys.iterator
-
while (iter.hasNext && isRunning) {
-
try {
-
val key = iter.next
-
iter.remove
-
if (key.isAcceptable)
-
//如果事件發生則呼叫accept方法對OP_ACCEPT事件處理
-
accept(key, processors(currentProcessor))
-
else
-
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
-
//輪詢演算法
-
// round robin to the next processor thread
-
currentProcessor = (currentProcessor + 1) % processors.length
-
} catch {
-
case e: Throwable => error("Error while accepting connection", e)
-
}
-
}
-
}
-
}
-
//程式碼省略
-
}
-
def accept(key: SelectionKey, processor: Processor) {
-
val serverSocketChannel = key.channel.asInstanceOf[ServerSocketChannel]
-
val socketChannel = serverSocketChannel.accept
-
try {
-
connectionQuotas.inc(socketChannel.socket.getInetAddress)
-
socketChannel.configureBlocking(false)
-
socketChannel.socket.setTcpNoDelay(true)
-
socketChannel.socket.setKeepAlive(true)
-
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
-
socketChannel.socket.setSendBufferSize(sendBufferSize)
-
processor.accept(socketChannel)
-
} catch {
-
//省略部分程式碼
-
}
-
}
-
def accept(socketChannel: SocketChannel) {
-
newConnections.add(socketChannel)
-
wakeup
-
}
在上面原始碼中可以看到,Acceptor執行緒啟動後,首先會向用於監聽埠的服務端套接字物件—ServerSocketChannel上註冊OPACCEPT 事件。然後以輪詢的方式等待所關注的事件發生。如果該事件發生,則呼叫accept方法對OPACCEPT事件進行處理。這裡,Processor是通過round robin方法選擇的,這樣可以保證後面多個Processor執行緒的負載基本均勻。 Acceptor的accept方法的作用主要如下:
(1) 通過SelectionKey取得與之對應的serverSocketChannel例項,並呼叫它的accept方法與客戶端建立連線;
(2) 呼叫connectionQuotas.inc方法增加連線統計計數;並同時設定第 (1) 步中建立返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等) ;
(3) 將socketChannel交給processor.accept方法進行處理。這裡主要是將socketChannel加入Processor處理器的併發佇列newConnections佇列中,然後喚醒Processor執行緒從佇列中獲取socketChannel並處理。其中,newConnections會被Acceptor執行緒和Processor執行緒併發訪問操作,所以newConnections是ConcurrentLinkedQueue佇列(一個基於連結節點的無界執行緒安全佇列)
3、Processor
Processor同Acceptor一樣,也是一個執行緒類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請求中讀取資料和將KafkaRequestHandler處理完響應結果返回給客戶端。在該執行緒類中主要關注以下幾個重要的變數:
(1) newConnections:在上面的Acceptor一節中已經提到過,它是一種ConcurrentLinkedQueue[SocketChannel]型別的佇列,用於儲存新連線交由Processor處理的socketChannel;
(2) inflightResponses:是一個Map[String, RequestChannel.Response]型別的集合,用於記錄尚未傳送的響應;
(3) selector:是一個型別為KSelector變數,用於管理網路連線; 下面先給出Processor處理器執行緒run方法執行的流程圖:
從上面的流程圖中能夠可以看出Processor處理器執行緒在其主流程中主要完成了這樣子幾步操作:
(1) 處理newConnections佇列中的socketChannel。遍歷取出佇列中的每個socketChannel並將其在selector上註冊OPREAD事件;
(2) 處理RequestChannel中與當前Processor對應響應佇列中的Response。在這一步中會根據responseAction的型別(NoOpAction/SendAction/CloseConnectionAction)進行判斷,若為“NoOpAction”,表示該連線對應的請求無需響應;若為“SendAction”,表示該Response需要傳送給客戶端,則會通過“selector.send”註冊OPWRITE事件,並且將該Response從responseQueue響應佇列中移至inflightResponses集合中;“CloseConnectionAction”,表示該連線是要關閉的;
(3) 呼叫selector.poll方法進行處理。該方法底層即為呼叫nioSelector.select方法進行處理。
(4) 處理已接受完成的資料包佇列—completedReceives。在processCompletedReceives方法中呼叫“requestChannel.sendRequest”方法將請求Request新增至requestChannel的全域性請求佇列—requestQueue中,等待KafkaRequestHandler來處理。同時,呼叫“selector.mute”方法取消與該請求對應的連線通道上的OPREAD事件;
(5) 處理已傳送完的佇列—completedSends。當已經完成將response傳送給客戶端,則將其從inflightResponses移除,同時通過呼叫“selector.unmute”方法為對應的連線通道重新註冊OPREAD事件;
(6) 處理斷開連線的佇列。將該response從inflightResponses集合中移除,同時將connectionQuotas統計計數減1;
4、RequestChannel
在Kafka的網路通訊層中,RequestChannel為Processor處理器執行緒與KafkaRequestHandler執行緒之間的資料交換提供了一個數據緩衝區,是通訊過程中Request和Response快取的地方。因此,其作用就是在通訊中起到了一個數據緩衝佇列的作用。Processor執行緒將讀取到的請求新增至RequestChannel的全域性請求佇列—requestQueue中;KafkaRequestHandler執行緒從請求佇列中獲取並處理,處理完以後將Response新增至RequestChannel的響應佇列—responseQueue中,並通過responseListeners喚醒對應的Processor執行緒,最後Processor執行緒從響應佇列中取出後傳送至客戶端。
5、KafkaRequestHandler
KafkaRequestHandler也是一種執行緒類,在KafkaServer例項啟動時候會例項化一個執行緒池—KafkaRequestHandlerPool物件(包含了若干個KafkaRequestHandler執行緒),這些執行緒以守護執行緒的方式在後臺執行。在KafkaRequestHandler的run方法中會迴圈地從RequestChannel中阻塞式讀取request,讀取後再交由KafkaApis來具體處理。
6、KafkaApis
KafkaApis是用於處理對通訊網路傳輸過來的業務訊息請求的中心轉發元件。該元件反映出Kafka Broker Server可以提供哪些服務。
三、總結
仔細閱讀Kafka的NIO網路通訊層的原始碼過程中還是可以收穫不少關於NIO網路通訊模組的關鍵技術。Apache的任何一款開源中介軟體都有其設計獨到之處,值得借鑑和學習。對於任何一位使用Kafka這款分散式訊息佇列的同學來說,如果能夠在一定實踐的基礎上,再通過閱讀其原始碼能起到更為深入理解的效果,對於大規模Kafka叢集的效能調優和問題定位都大有裨益。 對於剛接觸Kafka的同學來說,想要自己掌握其NIO網路通訊層模型的關鍵設計,還需要不斷地使用本地環境進行debug除錯和閱讀原始碼反覆思考