1. 程式人生 > >訊息中介軟體—簡談Kafka中的NIO網路通訊模型

訊息中介軟體—簡談Kafka中的NIO網路通訊模型

摘要:很多人喜歡把RocketMQ與Kafka做對比,其實這兩款訊息佇列的網路通訊層還是比較相似的,本文就為大家簡要地介紹下Kafka的NIO網路通訊模型

前面寫的兩篇RocketMQ原始碼研究筆記系列:

(1) 訊息中介軟體—RocketMQ的RPC通訊(一)

(2) 訊息中介軟體—RocketMQ的RPC通訊(二)

基本上已經較為詳細地將RocketMQ這款分散式訊息佇列的RPC通訊部分的協議格式、訊息編解碼、通訊方式(同步/非同步/單向)、訊息收發流程和Netty的Reactor多執行緒分離處理架構講了一遍。同時,聯想業界大名鼎鼎的另一款開源分散式訊息佇列—Kafka,具備高吞吐量和高併發的特性,其網路通訊層是如何做到訊息的高效傳輸的呢?為了解開自己心中的疑慮,就查閱了Kafka的Network通訊模組的原始碼,乘機會寫本篇文章。

本文主要通過對Kafka原始碼的分析來簡述其Reactor的多執行緒網路通訊模型和總體框架結構,同時簡要介紹Kafka網路通訊層的設計與具體實現。

一、Kafka網路通訊模型的整體框架概述

Kafka的網路通訊模型是基於NIO的Reactor多執行緒模型來設計的。這裡先引用Kafka原始碼中註釋的一段話:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing. 

相信大家看了上面的這段引文註釋後,大致可以瞭解到Kafka的網路通訊層模型,主要採用了 1(1個Acceptor執行緒)+N(N個Processor執行緒)+M(M個業務處理執行緒) 。下面的表格簡要的列舉了下(這裡先簡單的看下後面還會詳細說明):

執行緒數執行緒名執行緒具體說明1kafka-socket-acceptor_%xAcceptor執行緒,負責監聽Client端發起的請求Nkafka-network-thread_%dProcessor執行緒,負責對Socket進行讀寫Mkafka-request-handler-_%dWorker執行緒,處理具體的業務邏輯並生成Response返回

Kafka網路通訊層的完整框架圖如下圖所示:

Kafka訊息佇列的通訊層模型—1+N+M模型.png

剛開始看到上面的這個框架圖可能會有一些不太理解,並不要緊,這裡可以先對Kafka的網路通訊層框架結構有一個大致瞭解。本文後面會結合Kafka的部分重要原始碼來詳細闡述上面的過程。這裡可以簡單總結一下其網路通訊模型中的幾個重要概念:

(1), Acceptor :1個接收執行緒,負責監聽新的連線請求,同時註冊OP_ACCEPT 事件,將新的連線按照 "round robin" 方式交給對應的 Processor 執行緒處理;

(2), Processor :N個處理器執行緒,其中每個 Processor 都有自己的 selector,它會向 Acceptor 分配的 SocketChannel 註冊相應的 OP_READ 事件,N 的大小由 “num.networker.threads” 決定;

(3), KafkaRequestHandler :M個請求處理執行緒,包含線上程池—KafkaRequestHandlerPool內部,從RequestChannel的全域性請求佇列—requestQueue中獲取請求資料並交給KafkaApis處理,M的大小由 “num.io.threads” 決定;

(4), RequestChannel :其為Kafka服務端的請求通道,該資料結構中包含了一個全域性的請求佇列 requestQueue和多個與Processor處理器相對應的響應佇列responseQueue,提供給Processor與請求處理執行緒KafkaRequestHandler和KafkaApis交換資料的地方。

(5), NetworkClient :其底層是對 Java NIO 進行相應的封裝,位於Kafka的網路介面層。Kafka訊息生產者物件—KafkaProducer的send方法主要呼叫NetworkClient完成訊息傳送;

(6), SocketServer :其是一個NIO的服務,它同時啟動一個Acceptor接收執行緒和多個Processor處理器執行緒。提供了一種典型的Reactor多執行緒模式,將接收客戶端請求和處理請求相分離;

(7), KafkaServer :代表了一個Kafka Broker的例項;其startup方法為例項啟動的入口;

(8), KafkaApis :Kafka的業務邏輯處理Api,負責處理不同型別的請求;比如 “傳送訊息”、 “獲取訊息偏移量—offset” 和 “處理心跳請求” 等;

二、Kafka網路通訊層的設計與具體實現

這一節將結合Kafka網路通訊層的原始碼來分析其設計與實現,這裡主要詳細介紹網路通訊層的幾個重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的原始碼部分均基於Kafka的0.11.0版本。

1、SocketServer

SocketServer是接收客戶端Socket請求連線、處理請求並返回處理結果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這裡實現的。在KafkaServer例項啟動時會呼叫其startup的初始化方法,會初始化1個 Acceptor和N個Processor執行緒(每個EndPoint都會初始化,一般來說一個Server只會設定一個埠),其實現如下:

def startup() { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes val brokerId = config.brokerId var processorBeginIndex = 0 // 一個broker一般只設置一個埠 config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val processorEndIndex = processorBeginIndex + numProcessorThreads //N 個 processor for (i <- processorBeginIndex until processorEndIndex) processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool) //1個 Acceptor val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) acceptors.put(endpoint, acceptor) KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() acceptor.awaitStartup() processorBeginIndex = processorEndIndex } } 

2、Acceptor

Acceptor是一個繼承自抽象類AbstractServerThread的執行緒類。Acceptor的主要任務是監聽並且接收客戶端的請求,同時建立資料傳輸通道—SocketChannel,然後以輪詢的方式交給一個後端的Processor執行緒處理(具體的方式是新增socketChannel至併發佇列並喚醒Processor執行緒處理)。

在該執行緒類中主要可以關注以下兩個重要的變數:

(1), nioSelector :通過NSelector.open()方法建立的變數,封裝了JAVA NIO Selector的相關操作;

(2), serverChannel :用於監聽埠的服務端Socket套接字物件;

下面來看下Acceptor主要的run方法的原始碼:

def run() { //首先註冊OP_ACCEPT事件 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) startupComplete() try { var currentProcessor = 0 //以輪詢方式查詢並等待關注的事件發生 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) //如果事件發生則呼叫accept方法對OP_ACCEPT事件處理 accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") //輪詢演算法 // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } } //程式碼省略 } def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socketChannel.socket().setSendBufferSize(sendBufferSize) processor.accept(socketChannel) } catch { //省略部分程式碼 } } def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) wakeup() } 

在上面原始碼中可以看到,Acceptor執行緒啟動後,首先會向用於監聽埠的服務端套接字物件—ServerSocketChannel上註冊OP_ACCEPT 事件。然後以輪詢的方式等待所關注的事件發生。如果該事件發生,則呼叫accept()方法對OP_ACCEPT事件進行處理。這裡,Processor是通過 round robin 方法選擇的,這樣可以保證後面多個Processor執行緒的負載基本均勻。

Acceptor的accept()方法的作用主要如下:

(1)通過SelectionKey取得與之對應的serverSocketChannel例項,並呼叫它的accept()方法與客戶端建立連線;

(2)呼叫connectionQuotas.inc()方法增加連線統計計數;並同時設定第(1)步中建立返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)

(3)將socketChannel交給processor.accept()方法進行處理。這裡主要是將socketChannel加入Processor處理器的併發佇列newConnections佇列中,然後喚醒Processor執行緒從佇列中獲取socketChannel並處理。其中,newConnections會被Acceptor執行緒和Processor執行緒併發訪問操作,所以newConnections是ConcurrentLinkedQueue佇列(一個基於連結節點的無界執行緒安全佇列)

3、Processor

Processor同Acceptor一樣,也是一個執行緒類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請求中讀取資料和將KafkaRequestHandler處理完響應結果返回給客戶端。在該執行緒類中主要關注以下幾個重要的變數:

(1), newConnections :在上面的 Acceptor 一節中已經提到過,它是一種ConcurrentLinkedQueue[SocketChannel]型別的佇列,用於儲存新連線交由Processor處理的socketChannel;

(2), inflightResponses :是一個Map[String, RequestChannel.Response]型別的集合,用於記錄尚未傳送的響應;

(3), selector :是一個型別為KSelector變數,用於管理網路連線;

下面先給出Processor處理器執行緒run方法執行的流程圖:

Kafk_Processor執行緒的處理流程圖.png

從上面的流程圖中能夠可以看出Processor處理器執行緒在其主流程中主要完成了這樣子幾步操作:

(1), 處理newConnections佇列中的socketChannel 。遍歷取出佇列中的每個socketChannel並將其在selector上註冊OP_READ事件;

(2), 處理RequestChannel中與當前Processor對應響應佇列中的Response 。在這一步中會根據responseAction的型別(NoOpAction/SendAction/CloseConnectionAction)進行判斷,若為“NoOpAction”,表示該連線對應的請求無需響應;若為“SendAction”,表示該Response需要傳送給客戶端,則會通過“selector.send”註冊OP_WRITE事件,並且將該Response從responseQueue響應佇列中移至inflightResponses集合中;“CloseConnectionAction”,表示該連線是要關閉的;

(3), 呼叫selector.poll()方法進行處理 。該方法底層即為呼叫nioSelector.select()方法進行處理。

(4), 處理已接受完成的資料包佇列—completedReceives 。在processCompletedReceives方法中呼叫“requestChannel.sendRequest”方法將請求Request新增至requestChannel的全域性請求佇列—requestQueue中,等待KafkaRequestHandler來處理。同時,呼叫“selector.mute”方法取消與該請求對應的連線通道上的OP_READ事件;

(5), 處理已傳送完的佇列—completedSends 。當已經完成將response傳送給客戶端,則將其從inflightResponses移除,同時通過呼叫“selector.unmute”方法為對應的連線通道重新註冊OP_READ事件;

(6), 處理斷開連線的佇列 。將該response從inflightResponses集合中移除,同時將connectionQuotas統計計數減1;

4、RequestChannel

在Kafka的網路通訊層中,RequestChannel為Processor處理器執行緒與KafkaRequestHandler執行緒之間的資料交換提供了一個數據緩衝區,是通訊過程中Request和Response快取的地方。因此,其作用就是在通訊中起到了一個數據緩衝佇列的作用。Processor執行緒將讀取到的請求新增至RequestChannel的全域性請求佇列—requestQueue中;KafkaRequestHandler執行緒從請求佇列中獲取並處理,處理完以後將Response新增至RequestChannel的響應佇列—responseQueue中,並通過responseListeners喚醒對應的Processor執行緒,最後Processor執行緒從響應佇列中取出後傳送至客戶端。

5、KafkaRequestHandler

KafkaRequestHandler也是一種執行緒類,在KafkaServer例項啟動時候會例項化一個執行緒池—KafkaRequestHandlerPool物件(包含了若干個KafkaRequestHandler執行緒),這些執行緒以守護執行緒的方式在後臺執行。在KafkaRequestHandler的run方法中會迴圈地從RequestChannel中阻塞式讀取request,讀取後再交由KafkaApis來具體處理。

6、KafkaApis

KafkaApis是用於處理對通訊網路傳輸過來的業務訊息請求的中心轉發元件。該元件反映出Kafka Broker Server可以提供哪些服務。

三、總結

仔細閱讀Kafka的NIO網路通訊層的原始碼過程中還是可以收穫不少關於NIO網路通訊模組的關鍵技術。Apache的任何一款開源中介軟體都有其設計獨到之處,值得借鑑和學習。對於任何一位使用Kafka這款分散式訊息佇列的同學來說,如果能夠在一定實踐的基礎上,再通過閱讀其原始碼能起到更為深入理解的效果,對於大規模Kafka叢集的效能調優和問題定位都大有裨益。

對於剛接觸Kafka的同學來說,想要自己掌握其NIO網路通訊層模型的關鍵設計,還需要不斷地使用本地環境進行debug除錯和閱讀原始碼反覆思考。

歡迎工作一到五年的Java工程師朋友們加入Java高階交流:698581634。群內提供免費的Java架構學習資料(有Spring,MyBatis,Netty原始碼分析,高併發、高效能、分散式、微服務架構的原理,JVM效能優化等...)這些成為架構師必備的知識體系。合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!