如何確定Kafka的分割槽數、key和consumer執行緒數 【原創】如何確定Kafka的分割槽數、key和consumer執行緒數
阿新 • • 發佈:2019-01-11
【原創】如何確定Kafka的分割槽數、key和consumer執行緒數
在Kafak中國社群的qq群中,這個問題被提及的比例是相當高的,這也是Kafka使用者最常碰到的問題之一。本文結合Kafka原始碼試圖對該問題相關的因素進行探討。希望對大家有所幫助。 怎麼確定分割槽數? “我應該選擇幾個分割槽?”——如果你在Kafka中國社群的群裡,這樣的問題你會經常碰到的。不過有些遺憾的是,我們似乎並沒有很權威的答案能夠解答這樣的問題。其實這也不奇怪,畢竟這樣的問題通常都是沒有固定答案的。Kafka官網上標榜自己是"high-throughput distributed messaging system",即一個高吞吐量的分散式訊息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆快取機制,採用了作業系統級別的頁快取,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO效能。但是,這只是一個方面,畢竟單機優化的能力是有上限的。如何通過水平擴充套件甚至是線性擴充套件來進一步提升吞吐量呢? Kafka就是使用了分割槽(partition),通過將topic的訊息打散到多個分割槽並分佈儲存在不同的broker上實現了訊息處理(不管是producer還是consumer)的高吞吐量。 Kafka的生產者和消費者都可以多執行緒地並行操作,而每個執行緒處理的是一個分割槽的資料。因此分割槽實際上是調優Kafka並行度的最小單元。對於producer而言,它實際上是用多個執行緒併發地向不同分割槽所在的broker發起Socket連線同時給這些分割槽傳送訊息;而consumer呢,同一個消費組內的所有consumer執行緒都被指定topic的某一個分割槽進行消費(具體如何確定consumer執行緒數目我們後面會詳細說明)。所以說,如果一個topic分割槽越多,理論上整個叢集所能達到的吞吐量就越大。 但分割槽是否越多越好呢?顯然也不是,因為每個分割槽都有自己的開銷: 一、客戶端/伺服器端需要使用的記憶體就越多 先說說客戶端的情況。Kafka 0.8.2之後推出了Java版的全新的producer,這個producer有個引數batch.size,預設是16KB。它會為每個分割槽快取訊息,一旦滿了就打包將訊息批量發出。看上去這是個能夠提升效能的設計。不過很顯然,因為這個引數是分割槽級別的,如果分割槽數越多,這部分快取所需的記憶體佔用也會更多。假設你有10000個分割槽,按照預設設定,這部分快取需要佔用約157MB的記憶體。而consumer端呢?我們拋開獲取資料所需的記憶體不說,只說執行緒的開銷。如果還是假設有10000個分割槽,同時consumer執行緒數要匹配分割槽數(大部分情況下是最佳的消費吞吐量配置)的話,那麼在consumer client就要建立10000個執行緒,也需要建立大約10000個Socket去獲取分割槽資料。這裡面的執行緒切換的開銷本身已經不容小覷了。 伺服器端的開銷也不小,如果閱讀Kafka原始碼的話可以發現,伺服器端的很多元件都在記憶體中維護了分割槽級別的快取,比如controller,FetcherManager等,因此分割槽數越多,這種快取的成本越久越大。 二、檔案控制代碼的開銷 每個分割槽在底層檔案系統都有屬於自己的一個目錄。該目錄下通常會有兩個檔案: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都儲存這兩個檔案控制代碼(file handler)。很明顯,如果分割槽數越多,所需要保持開啟狀態的檔案控制代碼數也就越多,最終可能會突破你的ulimit -n的限制。 三、降低高可用性 Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分割槽儲存若干個副本(replica_factor指定副本數)。每個副本儲存在不同的broker上。期中的一個副本充當leader 副本,負責處理producer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間視窗,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分割槽,10個broker,也就是說平均每個broker上有1000個分割槽。此時這個broker掛掉了,那麼zookeeper和controller需要立即對這1000個分割槽進行leader選舉。比起很少的分割槽leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。 說了這麼多“廢話”,很多人肯定已經不耐煩了。那你說到底要怎麼確定分割槽數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬體、軟體、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似於,官網說每秒能到10MB,為什麼我的producer每秒才1MB? —— 且不說硬體條件,最後發現他使用的訊息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分割槽數:建立一個只有1個分割槽的topic,然後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分割槽數 = Tt / max(Tp, Tc) Tp表示producer的吞吐量。測試producer通常是很容易的,因為它的邏輯非常簡單,就是直接傳送訊息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關係更大, 因為Tc的值取決於你拿到訊息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。 另外,Kafka並不能真正地做到線性擴充套件(其實任何系統都不能),所以你在規劃你的分割槽數的時候最好多規劃一下,這樣未來擴充套件時候也更加方便。 訊息-分割槽的分配def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions }
這就保證了相同key的訊息一定會被路由到相同的分割槽。如果你沒有指定key,那麼Kafka是如何確定這條訊息去往哪個分割槽的呢?
if(key == null) { // 如果沒有指定key val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有快取的現成的分割槽Id id match { case Some(partitionId) => partitionId // 如果有的話直接使用這個分割槽Id就好了 case None => // 如果沒有的話, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分割槽的leader所在的broker if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑一個 val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 更新快取以備下一次直接使用 partitionId } }
可以看出,Kafka幾乎就是隨機找一個分割槽傳送無key的訊息,然後把這個分割槽號加入到快取中以備後面直接使用——當然了,Kafka本身也會清空該快取(預設每10分鐘或每次請求topic元資料時)
如何設定consumer執行緒數 我個人的觀點,如果你的分割槽數是N,那麼最好執行緒數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的執行緒不會被分配到任何分割槽。讓我們來看看具體Kafka是如何分配的。 topic下的一個分割槽只能被同一個consumer group下的一個consumer執行緒來消費,但反之並不成立,即一個consumer執行緒可以消費多個分割槽的資料,比如Kafka提供的ConsoleConsumer,預設就只是一個執行緒來消費所有分割槽的資料。——其實ConsoleConsumer可以使用萬用字元的功能實現同時消費多個topic資料,但這和本文無關。 再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用於consumer程式呼叫實現資料的消費。其底層維護了一個阻塞佇列,所以在沒有新訊息到來時,consumer是處於阻塞狀態的,表現出來的狀態就是consumer程式一直在等待新訊息的到來。——你當然可以配置成帶超時的consumer,具體參看引數consumer.timeout.ms的用法。 下面說說Kafka提供的兩種分配策略: range和roundrobin,由引數partition.assignment.strategy指定,預設是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分割槽,P0 ~ P9,consumer執行緒數是3, C0 ~ C2,那麼每個執行緒都分配哪些分割槽呢? C0 消費分割槽 0, 1, 2, 3 C1 消費分割槽 4, 5, 6 C2 消費分割槽 7, 8, 9 具體演算法就是:val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分割槽數 val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分割槽需要單獨分配給開頭的執行緒們 ... for (consumerThreadId <- consumerThreadIdSet) { // 對於每一個consumer執行緒 val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出該執行緒在所有執行緒中的位置,介於[0, n-1] assert(myConsumerPosition >= 0) // startPart 就是這個執行緒要消費的起始分割槽數 val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) // nParts 就是這個執行緒總共要消費多少個分割槽 val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) ... }
針對於這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個執行緒至少保證3個分割槽,還剩下1個分割槽需要單獨分配給開頭的若干個執行緒。這就是為什麼C0消費4個分割槽,後面的2個執行緒每個消費3個分割槽,具體過程詳見下面的Debug截圖資訊:
ctx.myTopicThreadIds nPartsPerConsumer = 10 / 3 = 3 nConsumersWithExtraPart = 10 % 3 = 1 第一次: myConsumerPosition = 1 startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分割槽4開始讀 nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分割槽, 即4,5,6 第二次: myConsumerPosition = 0 startPart = 3 * 0 + min(1, 0) =0 --- 從分割槽0開始讀 nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分割槽,即0,1,2,3 第三次: myConsumerPosition = 2 startPart = 3 * 2 + min(2, 1) = 7 --- 從分割槽7開始讀 nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分割槽,即7, 8, 9 至此10個分割槽都已經分配完畢 說到這裡,經常有個需求就是我想讓某個consumer執行緒消費指定的分割槽而不消費其他的分割槽。坦率來說,目前Kafka並沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個訊息引擎,在Kafka中加入訊息消費的邏輯也許並不是Kafka該做的事情。 在Kafak中國社群的qq群中,這個問題被提及的比例是相當高的,這也是Kafka使用者最常碰到的問題之一。本文結合Kafka原始碼試圖對該問題相關的因素進行探討。希望對大家有所幫助。 怎麼確定分割槽數? “我應該選擇幾個分割槽?”——如果你在Kafka中國社群的群裡,這樣的問題你會經常碰到的。不過有些遺憾的是,我們似乎並沒有很權威的答案能夠解答這樣的問題。其實這也不奇怪,畢竟這樣的問題通常都是沒有固定答案的。Kafka官網上標榜自己是"high-throughput distributed messaging system",即一個高吞吐量的分散式訊息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆快取機制,採用了作業系統級別的頁快取,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO效能。但是,這只是一個方面,畢竟單機優化的能力是有上限的。如何通過水平擴充套件甚至是線性擴充套件來進一步提升吞吐量呢? Kafka就是使用了分割槽(partition),通過將topic的訊息打散到多個分割槽並分佈儲存在不同的broker上實現了訊息處理(不管是producer還是consumer)的高吞吐量。 Kafka的生產者和消費者都可以多執行緒地並行操作,而每個執行緒處理的是一個分割槽的資料。因此分割槽實際上是調優Kafka並行度的最小單元。對於producer而言,它實際上是用多個執行緒併發地向不同分割槽所在的broker發起Socket連線同時給這些分割槽傳送訊息;而consumer呢,同一個消費組內的所有consumer執行緒都被指定topic的某一個分割槽進行消費(具體如何確定consumer執行緒數目我們後面會詳細說明)。所以說,如果一個topic分割槽越多,理論上整個叢集所能達到的吞吐量就越大。 但分割槽是否越多越好呢?顯然也不是,因為每個分割槽都有自己的開銷: 一、客戶端/伺服器端需要使用的記憶體就越多 先說說客戶端的情況。Kafka 0.8.2之後推出了Java版的全新的producer,這個producer有個引數batch.size,預設是16KB。它會為每個分割槽快取訊息,一旦滿了就打包將訊息批量發出。看上去這是個能夠提升效能的設計。不過很顯然,因為這個引數是分割槽級別的,如果分割槽數越多,這部分快取所需的記憶體佔用也會更多。假設你有10000個分割槽,按照預設設定,這部分快取需要佔用約157MB的記憶體。而consumer端呢?我們拋開獲取資料所需的記憶體不說,只說執行緒的開銷。如果還是假設有10000個分割槽,同時consumer執行緒數要匹配分割槽數(大部分情況下是最佳的消費吞吐量配置)的話,那麼在consumer client就要建立10000個執行緒,也需要建立大約10000個Socket去獲取分割槽資料。這裡面的執行緒切換的開銷本身已經不容小覷了。 伺服器端的開銷也不小,如果閱讀Kafka原始碼的話可以發現,伺服器端的很多元件都在記憶體中維護了分割槽級別的快取,比如controller,FetcherManager等,因此分割槽數越多,這種快取的成本越久越大。 二、檔案控制代碼的開銷 每個分割槽在底層檔案系統都有屬於自己的一個目錄。該目錄下通常會有兩個檔案: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都儲存這兩個檔案控制代碼(file handler)。很明顯,如果分割槽數越多,所需要保持開啟狀態的檔案控制代碼數也就越多,最終可能會突破你的ulimit -n的限制。 三、降低高可用性 Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分割槽儲存若干個副本(replica_factor指定副本數)。每個副本儲存在不同的broker上。期中的一個副本充當leader 副本,負責處理producer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間視窗,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分割槽,10個broker,也就是說平均每個broker上有1000個分割槽。此時這個broker掛掉了,那麼zookeeper和controller需要立即對這1000個分割槽進行leader選舉。比起很少的分割槽leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。 說了這麼多“廢話”,很多人肯定已經不耐煩了。那你說到底要怎麼確定分割槽數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬體、軟體、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似於,官網說每秒能到10MB,為什麼我的producer每秒才1MB? —— 且不說硬體條件,最後發現他使用的訊息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分割槽數:建立一個只有1個分割槽的topic,然後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分割槽數 = Tt / max(Tp, Tc) Tp表示producer的吞吐量。測試producer通常是很容易的,因為它的邏輯非常簡單,就是直接傳送訊息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關係更大, 因為Tc的值取決於你拿到訊息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。 另外,Kafka並不能真正地做到線性擴充套件(其實任何系統都不能),所以你在規劃你的分割槽數的時候最好多規劃一下,這樣未來擴充套件時候也更加方便。 訊息-分割槽的分配 預設情況下,Kafka根據傳遞訊息的key來進行分割槽的分配,即hash(key) % numPartitions,如下圖所示:def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions }
這就保證了相同key的訊息一定會被路由到相同的分割槽。如果你沒有指定key,那麼Kafka是如何確定這條訊息去往哪個分割槽的呢?
if(key == null) { // 如果沒有指定key val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有快取的現成的分割槽Id id match { case Some(partitionId) => partitionId // 如果有的話直接使用這個分割槽Id就好了 case None => // 如果沒有的話, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分割槽的leader所在的broker if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑一個 val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 更新快取以備下一次直接使用 partitionId } }
可以看出,Kafka幾乎就是隨機找一個分割槽傳送無key的訊息,然後把這個分割槽號加入到快取中以備後面直接使用——當然了,Kafka本身也會清空該快取(預設每10分鐘或每次請求topic元資料時)
如何設定consumer執行緒數 我個人的觀點,如果你的分割槽數是N,那麼最好執行緒數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的執行緒不會被分配到任何分割槽。讓我們來看看具體Kafka是如何分配的。 topic下的一個分割槽只能被同一個consumer group下的一個consumer執行緒來消費,但反之並不成立,即一個consumer執行緒可以消費多個分割槽的資料,比如Kafka提供的ConsoleConsumer,預設就只是一個執行緒來消費所有分割槽的資料。——其實ConsoleConsumer可以使用萬用字元的功能實現同時消費多個topic資料,但這和本文無關。 再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用於consumer程式呼叫實現資料的消費。其底層維護了一個阻塞佇列,所以在沒有新訊息到來時,consumer是處於阻塞狀態的,表現出來的狀態就是consumer程式一直在等待新訊息的到來。——你當然可以配置成帶超時的consumer,具體參看引數consumer.timeout.ms的用法。 下面說說Kafka提供的兩種分配策略: range和roundrobin,由引數partition.assignment.strategy指定,預設是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分割槽,P0 ~ P9,consumer執行緒數是3, C0 ~ C2,那麼每個執行緒都分配哪些分割槽呢? C0 消費分割槽 0, 1, 2, 3 C1 消費分割槽 4, 5, 6 C2 消費分割槽 7, 8, 9 具體演算法就是:val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分割槽數 val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分割槽需要單獨分配給開頭的執行緒們 ... for (consumerThreadId <- consumerThreadIdSet) { // 對於每一個consumer執行緒 val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出該執行緒在所有執行緒中的位置,介於[0, n-1] assert(myConsumerPosition >= 0) // startPart 就是這個執行緒要消費的起始分割槽數 val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) // nParts 就是這個執行緒總共要消費多少個分割槽 val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) ... }
針對於這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個執行緒至少保證3個分割槽,還剩下1個分割槽需要單獨分配給開頭的若干個執行緒。這就是為什麼C0消費4個分割槽,後面的2個執行緒每個消費3個分割槽,具體過程詳見下面的Debug截圖資訊:
ctx.myTopicThreadIds nPartsPerConsumer = 10 / 3 = 3 nConsumersWithExtraPart = 10 % 3 = 1 第一次: myConsumerPosition = 1 startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分割槽4開始讀 nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分割槽, 即4,5,6 第二次: myConsumerPosition = 0 startPart = 3 * 0 + min(1, 0) =0 --- 從分割槽0開始讀 nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分割槽,即0,1,2,3 第三次: myConsumerPosition = 2 startPart = 3 * 2 + min(2, 1) = 7 --- 從分割槽7開始讀 nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分割槽,即7, 8, 9 至此10個分割槽都已經分配完畢 說到這裡,經常有個需求就是我想讓某個consumer執行緒消費指定的分割槽而不消費其他的分割槽。坦率來說,目前Kafka並沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個訊息引擎,在Kafka中加入訊息消費的邏輯也許並不是Kafka該做的事情。