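An Analysis of Kafka's Network Model and Communication Flow
1. Overview
Some readers studying Kafka's network communication have recently run into questions about its network model and communication flow. This post walks through both.
2. Content
As a message queue, Kafka involves network communication in two main directions:
- Pull: the Consumer pulls message data from the message queue;
- Push: the Producer pushes message data to the message queue.
High-performance network communication can be built directly on a lower-level transport such as TCP or UDP. Kafka defines its own TCP-based protocol for communication among Producer, Broker, and Consumer, and this protocol is tailored entirely to Kafka's own needs.
Note: UDP is an unreliable transport protocol, which is why Kafka uses TCP for communication between its services.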
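2.1 Basic Data Types
The basic data types in the communication protocol fall into the following groups:
- Fixed-width types: for example, int8, int16, int32, and int64, which correspond to byte, short, int, and long in Java;
- Variable-length types: for example, Map and List in Java;
- Arrays: for example, int[] and String[] in Java.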
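To make the type groups above concrete, here is a minimal sketch of how such values could be laid out in a byte buffer with java.nio.ByteBuffer. The object name WireTypesSketch, its method names, and the specific layouts (an int16 length prefix for strings, an int32 count prefix for arrays) are assumptions chosen for illustration; the article itself does not spell out the encoding.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of Kafka: shows one way to encode the three type groups.
object WireTypesSketch {
  // Fixed-width types map directly onto ByteBuffer's put methods.
  def writeFixed(buf: ByteBuffer, a: Byte, b: Short, c: Int, d: Long): Unit = {
    buf.put(a)      // int8  -> 1 byte
    buf.putShort(b) // int16 -> 2 bytes
    buf.putInt(c)   // int32 -> 4 bytes
    buf.putLong(d)  // int64 -> 8 bytes
  }

  // Variable-length value: an int16 length followed by UTF-8 bytes (assumed layout).
  def writeString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    require(bytes.length <= Short.MaxValue, "string too long for an int16 length prefix")
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  def readString(buf: ByteBuffer): String = {
    val length = buf.getShort().toInt
    val bytes = new Array[Byte](length)
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  // Array: an int32 element count followed by the elements (assumed layout).
  def writeIntArray(buf: ByteBuffer, values: Array[Int]): Unit = {
    buf.putInt(values.length)
    values.foreach(v => buf.putInt(v))
  }

  def readIntArray(buf: ByteBuffer): Array[Int] = {
    val count = buf.getInt()
    Array.fill(count)(buf.getInt())
  }
}
```

The reader has to consume fields in exactly the order the writer produced them, since nothing in the bytes identifies a field other than its position.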
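2.2 Communication Model
Kafka uses the multi-threaded Reactor model: a single Acceptor thread handles all new connections, and multiple Processor threads handle the requests (for example, parsing the protocol, wrapping requests, and forwarding them).
Note: Reactor is an event-handling model in which requests are dispatched to one or more service handlers. When requests arrive from a Client, the Server demultiplexes them: a single non-blocking thread receives all requests and forwards each one to the appropriate worker thread for processing.
Later, as Kafka evolved, a Handler module was added, which processes requests with a configurable number of threads. Handler and Processor are connected through a blocking queue, as shown in the figure below:
Here, Acceptor is a thread class that extends AbstractServerThread. Its main job is to listen for and accept Client connection requests, create the data transfer channel (SocketChannel), and then hand that channel to a Processor chosen in round-robin fashion. The core logic lives in Acceptor's run method: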
    def run() {
      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
      startupComplete()
      try {
        var currentProcessor = 0
        while (isRunning) {
          try {
            val ready = nioSelector.select(500)
            if (ready > 0) {
              val keys = nioSelector.selectedKeys()
              val iter = keys.iterator()
              while (iter.hasNext && isRunning) {
                try {
                  val key = iter.next
                  iter.remove()
                  if (key.isAcceptable)
                    accept(key, processors(currentProcessor))
                  else
                    throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                  // round robin to the next processor thread
                  currentProcessor = (currentProcessor + 1) % processors.length
                } catch {
                  case e: Throwable => error("Error while accepting connection", e)
                }
              }
            }
          } catch {
            // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
            // to a select operation on a specific channel or a bad request. We don't want
            // the broker to stop responding to requests from other clients in these scenarios.
            case e: ControlThrowable => throw e
            case e: Throwable => error("Error occurred", e)
          }
        }
      } finally {
        debug("Closing server socket and selector.")
        swallowError(serverChannel.close())
        swallowError(nioSelector.close())
        shutdownComplete()
      }
    }
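The round-robin hand-off in run() can be isolated from the NIO details. The sketch below is a simplified stand-in, not Kafka code: RoundRobinSketch and assign are hypothetical names, and connections are reduced to a count so the assignment pattern is easy to see.

```scala
// Hypothetical helper, not part of Kafka: mirrors only the currentProcessor arithmetic above.
object RoundRobinSketch {
  // For each incoming connection, returns the index of the processor that would receive it.
  def assign(connections: Int, numProcessors: Int): Seq[Int] = {
    require(numProcessors > 0, "at least one processor is required")
    var currentProcessor = 0
    (0 until connections).map { _ =>
      val chosen = currentProcessor
      // Same step as in Acceptor.run: advance and wrap around.
      currentProcessor = (currentProcessor + 1) % numProcessors
      chosen
    }
  }
}
```

With three processors, five connections would land on processors 0, 1, 2, 0, 1, so load is spread evenly by connection count rather than by how busy each processor is.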
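There is also a blocking channel class (BlockingChannel). As its code shows, it wraps a blocking SocketChannel connected to a given host and port and exposes connect, send, and receive methods. Note that the component connecting Processor and Handler is the RequestChannel covered in section 3, not this class. The BlockingChannel code is as follows: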
    class BlockingChannel( val host: String,
                           val port: Int,
                           val readBufferSize: Int,
                           val writeBufferSize: Int,
                           val readTimeoutMs: Int ) extends Logging {
      private var connected = false
      private var channel: SocketChannel = null
      private var readChannel: ReadableByteChannel = null
      private var writeChannel: GatheringByteChannel = null
      private val lock = new Object()
      private val connectTimeoutMs = readTimeoutMs
      private var connectionId: String = ""

      def connect() = lock synchronized {
        if(!connected) {
          try {
            channel = SocketChannel.open()
            if(readBufferSize > 0)
              channel.socket.setReceiveBufferSize(readBufferSize)
            if(writeBufferSize > 0)
              channel.socket.setSendBufferSize(writeBufferSize)
            channel.configureBlocking(true)
            channel.socket.setSoTimeout(readTimeoutMs)
            channel.socket.setKeepAlive(true)
            channel.socket.setTcpNoDelay(true)
            channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

            writeChannel = channel
            // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
            // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
            readChannel = Channels.newChannel(channel.socket().getInputStream)
            connected = true
            val localHost = channel.socket.getLocalAddress.getHostAddress
            val localPort = channel.socket.getLocalPort
            val remoteHost = channel.socket.getInetAddress.getHostAddress
            val remotePort = channel.socket.getPort
            connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
            // settings may not match what we requested above
            val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
            debug(msg.format(channel.socket.getSoTimeout,
                             readTimeoutMs,
                             channel.socket.getReceiveBufferSize,
                             readBufferSize,
                             channel.socket.getSendBufferSize,
                             writeBufferSize,
                             connectTimeoutMs))
          } catch {
            case _: Throwable => disconnect()
          }
        }
      }

      def disconnect() = lock synchronized {
        if(channel != null) {
          swallow(channel.close())
          swallow(channel.socket.close())
          channel = null
          writeChannel = null
        }
        // closing the main socket channel *should* close the read channel
        // but let's do it to be sure.
        if(readChannel != null) {
          swallow(readChannel.close())
          readChannel = null
        }
        connected = false
      }

      def isConnected = connected

      def send(request: RequestOrResponse): Long = {
        if(!connected)
          throw new ClosedChannelException()

        val send = new RequestOrResponseSend(connectionId, request)
        send.writeCompletely(writeChannel)
      }

      def receive(): NetworkReceive = {
        if(!connected)
          throw new ClosedChannelException()

        val response = readCompletely(readChannel)
        response.payload().rewind()

        response
      }

      private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
        val response = new NetworkReceive
        while (!response.complete())
          response.readFromReadableChannel(channel)
        response
      }
    }
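The readCompletely loop above exists because a single read on a channel may return only part of a message. The sketch below illustrates the same idea in isolation, assuming each message is framed as a 4-byte size followed by that many payload bytes. That framing, and the names FramedReadSketch, frame, and readFrame, are assumptions for illustration; the internals of NetworkReceive are not shown in this article.

```scala
import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Hypothetical helper, not part of Kafka: reads one size-prefixed frame from a blocking channel.
object FramedReadSketch {
  // Keep reading until the buffer is full, since one read() may deliver fewer bytes than requested.
  private def fill(channel: ReadableByteChannel, buf: ByteBuffer): Unit = {
    while (buf.hasRemaining) {
      if (channel.read(buf) < 0) throw new EOFException("channel closed in the middle of a frame")
    }
  }

  // Builds a frame: int32 payload size, then the payload bytes (assumed layout).
  def frame(payload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array()

  // Reads the size header first, then exactly that many payload bytes.
  def readFrame(channel: ReadableByteChannel): Array[Byte] = {
    val sizeBuf = ByteBuffer.allocate(4)
    fill(channel, sizeBuf)
    sizeBuf.flip()
    val size = sizeBuf.getInt()
    require(size >= 0, "negative frame size")
    val payload = ByteBuffer.allocate(size)
    fill(channel, payload)
    payload.array()
  }
}
```

A channel built with Channels.newChannel over a ByteArrayInputStream is enough to try this without opening a socket.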
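3. Communication Process
Kafka's communication framework has itself gone through several iterations. In older versions of Kafka, network communication was built on NIO: multiple Socket connections were registered with a single Selector, so one thread could monitor and manage many connections, which greatly reduced the resource overhead of using many threads.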
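The idea of one thread watching many connections through a single Selector can be tried without any network setup. The sketch below uses java.nio.channels.Pipe as a stand-in for client sockets, which is an assumption made only to keep the example self-contained; SelectorSketch and collect are hypothetical names, and the messages are assumed to be small enough to fit in a pipe's buffer.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of Kafka: one thread, one Selector, several channels.
object SelectorSketch {
  def collect(messages: Seq[String]): Seq[String] = {
    val selector = Selector.open()
    val pipes = messages.toVector.map(_ => Pipe.open())
    try {
      // Register every read end with the same Selector; the attachment remembers which pipe it is.
      pipes.zipWithIndex.foreach { case (pipe, index) =>
        pipe.source().configureBlocking(false)
        pipe.source().register(selector, SelectionKey.OP_READ, Int.box(index))
      }
      // Stand-in for clients writing on their connections, then closing them.
      pipes.zip(messages).foreach { case (pipe, message) =>
        val data = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8))
        while (data.hasRemaining) pipe.sink().write(data)
        pipe.sink().close()
      }
      val received = Array.fill(pipes.size)(new ByteArrayOutputStream())
      val chunk = ByteBuffer.allocate(64)
      var remaining = pipes.size
      // A single loop services whichever channels are ready; it stops on a 500 ms idle timeout.
      while (remaining > 0 && selector.select(500) > 0) {
        val iter = selector.selectedKeys().iterator()
        while (iter.hasNext) {
          val key = iter.next()
          iter.remove()
          val source = key.channel().asInstanceOf[Pipe.SourceChannel]
          val index = key.attachment().asInstanceOf[Integer].intValue()
          chunk.clear()
          val n = source.read(chunk)
          if (n < 0) { key.cancel(); remaining -= 1 } // end of stream for this channel
          else received(index).write(chunk.array(), 0, n)
        }
      }
      received.map(b => new String(b.toByteArray, StandardCharsets.UTF_8)).toSeq
    } finally {
      pipes.foreach { pipe => pipe.source().close(); pipe.sink().close() }
      selector.close()
    }
  }
}
```

The loop has the same shape as the select and selectedKeys iteration in Acceptor.run, except that it reacts to read readiness instead of accept readiness.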
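Newer versions of Kafka still build on NIO and still use the multi-threaded Reactor model. The difference is that the business-logic module (the Handler module) has been split out and is managed by its own thread pool, as shown in the figure below:
From the figure above, Kafka's communication flow can be summarized as follows:
- When a Client sends a request to the Server, the Acceptor accepts the TCP connection and, once it is established, passes it to a Processor thread;
- The Processor thread registers the new connection with its own Selector and listens for READ events;
- When the Client writes data on that connection, a READ event fires, and the request is parsed according to the TCP-based protocol and passed to a Handler for processing;
- When the Handler finishes, it may have a result for the Client; that result is bound to a Response and sent back.
From this analysis, separating out the Handler module in newer Kafka versions has the following advantages:
- The number of Handler threads can be configured independently, which makes tuning and management easier;
- A single oversized request is prevented from blocking a Processor thread;
- Request, Handler, and Response are all connected through queues, so they are not coupled to one another, which helps Kafka's performance.
Note that in Kafka's network communication, RequestChannel provides the buffer through which Processor threads and Handler threads exchange data; it is where Requests and Responses are held in transit. In other words, it acts as the buffering queue of the communication layer. A Processor thread adds each request it reads to RequestChannel's global queue (requestQueue). Handler threads take requests from that queue and process them, then add each Response to RequestChannel's response queues (responseQueues), and the responseListeners wake up the corresponding Processor thread. Finally, the Processor thread takes the Response from its response queue and sends it to the Client. The implementation is as follows: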
    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

      newGauge(
        "RequestQueueSize",
        new Gauge[Int] {
          def value = requestQueue.size
        }
      )

      newGauge("ResponseQueueSize", new Gauge[Int]{
        def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
      })

      for (i <- 0 until numProcessors) {
        newGauge("ResponseQueueSize",
          new Gauge[Int] {
            def value = responseQueues(i).size()
          },
          Map("processor" -> i.toString)
        )
      }

      /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
      def sendRequest(request: RequestChannel.Request) {
        requestQueue.put(request)
      }

      /** Send a response back to the socket server to be sent over the network */
      def sendResponse(response: RequestChannel.Response) {
        responseQueues(response.processor).put(response)
        for(onResponse <- responseListeners)
          onResponse(response.processor)
      }

      /** No operation to take for the request, need to read more over the network */
      def noOperation(processor: Int, request: RequestChannel.Request) {
        responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
        for(onResponse <- responseListeners)
          onResponse(processor)
      }

      /** Close the connection for the request */
      def closeConnection(processor: Int, request: RequestChannel.Request) {
        responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
        for(onResponse <- responseListeners)
          onResponse(processor)
      }

      /** Get the next request or block until specified time has elapsed */
      def receiveRequest(timeout: Long): RequestChannel.Request =
        requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

      /** Get the next request or block until there is one */
      def receiveRequest(): RequestChannel.Request =
        requestQueue.take()

      /** Get a response for the given processor if there is one */
      def receiveResponse(processor: Int): RequestChannel.Response = {
        val response = responseQueues(processor).poll()
        if (response != null)
          response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
        response
      }

      def addResponseListener(onResponse: Int => Unit) {
        responseListeners ::= onResponse
      }

      def shutdown() {
        requestQueue.clear()
      }
    }
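The queue topology of RequestChannel can be tried out without any Kafka dependencies. The sketch below is a simplified simulation under stated assumptions: RequestChannelSketch, its Request and Response case classes, the "ack:" reply, and the queue capacity of 4 are all hypothetical, and the responseListeners wake-up is left out. It keeps only the shape described above: one bounded request queue shared by all handlers, and one response queue per processor.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, LinkedBlockingQueue, TimeUnit}

// Hypothetical simulation, not Kafka code: one shared request queue, one response queue per processor.
object RequestChannelSketch {
  final case class Request(processor: Int, payload: String)
  final case class Response(processor: Int, payload: String)

  def roundTrip(numProcessors: Int, numHandlers: Int, requests: Seq[Request]): Map[Int, Set[String]] = {
    require(requests.forall(r => r.processor >= 0 && r.processor < numProcessors), "unknown processor id")
    val requestQueue = new ArrayBlockingQueue[Request](4) // bounded: put() blocks while the queue is full
    val responseQueues = Vector.fill(numProcessors)(new LinkedBlockingQueue[Response]())
    val handlers = Executors.newFixedThreadPool(numHandlers) // handler pool sized independently
    try {
      (0 until numHandlers).foreach { _ =>
        handlers.execute(new Runnable {
          def run(): Unit = {
            try {
              while (true) {
                val request = requestQueue.take() // block until a request is available
                // Route the response to the queue of the processor that owns the connection.
                responseQueues(request.processor).put(Response(request.processor, "ack:" + request.payload))
              }
            } catch {
              case _: InterruptedException => () // shutdownNow() interrupts idle handlers
            }
          }
        })
      }
      // Processor side: enqueue the requests that were read from the network.
      requests.foreach(r => requestQueue.put(r))
      // Processor side: each processor drains only its own response queue.
      requests.groupBy(_.processor).map { case (processor, sent) =>
        val payloads = sent.map { _ =>
          val response = responseQueues(processor).poll(5, TimeUnit.SECONDS)
          require(response != null, "timed out waiting for a response")
          response.payload
        }
        processor -> payloads.toSet
      }
    } finally {
      handlers.shutdownNow()
    }
  }
}
```

Because responses are routed by processor id, a response always returns to the thread that owns the client connection, regardless of which handler thread produced it.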
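4. Summary
Carefully reading and analyzing the code of Kafka's network communication layer teaches a good deal about NIO-based network programming. Studying the Kafka source in this way is also very helpful for performance tuning and troubleshooting on large Kafka clusters.
5. Closing Remarks
That is all for this post. If you run into questions while studying this material, you can join the discussion group or send me an email, and I will do my best to answer.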
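In addition, the author has published two books, 《Kafka並不難學》 and 《Hadoop大資料探勘從入門到進階實戰》. Readers who are interested can buy them through the purchase link in the notice board. Thank you for your support. Follow the official account below and follow the prompts to get the books' teaching videos for free.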