kafka生產者網絡層總結
阿新 • • 發佈:2018-05-01
pre sre ring tsp 檢查 remove complete ava article
1 層次結構
負責進行網絡IO請求的是NetworkClient,主要層次結構如下
ClusterConnectionStates報存了每個節點的狀態,以node為key,以node的狀態為value;inFlightRequets中保存了每個節點已經發送的請求,但是還沒有返回的請求,以node為key,以List<ClientRequest>為value。inFlightRequets從名字也可以看出,表示“正在空中飛”的請求。
2 如何保證每次只發送一個請求
sender線程啟動後,如果RecordBatch中有消息,會將消息按照所在節點重新排列,每個節點會創建一個ClientRequest,用來發送,每個節點每次只能發送一個ClientRequest,如下
KafkaChannel#setSend(..)
public void setSend(Send send) { if (this.send != null) // 如果已經有send,會拋出異常 throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }
那麽kafka是如何保證避免setSend的時候KafkaChannel中已經有send了呢,這個關鍵就是在sender線程中會調用NetworkClient#ready(..),會將沒有ready的節點去除掉,從而不會在該節點上setSend:
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { // 關鍵
iter.remove();
notReadyTimeout = Math.min (notReadyTimeout, this.client.connectionDelay(node, now));
}
}
3 NetworkClient#ready(..)
NetworkClient#ready(..)檢查節點是否準備好,從而決定是否可以將消息封裝成ClientRequest放到KafkaChannel上。
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
if (isReady(node, now)) // 關鍵
return true;
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);
return false;
}
我們來分析下isReady
public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
isReady主要兩個條件,一個是判斷metadata是否到了更新的時候了,如果metadata需要更新,那麽就不發送本次請求,也就是metadata更新優先級高。第二個是判斷這個節點是否canSendRequest。
private boolean canSendRequest(String node) {
return connectionStates.isConnected(node) && selector.isChannelReady(node)
&& inFlightRequests.canSendMore(node); // 重點
}
inFlightRequests報保存的是“正在空中飛”的請求
public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
滿足以下幾個條件,表示可以繼續send
- queue是空,即該節點沒有“正在空中飛”的request
- queue不為空。queue中排在最開頭的request已經completed 並且queue的大小小於允許的最大值。如何理解呢?queue是一個雙端隊列,每次set的時候都會在queue的頭部插入,所以queue中第一個就是正在發送的,或者說是KafkaChannel中的send。只要當send發送到網絡中的時候才可以繼續發送。這就保證了前面說的“如何保證每次只發送一個請求”。
4 參考
- https://blog.csdn.net/chunlongyu/article/details/52651960
kafka生產者網絡層總結