Learning Kafka with Me: The NIO Communication Mechanism
It has been a while since I shared anything technical, so today I'm taking some free time to write about Kafka's communication layer for us to study together.
1. The Overall Structure of Kafka's Communication Mechanism
This diagram uses the SEDA multi-threaded model mentioned in an earlier post: http://www.jianshu.com/p/e184fdc0ade4
1. For a broker, the number of client connections is limited and large batches of new connections are not created frequently, so a single Acceptor thread is more than enough to handle connection setup.
2. Kafka's high throughput requires the broker to receive and send data quickly, so a pool of processor threads handles the network reads and writes and hands data read from clients off to a buffer queue, preventing client requests from piling up.
3. Kafka performs frequent disk operations that block or wait on I/O, so the number of I/O threads is usually set to twice the number of processor threads and can be tuned for the runtime environment.
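The thread layering described above can be sketched as a toy staged pipeline in plain Java. This is a hypothetical illustration, not Kafka's actual classes; only the queue handoff and the 2x handler-to-processor ratio are taken from the points above.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal SEDA-style sketch: a pool of handler threads drains a shared
// request queue that the network side fills, with twice as many handlers
// as processor threads, mirroring the ratio described above.
public class SedaSketch {
    static final int PROCESSORS = 3;            // like num.network.threads
    static final int HANDLERS = PROCESSORS * 2; // io threads ~ 2x processor threads

    public static int process(int totalRequests) {
        BlockingQueue<Integer> requestQueue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(totalRequests);
        ExecutorService handlers = Executors.newFixedThreadPool(HANDLERS);
        for (int i = 0; i < HANDLERS; i++) {
            handlers.submit(() -> {
                try {
                    while (true) {
                        // poll with a timeout, like the real handler loop
                        Integer req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
                        if (req != null) { handled.incrementAndGet(); done.countDown(); }
                    }
                } catch (InterruptedException e) { /* shutdownNow() interrupts us */ }
            });
        }
        try {
            // the "acceptor/processor" side just enqueues requests
            for (int r = 0; r < totalRequests; r++) requestQueue.put(r);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handlers.shutdownNow();
        }
        return handled.get();
    }
}
```

The point of the staging is that a slow handler never blocks the threads doing network I/O; the queue absorbs the burst.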
2. The Overall Design Sequence Diagram of SocketServer
Notes:
Kafka's SocketServer is built on Java NIO and follows the Reactor pattern: one Acceptor thread accepts client connections, N Processor threads read and write data, and M Handler threads execute the business logic. Queues buffer requests both between the Acceptor and the Processors and between the Processors and the Handlers.
Below we walk through the source code of each part of this design in turn.
2.1 Startup and Initialization
def startup() {
  val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
  for (i <- 0 until numProcessorThreads) {
    processors(i) = new Processor(i,
                                  time,
                                  maxRequestSize,
                                  aggregateIdleMeter,
                                  newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                  numProcessorThreads,
                                  requestChannel,
                                  quotas,
                                  connectionsMaxIdleMs)
    Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
  }
  newGauge("ResponsesBeingSent", new Gauge[Int] {
    def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
  })
  // register the processor threads for notification of responses
  requestChannel.addResponseListener((id: Int) => processors(id).wakeup())
  // start accepting connections
  this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
  Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
  acceptor.awaitStartup
  info("Started")
}
Notes:
The ConnectionQuotas object enforces per-IP connection limits. startup() creates one Acceptor listener thread and initializes N Processor threads; processors is an array of threads that serves as a thread pool, with three threads by default. The Acceptor thread and each of the N Processor threads open their own multiplexer via Selector.open(). The relevant configuration code:

val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
val serverChannel = openServerSocket(host, port)

The thread count can be set anywhere from 1 up to Int.MaxValue.
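getIntInRange simply reads a property with a default and validates it against bounds. A rough Java equivalent (a hypothetical helper; names are illustrative, not Kafka's API) might be:

```java
import java.util.Properties;

public class ConfigSketch {
    // Hypothetical equivalent of getIntInRange: read an int property,
    // fall back to a default, and reject values outside [min, max].
    public static int getIntInRange(Properties props, String name,
                                    int dflt, int min, int max) {
        String raw = props.getProperty(name);
        int v = (raw == null) ? dflt : Integer.parseInt(raw.trim());
        if (v < min || v > max)
            throw new IllegalArgumentException(
                name + " must be in [" + min + ", " + max + "], got " + v);
        return v;
    }
}
```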
2.2 The Acceptor Thread
def run() {
  serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  startupComplete()
  var currentProcessor = 0
  while (isRunning) {
    val ready = selector.select(500)
    if (ready > 0) {
      val keys = selector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        var key: SelectionKey = null
        try {
          key = iter.next
          iter.remove()
          if (key.isAcceptable)
            accept(key, processors(currentProcessor))
          else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")
          // round robin to the next processor thread
          currentProcessor = (currentProcessor + 1) % processors.length
        } catch {
          case e: Throwable => error("Error while accepting connection", e)
        }
      }
    }
  }
  debug("Closing server socket and selector.")
  swallowError(serverChannel.close())
  swallowError(selector.close())
  shutdownComplete()
}
2.2.1 Registering the OP_ACCEPT Event
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
2.2.2 Internal Logic
This is synchronous non-blocking logic: the selector is polled every 500 ms. (Background on synchronous non-blocking I/O: http://www.jianshu.com/p/e9c6690c0737)
When a connection request arrives, a Processor thread is chosen round-robin to handle it:
currentProcessor = (currentProcessor + 1) % processors.length
The chosen Processor's accept method then adds the SocketChannel to its newConnections queue and returns:

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}

// newConnections is a thread-safe queue holding SocketChannel instances
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
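The accept-and-wakeup handoff, together with the acceptor's round-robin choice of processor, can be mimicked without real sockets. In the sketch below, Processor is a stand-in class and Strings stand in for SocketChannels:

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the acceptor-to-processor handoff: each processor owns a
// thread-safe newConnections queue; the acceptor round-robins across
// processors, enqueues the "channel", and wakes that processor's selector.
public class HandoffSketch {
    public static class Processor {
        public final ConcurrentLinkedQueue<String> newConnections =
            new ConcurrentLinkedQueue<>();
        public int wakeups = 0;

        public void accept(String channel) { // mirrors Processor.accept(socketChannel)
            newConnections.add(channel);
            wakeup();                        // selector.wakeup() in the real code
        }
        void wakeup() { wakeups++; }
    }

    private static int currentProcessor = 0;

    // Round-robin dispatch, as in the Acceptor's run() loop.
    public static void dispatch(List<Processor> processors, String channel) {
        processors.get(currentProcessor).accept(channel);
        currentProcessor = (currentProcessor + 1) % processors.size();
    }
}
```

The wakeup() call matters: without it, a processor blocked in select() would not notice the new connection until its next timeout.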
2.3 kafka.network.Processor
override def run() {
  startupComplete()
  while (isRunning) {
    // setup any new connections that have been queued up
    configureNewConnections()
    // register any new responses for writing
    processNewResponses()
    val startSelectTime = SystemTime.nanoseconds
    val ready = selector.select(300)
    currentTimeNanos = SystemTime.nanoseconds
    val idleTime = currentTimeNanos - startSelectTime
    idleMeter.mark(idleTime)
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    aggregateIdleMeter.mark(idleTime / totalProcessorThreads)
    trace("Processor id " + id + " selection time = " + idleTime + " ns")
    if (ready > 0) {
      val keys = selector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        var key: SelectionKey = null
        try {
          key = iter.next
          iter.remove()
          if (key.isReadable)
            read(key)
          else if (key.isWritable)
            write(key)
          else if (!key.isValid)
            close(key)
          else
            throw new IllegalStateException("Unrecognized key state for processor thread.")
        } catch {
          case e: EOFException =>
            info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
            close(key)
          case e: InvalidRequestException =>
            info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
            close(key)
          case e: Throwable =>
            error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
            close(key)
        }
      }
    }
    maybeCloseOldestConnection
  }
  debug("Closing selector.")
  closeAll()
  swallowError(selector.close())
  shutdownComplete()
}
Let's first look closely at the configureNewConnections method:
private def configureNewConnections() {
  while (newConnections.size() > 0) {
    val channel = newConnections.poll()
    debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
    channel.register(selector, SelectionKey.OP_READ)
  }
}
While newConnections is non-empty, it polls a channel off the queue and registers it with this processor's selector for the OP_READ event.
Now back to the main loop and the read method:
def read(key: SelectionKey) {
  lruConnections.put(key, currentTimeNanos)
  val socketChannel = channelFor(key)
  var receive = key.attachment.asInstanceOf[Receive]
  if (key.attachment == null) {
    receive = new BoundedByteBufferReceive(maxRequestSize)
    key.attach(receive)
  }
  val read = receive.readFrom(socketChannel)
  val address = socketChannel.socket.getRemoteSocketAddress()
  trace(read + " bytes read from " + address)
  if (read < 0) {
    close(key)
  } else if (receive.complete) {
    val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
    requestChannel.sendRequest(req)
    key.attach(null)
    // explicitly reset interest ops to not READ, no need to wake up the selector just yet
    key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
  } else {
    // more reading to be done
    trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
    wakeup()
  }
}
Notes:
1. The current SelectionKey and the loop timestamp are put into an LRU map so that idle connections can be found and reclaimed later.
2. A BoundedByteBufferReceive object attached to the key performs the actual read via its readFrom method, returning the number of bytes read.
- If the request has been fully read (receive.complete is true), the data is wrapped in a Request object and placed onto the RequestQueue via requestChannel.sendRequest(req), and OP_READ interest is cleared for the key.
- If the read is not yet complete, the selector keeps listening for OP_READ on this connection.
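The incremental-read idea behind BoundedByteBufferReceive, which buffers a 4-byte size prefix and then keeps buffering until that many payload bytes have arrived, can be approximated with plain ByteBuffers. This is a sketch under the assumption of a size-prefixed wire format, not Kafka's actual class:

```java
import java.nio.ByteBuffer;

// Sketch of a size-prefixed receive: first accumulate a 4-byte length,
// then keep accumulating until that many payload bytes have arrived.
// complete() flips to true only once the whole request is buffered,
// which may take several OP_READ wakeups.
public class FrameReceive {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
    private ByteBuffer body = null;

    // Feed whatever bytes the channel produced this wakeup; returns bytes consumed.
    public int readFrom(ByteBuffer chunk) {
        int consumed = 0;
        while (sizeBuf.hasRemaining() && chunk.hasRemaining()) {
            sizeBuf.put(chunk.get()); consumed++;
        }
        if (!sizeBuf.hasRemaining() && body == null) {
            sizeBuf.flip();
            body = ByteBuffer.allocate(sizeBuf.getInt()); // size prefix now known
        }
        while (body != null && body.hasRemaining() && chunk.hasRemaining()) {
            body.put(chunk.get()); consumed++;
        }
        return consumed;
    }

    public boolean complete() { return body != null && !body.hasRemaining(); }
}
```

A partial read simply leaves complete() false, which in the real Processor keeps OP_READ interest registered so the next select wakeup continues where this one left off.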
2.4 kafka.server.KafkaRequestHandler
def run() {
  while (true) {
    try {
      var req: RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = SystemTime.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = SystemTime.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }
      if (req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
        return
      }
      req.requestDequeueTimeMs = SystemTime.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
Notes:
KafkaRequestHandler is also an event-processing thread. It loops, reading Request objects off the requestQueue with a 300 ms poll timeout, and passes each request to apis.handle for processing; the response is later placed on the responseQueue.
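Stripped of metrics and logging, that loop is a blocking-queue consumer with a shutdown sentinel. In this minimal Java rendering, ALL_DONE stands in for RequestChannel.AllDone and a counter stands in for apis.handle:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the KafkaRequestHandler loop: poll the request queue with a
// 300 ms timeout, process each request, and exit on a sentinel object.
public class HandlerSketch {
    public static final String ALL_DONE = "AllDone"; // shutdown sentinel

    public static int runHandler(BlockingQueue<String> requestQueue) {
        int handled = 0;
        while (true) {
            String req = null;
            try {
                while (req == null)
                    req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return handled;
            }
            if (req == ALL_DONE) return handled; // identity check, like `req eq AllDone`
            handled++;                           // apis.handle(req) in the real code
        }
    }
}
```

The sentinel pattern lets the SocketServer shut the pool down by enqueueing one AllDone per handler thread, rather than interrupting them.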
The dispatch code in apis.handle:
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
    request.requestId match {
      case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
      case RequestKeys.FetchKey => handleFetchRequest(request)
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
      case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      request.requestObj.handleError(e, requestChannel, request)
      error("error when handling request %s".format(request.requestObj), e)
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
The request keys map to requests as follows:

Request key | Meaning | Request class
---|---|---
RequestKeys.ProduceKey | producer request | ProducerRequest
RequestKeys.FetchKey | consumer fetch request | FetchRequest
RequestKeys.OffsetsKey | topic offset request | OffsetRequest
RequestKeys.MetadataKey | topic metadata request | TopicMetadataRequest
RequestKeys.LeaderAndIsrKey | leader and ISR update request | LeaderAndIsrRequest
RequestKeys.StopReplicaKey | stop-replica request | StopReplicaRequest
RequestKeys.UpdateMetadataKey | metadata update request | UpdateMetadataRequest
RequestKeys.ControlledShutdownKey | controlled shutdown request | ControlledShutdownRequest
RequestKeys.OffsetCommitKey | offset commit request | OffsetCommitRequest
RequestKeys.OffsetFetchKey | consumer offset fetch request | OffsetFetchRequest
RequestKeys.ConsumerMetadataKey | consumer metadata request | ConsumerMetadataRequest
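Behind the symbolic names, each RequestKeys constant is a small numeric api key read from the request header. A plain switch over those ids makes the dispatch concrete; the ids below are those used by Kafka 0.8's RequestKeys object, and the returned handler names are just placeholder strings for illustration:

```java
public class DispatchSketch {
    // Api key ids as defined in Kafka 0.8's RequestKeys object.
    static final short PRODUCE = 0, FETCH = 1, OFFSETS = 2, METADATA = 3;

    // Returns the name of the handler that would run; the real code calls
    // methods like handleFetchRequest(request) instead of returning a name.
    public static String dispatch(short requestId) {
        switch (requestId) {
            case PRODUCE:  return "handleProducerOrOffsetCommitRequest";
            case FETCH:    return "handleFetchRequest";
            case OFFSETS:  return "handleOffsetRequest";
            case METADATA: return "handleTopicMetadataRequest";
            default: throw new IllegalArgumentException("Unknown api code " + requestId);
        }
    }
}
```

Only the first few keys are shown; the remaining ones follow the same pattern, and an unknown id raises an error just as the default match case throws a KafkaException.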
2.5 Response Handling in the Processor
private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    val key = curr.request.requestKey.asInstanceOf[SelectionKey]
    curr.responseAction match {
      case RequestChannel.SendAction => {
        key.interestOps(SelectionKey.OP_WRITE)
        key.attach(curr)
      }
    }
    curr = requestChannel.receiveResponse(id)
  }
}
Back in the Processor thread class: where read handles inbound requests, processNewResponses() handles the Responses that the Handlers have produced for clients. It takes each Response off the requestChannel's responseQueue, registers OP_WRITE interest on the corresponding SelectionKey, and attaches the Response so the data can be written back to the client.
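This write-side dance, which flips the key's interest to OP_WRITE, attaches the response, writes it out on the next select wakeup, then clears the interest, can be exercised in-process with a java.nio.channels.Pipe instead of a client socket. This is a self-contained sketch, not Kafka's actual write(key) implementation:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Sketch of the response path: register OP_WRITE interest with the
// response attached, write on the next wakeup, and clear the interest
// once the buffer is drained. A Pipe's sink channel plays the socket.
public class WriteInterestSketch {
    public static String roundTrip(String response) {
        try {
            Pipe pipe = Pipe.open();
            pipe.sink().configureBlocking(false);
            Selector selector = Selector.open();
            SelectionKey key = pipe.sink().register(selector, 0);

            // processNewResponses(): express interest in OP_WRITE, attach data
            key.interestOps(SelectionKey.OP_WRITE);
            key.attach(ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8)));

            while (true) {
                selector.select(300);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove();
                    if (k.isWritable()) {
                        ByteBuffer buf = (ByteBuffer) k.attachment();
                        pipe.sink().write(buf);   // like write(key) in the Processor
                        if (!buf.hasRemaining()) {
                            k.interestOps(0);     // response sent: stop watching OP_WRITE
                            // read it back from the blocking source side to verify
                            ByteBuffer in = ByteBuffer.allocate(buf.capacity());
                            while (in.hasRemaining()) pipe.source().read(in);
                            selector.close();
                            pipe.sink().close();
                            pipe.source().close();
                            return new String(in.array(), StandardCharsets.UTF_8);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Attaching the response to the key is what lets a single selector thread multiplex many in-flight writes: each key carries its own pending buffer.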
小程故事多
Payments domain expert, focused on distributed systems and big data. Personal tech blog: flychao88.iteye.com