1. 程式人生 > >日誌收集之kafka

日誌收集之kafka

一、介紹

Kafka是一種分散式的,基於釋出/訂閱的訊息系統。主要設計目標如下:

  • 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的訪問效能
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上訊息的傳輸
  • 支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個Partition內的訊息順序傳輸
  • 同時支援離線資料處理和實時資料處理
  • Scale out:支援線上水平擴充套件

架構與原理

kafka的架構和原理想必大家都已經在很多地方看過,今天暫時不講,下次再開篇詳談,整個kafka的具體工作流和架構如下圖:

整體架構
  如上圖所示,一個典型的Kafka叢集中包含若干Producer(可以是web前端產生的Page View,或者是伺服器日誌,系統CPU、Memory等),若干broker(Kafka支援水平擴充套件,一般broker數量越多,叢集吞吐率越高),若干Consumer Group,以及一個
Zookeeper
叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將訊息釋出到broker,Consumer使用pull模式從broker訂閱並消費訊息。

二、安裝

在centos上安裝kafka,我推薦安裝confluent公司的kafka套裝,我們可以選擇自己想要的元件就行。

2.1 yum安裝

  1. sudo rpm --import http://packages.confluent.io/rpm/2.0/archive.key
  2. 新增yum源, confluent.repo
  1. sudo yum install confluent-platform-2.11.7
    安裝即可,裡面包含confluent-kafka-2.11.7confluent-schema-registry等元件。

安裝完成後馬上快速開始吧。

三、kafka命令列

kafka工具安裝後,會有很多自帶的工具來測試kafka,下面就舉幾個例子

3.1 kafka-topics

建立、改變、展示全部和描述topics, 例子:

 [[email protected] ~]#/usr/bin/kafka-topics --zookeeper zk01.example.com:2181 --list
sink1
test
[[email protected] ~]#/usr/bin/kafka-topics --zookeeper zk01.example.com:2181 --create --topic

3.2 kafka-console-consumer

從kafka中讀取資料,輸出到控制檯

[[email protected] ~]#kafka-console-consumer --zookeeper zk01.example.com:2181 --topic test

3.3 kafka-console-producer

從標準輸出讀取資料然後寫入到kafka佇列中

[[email protected] ~]#/usr/bin/kafka-console-producer --broker-list kafka02.example.com:9092,kafka03.example.com:9092 --topic test2

3.4 kafka-consumer-offset-checker

檢查讀寫的訊息量

[[email protected] ~]#/usr/bin/kafka-consumer-offset-checker --group flume --topic test1 --zookeeper zk01.example.com:2181

四、kafka web UI

4.1 執行KafkaOffsetMonitor

java -cp /root/kafka_web/KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb  --dbName kafka --zk  zk-server1,zk-server2 --port 8080 --refresh 10.seconds --retain 2.days```
- 利用supervisor執行, 在/etc/supervisord.d目錄下建立kafka_web.conf檔案,內容如下

[program:kafka_web]
command=java -cp /root/kafka_web/KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --dbName kafka -zk zk-server1,zk-server2 --port 8080 --refresh 10.seconds --retain 2.days
startsecs=0
stopwaitsecs=0
autostart=true
autorestart=true


## 4.2 執行kafka-manager
執行kafka-manager需要sbt編譯,但是編譯起來太麻煩了,而且還不一定成功,所以我就直接用docker跑了一個。

- 在centos上,在docker的配置```/etc/sysconfig/docker```上增加[daocloud](https://dashboard.daocloud.io/mirror)的加速mirror, 修改docker執行引數:
      other_args=" --registry-mirror=http://7919bcde.m.daocloud.io --insecure-registry=0.0.0.0:5000 -H tcp://0.0.0.0:2375 -H unix:///var/run/docker.sock -api-enable-cors=true"
直接重啟docker即可。
- 執行docker命令即可訪問
        docker run -p 9000:9000 -e ZK_HOSTS="zk_host:2181" -e APPLICATION_SECRET=kafka-manager  sheepkiller/kafka-manager

# 五 、效能測試
   關於效能測試,找到了Kafka的創始人之一的[Jay Kreps](http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)的bechmark。以下描述皆基於該benchmark。(該benchmark基於Kafka0.8.1)
## 5.1 測試環境
  該benchmark用到了六臺機器,機器配置如下
>Intel Xeon 2.5 GHz processor with six cores
>Six 7200 RPM SATA drives
>32GB of RAM
>1Gb Ethernet    

這6臺機器其中3臺用來搭建Kafka broker叢集,另外3臺用來安裝Zookeeper及生成測試資料。6個drive都直接以非RAID方式掛載。實際上kafka對機器的需求與Hadoop的類似。

##5.2 producer吞吐率
  該項測試只測producer的吞吐率,也就是資料只被持久化,沒有consumer讀資料。
* 1個producer執行緒,無replication
  在這一測試中,建立了一個包含6個partition且沒有replication的topic。然後通過一個執行緒儘可能快的生成50 million條比較短(payload100位元組長)的訊息。測試結果是***821,557 records/second***(***78.3MB/second***)。  
  之所以使用短訊息,是因為對於訊息系統來說這種使用場景更難。因為如果使用MB/second來表徵吞吐率,那傳送長訊息無疑能使得測試結果更好。  
  整個測試中,都是用每秒鐘delivery的訊息的數量乘以payload的長度來計算MB/second的,沒有把訊息的元資訊算在內,所以實際的網路使用量會比這個大。對於本測試來說,每次還需傳輸額外的22個位元組,包括一個可選的key,訊息長度描述,CRC等。另外,還包含一些請求相關的overhead,比如topic,partition,acknowledgement等。這就導致我們比較難判斷是否已經達到網絡卡極限,但是把這些overhead都算在吞吐率裡面應該更合理一些。因此,我們已經基本達到了網絡卡的極限。  
  初步觀察此結果會認為它比人們所預期的要高很多,尤其當考慮到Kafka要把資料持久化到磁碟當中。實際上,如果使用隨機訪問資料系統,比如RDBMS,或者key-velue store,可預期的最高訪問頻率大概是500050000個請求每秒,這和一個好的RPC層所能接受的遠端請求量差不多。而該測試中遠超於此的原因有兩個。
Kafka確保寫磁碟的過程是線性磁碟I/O,測試中使用的6塊廉價磁碟線性I/O的最大吞吐量是822MB/second,這已經遠大於1Gb網絡卡所能帶來的吞吐量了。許多訊息系統把資料持久化到磁碟當成是一個開銷很大的事情,這是因為他們對磁碟的操作都不是線性I/O。
在每一個階段,Kafka都儘量使用批量處理。如果想了解批處理在I/O操作中的重要性,可以參考David Patterson的”[Latency Lags Bandwidth](http://www.ll.mit.edu/HPEC/agendas/proc04/invited/patterson_keynote.pdf)“
* 1個producer執行緒,3個非同步replication
  該項測試與上一測試基本一樣,唯一的區別是每個partition有3個replica(所以網路傳輸的和寫入磁碟的總的資料量增加了3倍)。每一個broker即要寫作為leader的partition,也要讀(從leader讀資料)寫(將資料寫到磁碟)作為follower的partition。測試結果為***75.1MB/second***。  
  該項測試中replication是非同步的,也就是說broker收到資料並寫入本地磁碟後就acknowledge producer,而不必等所有replica都完replication。也就是說,如果leader crash了,可能會丟掉一些最新的還未備份的資料。但這也會讓message acknowledgement延遲更少,實時性更好。  
  這項測試說明,replication可以很快。整個叢集的寫能力可能會由於3倍的replication而只有原來的三分之一,但是對於每一個producer來說吞吐率依然足夠好。  
* 1個producer執行緒,3個同步replication
  該項測試與上一測試的唯一區別是replication是同步的,每條訊息只有在被in sync集合裡的所有replica都複製過去後才會被置為committed(此時broker會向producer傳送acknowledgement)。
  在這種模式下,Kafka可以保證即使leader crash了,也不會有資料丟失。測試結果***40.2MB/second***。Kafka同步複製與非同步複製並沒有本質的不同。leader會始終track follower replica從而監控它們是否還alive,只有所有in sync集合裡的replica都acknowledge的訊息才可能被consumer所消費。而對follower的等待影響了吞吐率。可以通過增大batch size來改善這種情況,但為了避免特定的優化而影響測試結果的可比性,本次測試並沒有做這種調整。  
* 3個producer,3個非同步replication
  該測試相當於把上文中的1個producer,複製到了3臺不同的機器上(在1臺機器上跑多個例項對吞吐率的增加不會有太大幫忙,因為網絡卡已經基本飽和了),這3個producer同時傳送資料。整個叢集的吞吐率為***193,0MB/second***。

###5.3 Producer Throughput Vs. Stored Data
  訊息系統的一個潛在的危險是當資料能都存於記憶體時效能很好,但當資料量太大無法完全存於記憶體中時(然後很多訊息系統都會刪除已經被消費的資料,但當消費速度比生產速度慢時,仍會造成資料的堆積),資料會被轉移到磁碟,從而使得吞吐率下降,這又反過來造成系統無法及時接收資料。這樣就非常糟糕,而實際上很多情景下使用queue的目的就是解決資料消費速度和生產速度不一致的問題。  
  但Kafka不存在這一問題,因為Kafka始終以O(1)的時間複雜度將資料持久化到磁碟,所以其吞吐率不受磁碟上所儲存的資料量的影響。為了驗證這一特性,做了一個長時間的大資料量的測試。測試表明當磁碟資料量達到1TB時,吞吐率和磁碟資料只有幾百MB時沒有明顯區別,這個variance是由Linux I/O管理造成的,它會把資料快取起來再批量flush。 
### 5.4 consumer吞吐率
  需要注意的是,replication factor並不會影響consumer的吞吐率測試,因為consumer只會從每個partition的leader讀資料,而與replicaiton factor無關。同樣,consumer吞吐率也與同步複製還是非同步複製無關。  
1個consumer
  該測試從有6個partition,3個replication的topic消費50 million的訊息。測試結果為***89.7MB/second***。可以看到,Kafka的consumer是非常高效的。它直接從broker的檔案系統裡讀取檔案塊。Kafka使用[sendfile API](http://www.ibm.com/developerworks/library/j-zerocopy/)來直接通過作業系統直接傳輸,而不用把資料拷貝到使用者空間。該項測試實際上從log的起始處開始讀資料,所以它做了真實的I/O。在生產環境下,consumer可以直接讀取producer剛剛寫下的資料(它可能還在快取中)。實際上,如果在生產環境下跑[I/O stat](http://en.wikipedia.org/wiki/Iostat),你可以看到基本上沒有物理“讀”。也就是說生產環境下consumer的吞吐率會比該項測試中的要高。
3個consumer
  將上面的consumer複製到3臺不同的機器上,並且並行執行它們(從同一個topic上消費資料)。測試結果為***249.5MB/second***,正如所預期的那樣,consumer的吞吐率幾乎線性增漲。  
### 5.5 Producer and Consumer
  上面的測試只是把producer和consumer分開測試,而該項測試同時執行producer和consumer,這更接近使用場景。實際上目前的replication系統中follower就相當於consumer在工作。  
  該項測試,在具有6個partition和3個replica的topic上同時使用1個producer和1個consumer,並且使用非同步複製。測試結果為***75.8MB/second***, 可以看到,該項測試結果與單獨測試1個producer時的結果幾乎一致。所以說consumer非常輕量級。  
### 5.6 訊息長度對吞吐率的影響
  上面的所有測試都基於短訊息(payload 100位元組),而正如上文所說,短訊息對Kafka來說是更難處理的使用方式,可以預期,隨著訊息長度的增大,records/second會減小,但MB/second會有所提高。正如我們所預期的那樣,隨著訊息長度的增加,每秒鐘所能傳送的訊息的數量逐漸減小。但是如果看每秒鐘傳送的訊息的總大小,它會隨著訊息長度的增加而增加,當訊息長度為10位元組時,因為要頻繁入隊,花了太多時間獲取鎖,CPU成了瓶頸,並不能充分利用頻寬。但從100位元組開始,我們可以看到頻寬的使用逐漸趨於飽和(雖然MB/second還是會隨著訊息長度的增加而增加,但增加的幅度也越來越小)。  
### 5.7 端到端的Latency
  上文中討論了吞吐率,那訊息傳輸的latency如何呢?也就是說訊息從producer到consumer需要多少時間呢?該項測試建立1個producer和1個consumer並反覆計時。結果是,***2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)***,(這裡並沒有說明topic有多少個partition,也沒有說明有多少個replica,replication是同步還是非同步。實際上這會極大影響producer傳送的訊息被commit的latency,而只有committed的訊息才能被consumer所消費,所以它會最終影響端到端的latency)  
### 5.8 重現該benchmark
  如果讀者想要在自己的機器上重現本次benchmark測試,可以參考[本次測試的配置和所使用的命令](https://gist.github.com/jkreps/c7ddb4041ef62a900e6c)。  
  實際上Kafka Distribution提供了producer效能測試工具,可通過```bin/kafka-producer-perf-test.sh```指令碼來啟動。 
  讀者也可參考另外一份[Kafka效能測試報告](http://liveramp.com/blog/kafka-0-8-producer-performance-2/)  
作者:modeyangg_cs