1. 程式人生 > >kafka效能引數和壓力測試揭祕

kafka效能引數和壓力測試揭祕

上一篇文章介紹了Kafka在設計上是如何來保證高時效、大吞吐量的,主要的內容集中在底層原理和架構上,屬於理論知識範疇。這次我們站在應用和運維的角度,聊一聊叢集到位後要怎麼才能最好的配置引數和進行測試效能。Kafka的配置詳盡且複雜,想要進行全面的效能調優需要掌握大量資訊,我也只是通過工作中的一些實戰經驗來篩選出對叢集效能影響最大的幾個要點,接下來要闡述的觀點也僅限於我所描述的環境下,請大家根據自己的環境適當取捨。
今天的文章分為兩大部分,第一部分介紹一下我總結的跟效能有關的一些引數、含義以及調優策略。第二部分會給出一些我自己實踐過的測試結果對照組,具體的數值和結果可能因場景、機器、環境而異,但是總體的思路和方法應該是一致的。

在正式進入主題之前,介紹一下本次測試所使用的機器配置:
  • 6臺物理機,其中三臺部署Broker,三臺專門用來launch request。
  • 每臺物理機:24 Processors,189G Memory,2G 單機頻寬。
  • 執行本次測試時為了能夠覆蓋到到一些“非常規”的用法,我把Broker的HeapSize設定到了30G。
相關引數介紹 在除錯和優化使用Java開發的系統時,第一步肯定繞不開對JVM的調優,Kafka自然也不例外,而JVM調優的重點則是在記憶體上。
其實Kafka服務本身並不需要很大記憶體,上篇文章也已經詳細介紹過Kafka依賴系統提供的PageCache來滿足效能上的要求,利用VisualJVM等工具可以很清晰的分析出Heap Space的佔用比例情況。本文中測試時設定30G記憶體的目的是支援更高的併發,高併發本身就必然會需要更多的記憶體來支援,同時高併發也意味著SocketBuffer等相關快取容量會成倍增長。實際使用中,調整記憶體大小的準則是留給系統儘可能多的空閒記憶體,Broker本身則是夠用就好。

說完了大小設定我們再來聊一下JVM上的垃圾回收器,官方文件裡推薦使用最新的G1來代替CMS作為垃圾回收器。不過也明確指出在某些低版本(1.7u21)的JDK上還是會存在一些不穩定的問題。推薦使用的最低版本為JDK 1.7u51。下面是本次試驗中Broker的JVM記憶體配置引數:
-Xms30g -Xmx30g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35

其實G1早在JDK 1.6u14中就已經作為體驗版首次被引入,但是由於最初誤宣傳需要收費才能使用,和其自身尚不穩定存在Bug等因素,一直等到1.7的後期update版本才逐漸走入我們的視野。

G1相比較於CMS的優勢:
  • G1是一種適用於伺服器端的垃圾回收器,很好的平衡了吞吐量和響應能力。
  • 對於記憶體的劃分方法不同,Eden, Survivor, Old區域不再固定,使用記憶體會更高效。G1通過對記憶體進行Region的劃分,有效避免了記憶體碎片問題。
  • G1可以指定GC時可用於暫停執行緒的時間(不保證嚴格遵守)。而CMS並不提供可控選項。
  • CMS只有在FullGC之後會重新合併壓縮記憶體,而G1把回收和合並集合在一起。
  • CMS只能使用在Old區,在清理Young時一般是配合使用ParNew,而G1可以統一兩類分割槽的回收演算法。

G1的適用場景:
  • JVM佔用記憶體較大(At least 4G)
  • 應用本身頻繁申請、釋放記憶體,進而產生大量記憶體碎片時。
  • 對於GC時間較為敏感的應用。

接下來,我們來總結一下Kafka本身可能會對效能產生影響的配置項。
Broker
num.network.threads:3

用於接收並處理網路請求的執行緒數,預設為3。其內部實現是採用Selector模型。啟動一個執行緒作為Acceptor來負責建立連線,再配合啟動num.network.threads個執行緒來輪流負責從Sockets裡讀取請求,一般無需改動,除非上下游併發請求量過大。
num.partitions:1

Partition的數量選取也會直接影響到Kafka叢集的吞吐效能。例如我寫過MapReduce任務從Kafka中讀取資料,每個Partition對應一個Mapper去消費資料,如果Partition數量太少,則任務會因為Mapper數不足而非常慢。此外,當Partition數量相對於流入流出的資料量顯得較少,或由於業務邏輯和Partition數量沒有匹配好造成個別Partition讀寫資料量大,大量的讀寫請求集中落在一臺或幾臺機器上時,很容易就會打滿NIC的全部流量。不難想象這時不僅這一個Partition的讀寫會出現效能瓶頸,同Broker上的其他Partition或服務都會陷入一個網路資源匱乏的情況。
queued.max.requests:500

這個引數是指定用於快取網路請求的佇列的最大容量,這個佇列達到上限之後將不再接收新請求。一般不會成為瓶頸點,除非I/O效能太差,這時需要配合num.io.threads等配置一同進行調整。
Replica相關配置:
replica.lag.time.max.ms:10000replica.lag.max.messages:4000num.replica.fetchers:1

上篇文章已經簡單介紹過上兩項配置的含義,這裡不再重複,重點說一下第三項配置。對於任意(Broker, Leader)元組,都會有replication.factor-1個Broker作為Replica,在Replica上會啟動若干Fetch執行緒把對應的資料同步到本地,而num.replica.fetchers這個引數是用來控制Fetch執行緒的數量。
一般來說如果發現Partition的ISR當中只有自己一個Partition,且長時間沒有新的Replica增加進來時,就可以考慮適當的增大這個引數加快複製進度。其內部實現上,每個Fetch就對應了一個SimpleConsumer,對於任意一臺其他機器上需要Catch-up的Leader,會建立num.replica.fetchers個SimpleConsumer來拉取Log。
當初剛知道這塊設計的時候還蠻疑惑的,在Kafka文件開篇的時候就鄭重介紹過,同一個ConsumerGroup內的Consumer和Partition在同一時間內必須保證是一對一的消費關係,而這麼簡單地增加SimpleConsumer就可以提高效率又是什麼原因呢?
檢視原始碼,在AbstractFetcherThread.scala裡可以看到,Fetch啟動的多執行緒其實就是一個個的SimpleConsumer。
首先,getFetcherId()利用numFetcher來控制FetchId的範圍,進而控制Consumer數量。partitionsPerFetcher結構則是一個從Partition到Partition上啟動的Fetchers的Mapping。

上面為每個Partition啟動的多個Fetcher(也就是SimpleConsumer)之間通過partitionMap: mutable.HashMap[TopicAndPartition, Long]來共享offset,達到並行Fetch資料的目的。因此,通過共享offset既保證了同一時間內Consumer和Partition之間的一對一關係,又允許我們通過增多Fetch執行緒來提高效率。

default.replication.factor:1

這個引數指新建立一個topic時,預設的Replica數量。當Producer中的 acks!=0 && acks!=1時,Replica的大小可能會導致在Produce資料時的效能表現有很大不同。Replica過少會影響資料的可用性,太多則會白白浪費儲存資源,一般建議在2~3為宜。
fetch.purgatory.purge.interval.requests:1000producer.purgatory.purge.interval.requests:1000

首先讓我先來介紹一下這個“煉獄”究竟是用來做什麼用的。Broker的一項主要工作就是接收並處理網路上發來的Request。這些Request其中有一些是可以立即答覆的,那很自然這些Request會被直接回復。另外還有一部分是沒辦法或者Request自發的要求延時答覆(例如傳送和接收的Batch),Broker會把這種Request放入Paurgatory當中,同時每一個加入Purgatory當中的Request還會額外的加入到兩個監控對佇列:
  • WatcherFor佇列:用於檢查Request是否被滿足。
  • DelayedQueue佇列:用於檢測Request是否超時。

Request最終的狀態只有一個,就是Complete。請求被滿足和超時最終都會被統一的認為是Complete。
目前版本的Purgatory設計上是存在一定缺陷的。Request狀態轉變為Complete後,並沒能立即從Purgatory中移除,而是繼續佔用資源,因此佔用記憶體累積最終會引發OOM。這種情況一般只會在topic流量較少的情況下觸發。更詳細的資料可以查閱擴充套件閱讀,在此不做展開。
在實際使用中我也是踩了這個坑過來的,當時的情況是叢集新上了一個topic,初期該topic資料很少(Low volume topic),導致那段時間在凌晨3,4點左右會隨機有Broker因為OOM掛掉。定位原因後把*.purgatory.purge.interval.requests的配置調整小至100就解決了這個問題。
Kafka的研發團隊已經開始著手重新設計Purgatory,力求能夠讓Request在Complete時立即從Purgatory中移除。
log.flush.interval.ms:Long.MaxValuelog.flush.scheduler.interval.ms:Long.MaxValuelog.flush.interval.messages:Long.MaxValue

Flush相關的配置引數控制著Broker寫盤的頻率,一般無需改動。如果topic的資料量較小可以考慮減少log.flush.interval.mslog.flush.interval.messages來強制刷寫資料,減少可能由於快取資料未寫盤帶來的不一致。
in.insync.replicas:1

這個引數只能在topic層級配置,指定每次Producer寫操作至少要保證有多少個在ISR的Replica確認,一般配合request.required.acks使用。要注意,這個引數如果設定的過高可能會大幅降低吞吐量。
compression.codec:none

Message落地時是否採用以及採用何種壓縮演算法。一般都是把Producer發過來Message直接儲存,不再改變壓縮方式。
Producer" style="font-weight: 400; font-size: 16px; color: rgb(0, 0, 0); font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif; line-height: 25.6px;">Producer
buffer.memory:33554432 (32m)

在Producer端用來存放尚未傳送出去的Message的緩衝區大小。緩衝區滿了之後可以選擇阻塞傳送或丟擲異常,由block.on.buffer.full的配置來決定。
compression.type:none

預設傳送不進行壓縮,推薦配置一種適合的壓縮演算法,可以大幅度的減緩網路壓力和Broker的儲存壓力。
linger.ms:0

Producer預設會把兩次傳送時間間隔內收集到的所有Requests進行一次聚合然後再發送,以此提高吞吐量,而linger.ms則更進一步,這個引數為每次傳送增加一些delay,以此來聚合更多的Message。
batch.size:16384

Producer會嘗試去把發往同一個Partition的多個Requests進行合併,batch.size指明瞭一次Batch合併後Requests總大小的上限。如果這個值設定的太小,可能會導致所有的Request都不進行Batch。
acks:1

這個配置可以設定傳送訊息後是否需要Broker端返回確認。
  • 0: 不需要進行確認,速度最快。存在丟失資料的風險。
  • 1: 僅需要Leader進行確認,不需要ISR進行確認。是一種效率和安全折中的方式。
  • all: 需要ISR中所有的Replica給予接收確認,速度最慢,安全性最高,但是由於ISR可能會縮小到僅包含一個Replica,所以設定引數為all並不能一定避免資料丟失。

注:新老Producer的引數有很大不同,其他配置含義可以對照參考Kafka官方文件。
Consumer
num.consumer.fetchers:1

啟動Consumer的個數,適當增加可以提高併發度。
fetch.min.bytes:1

每次Fetch Request至少要拿到多少位元組的資料才可以返回。
fetch.wait.max.ms:100

在Fetch Request獲取的資料至少達到fetch.min.bytes之前,允許等待的最大時長。對應上面說到的Purgatory中請求的超時時間。 效能測試實戰
由於可調整的配置引數較多,為了可以準確的展示不同配置對效能產生的影響,我們每次只調整一個引數,觀察對照組結果。測試工具使用Kafka提供的Performance工具ProducerPerformanceConsumerPerformance
Producer
Kafka在0.8版本推出了新的Producer Client,較之前版本有極大的效能提升,所以後續的示例無需說明都採用的是新Producer,這裡就只給出一組新舊Producer的對照組資料。 其中,Producer的message.size為1024,不壓縮,測試時都發送500000條Message。相信大家看過上面結果,就很清楚以後為什麼要乖乖地用新設計的Producer來發訊息了。
Kafka釋出時提供了兩個Producer的效能測試工具:
  • kafka.tools.ProducerPerformance (Scala)
  • org.apache.kafka.clients.tools.ProducerPerformance (Java)

兩份工具的大體功能類似。通過Scala版的程式碼可以很方便的輸出CVS檔案,通過Patch:1190(https://issues.apache.org/jira/browse/KAFKA-1190)中包含的一個R指令碼可以將這個CVS檔案結果視覺化。
注:如果使用Scala版程式碼,不建議開啟--vary-message-size功能。這個功能使得每次構造訊息時都會在內部呼叫random方法生成隨機長度的訊息,尤其是在進行壓力測試時,構造隨機串的消耗累計佔比飆高,嚴重影響傳送效率最終致使測試結果失準。
下面,從thread, acks, linger.ms, replica, compression幾個主要維度測試了一下Producer的組合效能表現。其中,公共指標如下:
        message.size=1024
        batch.siz=10240
        message.count=50000000

測試結果如下: 注:部分提高了linger.ms的Case效果不明顯是由於觸發了其他的flush條件。
Consumer

Consumer的測試相對來說就簡單很多,畢竟拉取資料時只從Leader讀,無論多少Replica都是如此。所以比較關鍵的引數就聚焦到了fetch.sizethread上。

上述本文給出的引數只是一種參考,適用於我們的叢集配置。大家有興趣可以根據上面提供的方法,在自己的叢集上新建獨立topic,在實際環境中測試,這樣得出的配置才是最適合你的配置。希望大家都能通過上面的方法把自己手頭的Kafka調教好,榨乾最後一絲效能。
擴充套件閱讀
Request Purgatory潛在引發OOM的問題: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=34839465 Purgatory Redesign: https://cwiki.apache.org/confluence/display/KAFKA/Purgatory+Redesign+Proposal 深入理解G1記憶體收集器: http://t.cn/RAUulGC How to choose the number of topics/partitions in a Kafka cluster?: http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster Tips for improving performance of kafka producer: http://ingest.tips/2015/07/19/tips-for-improving-performance-of-kafka-producer/