kafka性能測試
Kafka 壓力測試文檔
1 概述
1.1 測試背景
在雲平臺研發SR IAD的過程中,出現SR IAD對硬件資源消耗較為嚴重的情況,其中在雲平臺研發中利用Kafka軟件對流式數據進行數據處理。我們針對Kafka高吞吐量的特性,對kafka進行壓力測試。
1.2 測試目標
測試kafka的吞吐性能(Producer/Consumer性能)。我們主要對分區、磁盤和線程等影響因素進行測試。
2 測試條件
6臺配置相同的服務器搭建的兩套相同的集群環境
2.1測試環境
硬件條件
序號 |
硬件 |
配置 |
備註 |
1 |
CPU |
E5-2640v2 *2 |
|
2 |
內存 |
128G |
|
3 |
硬盤 |
共掛載15塊磁盤,剩余空間10T左右 |
|
4 |
網絡 |
Intel I350 Gigabit Network Connection *4 Intel 82599ES 10-Gigabit SFI/SFP+ Network Connection *2 Intel I350 Gigabit Fiber Network Connection *2 |
使用 萬兆網卡 |
5 |
Kafka |
3臺單磁盤服務組成的kafka集群 |
|
軟件版本:
序號 |
軟件 |
版本 |
備註 |
1 |
CentOS |
7.3 |
|
2 |
Hadoop |
2.7.3 |
|
3 |
HBase |
1.1.7 |
|
4 |
Spark |
1.6.2 |
|
5 |
Elastic Search |
2.4.1 |
|
6 |
Scala |
2.11.8 |
|
7 |
Kafka |
2.11-0.10.0.1 |
|
8 |
Flume |
1.7 |
|
系統架構:
Kafka默認配置:(CDH裏的配置)
(1) broker中默認配置
1)網絡和io操作線程配置
# broker處理消息的最大線程數 (一般不需要修改)
num.network.threads=3
# broker處理磁盤IO的線程數
num.io.threads=8
2)log數據文件刷盤策略
為了大幅度提高producer寫入吞吐量,需要定期批量寫文件
#每當producer寫入10000條消息時,刷數據到磁盤
log.flush.interval.messages=10000
# 每間隔1秒鐘時間,刷數據到磁盤
log.flush.interval.ms=1000
3)日誌保留策略配置
# 保留三天,也可以更短
log.retention.hours=72
# 段文件配置1GB,有利於快速回收磁盤空間,重啟kafka加載也會加快
log.segment.bytes=1073741824
4)Replica相關配置
replica.lag.time.max.ms:10000
replica.lag.max.messages:4000
num.replica.fetchers:1
#用來控制Fetch線程的數量。
default.replication.factor:1
#自動創建topic時默認的Replica數量,一般建議在2~3為宜。
5)其他
num.partitions:1
#分區數量
queued.max.requests:500
#用於緩存網絡請求的隊列的最大容量
compression.codec:none
#Message落地時是否采用以及采用何種壓縮算法。
in.insync.replicas:1
#指定每次Producer寫操作至少要保證有多少個在ISR的Replica確認,一般配合request.required.acks使用。過高可能會大幅降低吞吐量。
(2) producer 配置
buffer.memory:33554432 (32m)
#在Producer端用來存放尚未發送出去的Message的緩沖區大小。
compression.type:none
#生產者生成的所有數據壓縮格式,默認發送不進行壓縮。若啟用壓縮,可以大幅度的減緩網絡壓力和Broker的存儲壓力,但會對cpu 造成壓力。
linger.ms:0
#Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然後再發送,以此提高吞吐量,這個參數為每次發送增加一些delay,以此來聚合更多的Message。
batch.size:16384
#Producer會嘗試去把發往同一個Partition的多個Requests進行合並,batch.size指明了一次Batch合並後Requests總大小的上限。如果這個值設置的太小,可能會導致所有的Request都不進行Batch。
acks:1
#設定發送消息後是否需要Broker端返回確認。
0: 不需要進行確認,速度最快。存在丟失數據的風險。
1: 僅需要Leader進行確認,不需要ISR進行確認。是一種效率和安全折中的方式。
all: 需要ISR中所有的Replica給予接收確認,速度最慢,安全性最高,但是由於ISR可能會縮小到僅包含一個Replica,設置參數為all並不能一定避免數據丟失。
(3) Consumer
num.consumer.fetchers:1
#啟動Consumer的個數,適當增加可以提高並發度。
fetch.min.bytes:1
#每次Fetch Request至少要獲取多少字節的數據才可以返回。
#在Fetch Request獲取的數據至少達到fetch.min.bytes之前,允許等待的最大時長。
fetch.wait.max.ms:100
(4) 其他
zookeeper.session.timeout.ms=6s
message.max.bytes=1000000B
replica.fetch.max.bytes=1MB
num.producers=1
### Number of Producers
num.streams=1
###Number of Consumer Threads
producer.request.timeout.ms=30s
consumer.request.timeout.ms=40s
session.timeout.ms=30s
kafka.log4j.dir=/var/log/kafka
##kafka數據的存放地址
2.2 測試方法
壓力發起:kafka官方提供的自測腳本
4項指標:總共發送消息量(以MB為單位),每秒發送消息量(MB/second),發送消息總數,每秒發送消息數(records/second)
監控信息:自定義腳本來記錄服務情況,包括CPU、內存、磁盤、網絡等情況。
(1) 生產者測試腳本kafka-producer-perf-test.sh
參數說明:
--topic topic名稱,
--num-records 總共需要發送的消息數,
--record-size 每個記錄的字節數,
--throughput 每秒鐘發送的記錄數,
--producer-props bootstrap.servers=localhost:9092 發送端的配置信息
(2)消費者測試腳本kafka-consumer-perf-test.sh
參數說明:
--zookeeper 指定zookeeper的鏈接信息,
--topic 指定topic的名稱,
--fetch-size 指定每次fetch的數據的大小,
--messages 總共要消費的消息個數,
2.2.1 producer 吞吐量測試方法
1、基於原有配置下對kafka進行數量級的加壓,選用一臺或三臺客戶機先測試在沒有消費者時的吞吐量。測試影響因素主要包括partitions、磁盤和thread三個主要因素,通過記錄每秒鐘條數(Records/s)、每秒日誌量大小(MB/s)衡量吞吐量,並找到各個因素之間的規律。
註:
(1) 線程數的設置根據cpu內核數設置,本測試環境為16核(thread <=16)。
(2) Broker 基於物理環境設置(broker<=3),三個
2、通過修改以上3個影響因素測試kafka吞吐量。
3、測試命令
(1)創建topic
1)./kafka-topics.sh --zookeepr IP:2181 --create --topic test --partitions 3 --replication-factor 1
2)./ppt.sh --topic pc918 --num-records 50000000 --record-size 1000 --throughput 10000000 --threads 3 --producer-props
bootstrap.servers=192.168.17.57:9092,192.168.17.64:9092,192.168.17.65:9092
註:ppt.sh 基於kafka-producer-perf-test.sh修改,增加了生產者的thread 選項。
2.2.2 consumer 吞吐量測試方法
1、基於原有配置下進行消費,並測試主要影響因素partitions、thread和磁盤等因素,選用一臺或三臺客戶機先測試沒有生產者時的吞吐量。通過記錄Records/second、MB/second衡量吞吐量,並找到各個因素之間的規律。
2、通過修改影響因素測試kafka吞吐量。
3、測試命令
./kafka-consumer-perf-test.sh --zookeeper IP:2181 --topic test1 --fetch-size 1048576 --messages 10000000 --threads 3
2.2.3 消費與吞吐量
分別在一臺或三臺客戶機同時開啟生產者和消費者測試命令,進行測試
3 測試數據(部分樣例)
3.1 寫入測試(only生產者)
3.1.1 Partition VS. Throughput
實驗條件:3個Broker,1個Topic, Replication-factor=1,3個磁盤,throughput=1000w,num-records=2000w ,batch-size=16K ,1個客戶端,1個producer
測試項目:分別測試1,3,6,12,15,20,30,50個partition時的吞吐量
表1 partition影響因子
類型 |
partition |
Records/second(avg) |
MB/second(avg) |
延遲時間(avg) |
延遲時間(max) |
partition |
1 |
102760 |
98 |
297.65 |
929 |
3 |
283639 |
270.5 |
107.1 |
1965 |
|
6 |
332457 |
317.06 |
61.22 |
1085 |
|
9 |
305838 |
291.67 |
39.37 |
2842 |
|
12 |
344245 |
328.3 |
21.68 |
645 |
|
15 |
349846 |
333.64 |
20.51 |
618 |
|
20 |
349852 |
333.6 |
15.06 |
371 |
|
30 |
349870 |
333.66 |
15.47 |
843 |
|
50 |
346296 |
330.25 |
13.07 |
619 |
當partition的個數為3時,吞吐量成線性增長,當partition多於broker的個數時增長並不明顯,甚至還有所下降。同時隨著,partition個數的增多,延遲時間逐漸減少,當partition個數在3-6之間延遲時間降低較快,越大延遲時間降低趨於平穩。
3.1.2 Replica VS. Throughput
實驗條件:3個Broker,1個Topic,3個磁盤,throughput=35w,num-records=5000w ,batch-size=16K ,1個客戶端,1個producer
測試項目:分別測試replication-factor為1,2,3,6時的吞吐量
表2 Repica影響因子
類型 |
replications |
Records/second(avg) |
MB/second(avg) |
延遲時間(avg) |
延遲時間(max) |
replication |
1 |
349870 |
333.66 |
12.69 |
713 |
2 |
349846 |
333.64 |
10.8 |
392 |
|
3 |
345077 |
329.09 |
36.36 |
897 |
|
6 |
錯誤 |
Error while executing topic command : replication factor: 6 larger than available brokers: 3 |
|
|
由表可知,replication-factor的個數應該不大於broker的個數,否則就會報錯。隨著replication-factor 個數的增加吞吐量減小,但並非是線性下降,因為多個Follower的數據復制是並行進行的,而非串行進行,因此基於數據的安全性及延遲性考慮,最多選擇2-3個副本。
3.1.3 Thread VS. Throughput
實驗條件:3個Broker,1個Topic,3個磁盤,3個partitions , throughput=1000w,num-records=1000w ,batch-size=16K ,1個客戶端,1個producer
測試項目:分別測試thread為1,2,3,4,5時的吞吐量
表3 Thread 影響因子
類型 |
partition |
thread |
Records/second(avg) |
MB/second(avg) |
延遲時間(avg) |
延遲時間(max) |
thread |
3 |
1 |
265505 |
253.21 |
113.79 |
571 |
2 |
559252 |
533.35 |
93.75 |
388 |
||
3 |
730300 |
696.47 |
68.96 |
337 |
||
4 |
722700 |
689.22 |
42.49 |
343 |
||
5 |
740137 |
705.85 |
36.88 |
283 |
由表可知隨著線程數的增加吞吐量不斷增加,當線程數小於分區數時增長較快,大於分區數時增長較慢,並趨於平穩。目前單個客戶端可以達700MB-800MB/s以上的網速流量。
3.1.4 磁盤 VS. Throughput
實驗條件:3個Broker,1個Topic,throughput=1000w,num-records=1000w ,batch-size=16K ,1個客戶端,1個producer
測試項目:分別測試磁盤個數為3,6,9時的吞吐量
表4 磁盤影響因子
類型 |
partition |
磁盤個數 |
Records/second(avg) |
MB/second(avg) |
延遲時間(avg) |
延遲時間(max) |
磁盤 |
3 |
3 |
265505 |
253.21 |
113.79 |
571 |
6 |
6 |
354886 |
338.45 |
58.13 |
512 |
|
9 |
9 |
363240 |
346.41 |
6.33 |
409 |
由表可知,磁盤個數的增加與partition的增加具有相關性,並非越多越好,partition的增加受broker的影響,因此磁盤的個數設置應在broker個數的1-3倍之間較為合適,同時隨著磁盤個數的增加,平均延遲時間在逐漸減小,因此磁盤的數量會影響平均延遲時間。
3.1.5 Others VS. Throughput
1、Network線程數
Network線程數即broker處理消息的最大線程數 (一般不需要修改)。
實驗條件:3個Broker,1個Topic,throughput=1000w,num-records=5000w ,batch-size=16K ,1個客戶端,1個producer,9個磁盤,9個partitions
測試項目:分別測試線程個數為3,8時的吞吐量
表5 Network網絡線程數
net.io.thread |
生產者y99_thread |
Records/second(avg) |
MB/second(avg) |
CPU(max%) |
3 |
3 |
516913 |
492.97 |
560.5 |
6 |
528966 |
504 |
943.2 |
|
9 |
441789 |
421.32 |
799.3 |
|
8 |
3 |
629945 |
600.76 |
636 |
6 |
554865 |
529.16 |
855 |
|
9 |
543371 |
518.2 |
859 |
由表所示,增加網絡線程數可以提高吞吐量,但隨著生產者線程數增加逐漸趨於平穩,此時CPU最大值為998.2%,對於16核的CPU約占到10核。
2、測試客戶機的影響
由於在單個客戶機測試時,網卡硬件條件的限制會影響測試的吞吐量,因此換用3臺客戶機測試,吞吐量的測試結果即為三個客戶機吞吐總量。分別測試6個磁盤和9個磁盤在不同分區和線程數的情況,測試結果如表6所示。
表6 客戶機影響因素
topic_partition_thread |
66 |
67 |
68 |
總量 |
||||
Records/second(avg) |
MB/second(avg) |
Records/second(avg) |
MB/second(avg) |
Records/second(avg) |
MB/second(avg) |
Records/second(avg) |
MB/second(avg) |
|
p66(3) |
439382 |
419.03 |
461382 |
440.01 |
449458 |
428.64 |
1350222 |
1287.68 |
p66(6) |
471635 |
449.79 |
468059 |
446.38 |
466409 |
444.8 |
1406103 |
1340.97 |
p66(12) |
145292 |
138.56 |
145266 |
138.54 |
144017 |
137.35 |
434575 |
414.45 |
p612(3) |
498037 |
474.97 |
488181 |
465.57 |
584098 |
557.04 |
1570316 |
1497.58 |
p612(6) |
305658 |
291.5 |
305255 |
291.11 |
305715 |
291.55 |
916628 |
874.16 |
p612(12) |
102037 |
97.31 |
102032 |
97.31 |
101951 |
97.23 |
306020 |
291.85 |
p99(3) |
473771 |
451.82 |
481621 |
459.31 |
498658 |
475.56 |
1454050 |
1386.69 |
p99(6) |
207400 |
197.79 |
207337 |
197.73 |
206322 |
196.76 |
621059 |
592.28 |
p99(12) |
174175 |
166.11 |
178429 |
170.16 |
177564 |
169.34 |
530168 |
505.61 |
由表6可以看出,在三臺測試機下,同時向一個topic產生數據,生產者的數據總量是在增大的。在網絡穩定的情況下,曾測試3個磁盤3個partition最大吞吐量在18000條左右 ,大小為1683.31MB/s,如圖1所示。
其中在單個broker上cpu最大上線為700%-800%,內存利用率為10%-20%。在單臺測試機情況下,曾測試單臺測試下最大吞吐為80w條左右,大小約為80MB/s,由於資源限制三臺機器總量可能達不到一臺機器3倍的量。
圖1 3臺機器的吞吐量
3、IO線程數
IO線程數即broker處理磁盤IO的線程數
實驗條件:3個Broker,1個Topic,throughput=1000w,num-records=5000w ,batch-size=16K ,3個客戶端,1個producer,6個磁盤,6個partitions
測試項目:分別測試線程個數為8,12時的吞吐量
圖2 線程數為8
圖3 線程數為12
由圖2,3可知,當線程數增加後,吞吐量有所增加,但增加比較緩和。
4、生產者的個數
實驗條件:3個Broker,1個Topic,throughput=1000w,num-records=5000w ,batch-size=16K ,3個客戶端,1個producer,9個磁盤,9個partitions
測試項目:分別測試生產者個數為1,2,3時的吞吐量
表7 不同生產者吞吐量
thread |
1 |
2 |
3 |
|||||||||
1窗口 |
1窗口 |
2窗口 |
1窗口 |
2窗口 |
3窗口 |
|||||||
條 |
MB |
條 |
MB |
條 |
MB |
條 |
MB/s |
條 |
MB/s |
條 |
MB/s |
|
3 |
629945 |
600.76 |
338900 |
323.2 |
328357 |
313.15 |
209465 |
199.76 |
205923 |
196.38 |
209129 |
199.44 |
6 |
554865 |
529.16 |
354172 |
337.77 |
382564 |
364.84 |
235881 |
224.95 |
236085 |
225.15 |
245225 |
233.87 |
9 |
543371 |
518.2 |
370090 |
352.95 |
359056 |
342.42 |
209646 |
199.93 |
221790 |
211.52 |
209058 |
199.37 |
由表7 可知,在同一客戶端下,開啟不同的生產者,2個生產者和3個生產的總吞吐量基本等於1個生產者的總量。
5、壓縮因素
實驗條件:3個Broker,1個Topic,throughput=1000w,num-records=5000w ,batch-size=16K ,1個客戶端,1個producer,9個磁盤,9個partitions
測試項目:分別測試不同壓縮類型的吞吐量,kafka支持的壓縮類型包括none、Snappy、gzip、LZ4。
測試命令:
./ppt.sh --topic py --num-records 50000000 --record-size 1000 --throughput 10000000 --threads 3 --producer-props bootstrap.servers=192.168.17.68:9092,192.168.17.69:9092,192.168.17.70:9092 compression.type=gzip
通過運行時初始化參數看到修改值生效,如圖所示。
圖4 初始化參數
表8 壓縮因素
壓縮類型 |
Records/second(avg) |
MB/second(avg) |
延遲時間(avg) |
延遲時間(max) |
耗時 |
cpu max |
cpu avg |
none |
636472 |
606 |
122.56 |
3281 |
78s |
899 |
545 |
Snappy |
392233 |
374.06 |
50.88 |
672 |
80s |
670 |
574 |
gzip |
92653 |
88.36 |
2.38 |
386 |
8分59s |
959.5 |
522 |
LZ4 |
618444 |
589.79 |
4 |
329 |
81s |
887.7 |
480 |
由表可知snappy、gzip壓縮可以大幅度的減緩網絡壓力和Broker的存儲壓力,同時降低平均延遲時間,LZ4壓縮在降低平均延遲時間時最顯著,gzip在生產數據時耗時最大。
3.2 讀取測試(only消費者)
3.2.1 Thread VS. Throughput
實驗條件:3個Broker,1個Topic,3個磁盤,throughput=1000w,num-records=5000w ,1個客戶端
測試項目:分別測試thread為1,2,3,6時的吞吐量
表9 線程因素
console&thread |
Records/second(avg) |
MB/second(avg) |
p33(11) |
340124 |
324 |
p33(12) |
799073 |
762 |
p33(13) |
973551 |
928 |
p33(16) |
963081 |
918 |
由表可知,增加線程數會增加消費者的吞吐量,當小於3個線程時吞吐量增速較快,大於3個的時候趨於大體平穩,單個客戶端消費者的吞吐量可達到928MB/s。同時,消費者吞吐量大於生產者的吞吐量並且處理消息的時間總大於生產者。因此在合理的配置下可以保證消息被及時處理。
3.2.2 消費者 VS. Throughput
實驗條件:3個Broker,1個Topic,3個磁盤,throughput=1000w,num-records=5000w ,3個客戶端,即三個消費者
測試項目:分別測試消費者為1, 3時的吞吐量
當消費者個數有1增為3時,吞吐量由324MB/s增加到1324MB/s。Consumer消費消息時以Partition為分配單位,當只有1個Consumer時,該Consumer需要同時從3個Partition拉取消息,該Consumer所在機器的I/O成為整個消費過程的瓶頸,而當Consumer個數增加至3個時,多個Consumer同時從集群拉取消息,充分利用了集群的吞吐率。對於同一個組內的消費者只允許一個消費者消費一個partition,不同組之間可以消費相同的partition。
3.2.3 磁盤 VS. Throughput
實驗條件:3個Broker,1個Topic, throughput=1000w,num-records=5000w ,1個客戶端
測試項目:分別測試消費者為3,6,9時的吞吐量
表10 磁盤影響因素
console&thread |
Records/second(avg) |
MB/second(avg) |
p36(33) |
938827 |
895 |
p66(33) |
888458 |
847 |
p96(33) |
259475 |
247 |
由表看出,磁盤數增加並不能提高吞吐率,反而吞吐率相應下降。因此,在一定broker和partition 下可以按兩者值合理增加磁盤。
Consumer 小結
(1) Consumer 吞吐率遠大於生產者,因此可以保證在合理的配置下,消息可被及時處理。
(2) Consumer 處理消息時所消耗單個broker的CPU一般在70%以下,所占內存也較小,屬於輕量級進程。
(3) 本實驗在三臺客戶端測得Consumer的最大吞吐量在2617MB/s
3.3 生產者&消費者
模擬類似真實的環境,生產消費過程
實驗條件:3個Broker,1個Topic, throughput=1000w,num-records=5000w ,3個客戶端,即三個消費者
測試項目:分別測試磁盤為3,6,9時的吞吐量
(1) Producer
表11 生產者吞吐量
topic |
Records/second(avg) |
MB/second(avg) |
pc36(366) |
1023426 |
976.02 |
p66(366) |
1319431 |
1258.31 |
p612(366) |
1273905 |
1214.88 |
p99(366) |
989373 |
943.54 |
p918(366) |
1159443 |
1105.73 |
(2) Consumer
表12 消費者吞吐量
topic |
Records/second(avg) |
MB/second(avg) |
pc36(366) |
934007 |
889 |
p66(366) |
2259154 |
2153 |
p612(366) |
1982417 |
1889 |
p99(366) |
649876 |
618 |
p918(366) |
1313163 |
1817 |
由producer與consumer兩表,測得生產的最大吞吐量為1258.31MB/s,消費最大吞吐量為2153MB/s。由此看出,與單生產、單消費時的吞吐量大體相當,所用CPU及內存量並未顯著上升。
kafka性能測試