23.Kafka(一):概述
一、Kafka概述
Kafka
是一個分散式的基於釋出/訂閱模式的訊息佇列,主要應用於大資料實時處理領域。
1.1 MQ應用場景和優缺點
https://hucheng.blog.csdn.net/article/details/102961102
1.2 訊息佇列的兩種模式
①點對點模式(一對一,消費者主動拉取資料,訊息收到後訊息清除)
訊息生產者生產訊息傳送到Queue
中,然後訊息消費者從Queue
中取出並且消費訊息。
訊息被消費以後,Queue
中不再有儲存,所以訊息消費者不可能消費到已經被消費的訊息。Queue
支援存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費。
②釋出/訂閱模式(一對多,消費者消費資料之後不會清除訊息)
訊息生產者(釋出)將訊息釋出到topic
中,同時有多個訊息消費者(訂閱)消費該訊息。和點對點方式不同,釋出到topic
的訊息會被所有訂閱者消費。
1.3 Kafka基礎架構
Provider: 訊息生產者,就是向kafka broker
發訊息的客戶端。
Consumer: 訊息消費者,向kafka broker
取訊息的客戶端 。
Consumer Group (CG): 消費者組由多個consumer
組成。消費者組內每個消費者負責消費不同分割槽的資料,一個分割槽只能由一個消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
Broker : 一臺kafka
broker
。一個叢集由多個broker
組成。一個broker
可以容納多個topic
。
Topic : 可以理解為一個佇列,生產者和消費者面向的都是一個topic
;
Partition: 為了實現擴充套件性,一個非常大的topic
可以分佈到多個broker
(即伺服器)上,一個topic
可以分為多個partition
,每個partition
是一個有序的佇列;
Replica: 副本,為保證叢集中的某個節點發生故障時,該節點上的partition
資料不丟失,且kafka
仍然能夠繼續工作,kafka
提供了副本機制,一個topic
的每個分割槽都有若干個副本,一個leader
和若干個follower
leader: 每個分割槽多個副本的“主”,生產者傳送資料的物件,以及消費者消費資料的物件都是leader
。
follower: 每個分割槽多個副本中的“從”,實時從leader
中同步資料,保持和leader
資料的同步。leader
發生故障時,某個follower
會成為新的leader
。
二、Kafka快速入門
2.1 安裝部署
叢集規劃:
hadoop100 | hadoop101 | hadoop102 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
叢集部署:
- 解壓安裝包
[root@hadoop100 software]# tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
- 修改解壓後的檔名稱
[root@hadoop100 module]# mv kafka_2.11-0.11.0.0/ kafka-0.11.0.0
- 在
kafka
目錄下建立logs
資料夾
[root@hadoop100 kafka-0.11.0.0]# mkdir logs
- 修改配置檔案
server.properties
- 配置環境變數
[root@hadoop100 module]# vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME= /opt/module/kafka-0.11.0.2
export PATH=$PATH:$KAFKA_HOME/bin
[root@hadoop100 module]# source /etc/profile
- 分發
kafka
安裝包和環境變數,並修改配置檔案中broke.id
為1、2。 - 啟動叢集
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop101 kafka-0.11.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop102 kafka-0.11.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
- 關閉叢集
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-server-stop.sh stop
[root@hadoop101 kafka-0.11.0.0]# bin/kafka-server-stop.sh stop
[root@hadoop102 kafka-0.11.0.0]# bin/kafka-server-stop.sh stop
kafka
群起指令碼
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
echo "========== $i =========="
ssh $i 'source /etc/profile&&/opt/module/kafka-0.11.0.2/bin/kafka-server-start.sh -daemon
/opt/module/kafka-0.11.0.2/config/server.properties &'
echo $?
done
2.2 Kafka命令列操作
- 檢視當前伺服器中的所有
topic
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop100:2181 --list
- 建立
topic
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop100:2181 \
--create --replication-factor 3 --partitions 1 --topic first
選項說明:
--topic
:定義topic
名
--replication-factor
:定義副本數
--partitions
:定義分割槽數
- 刪除
topic
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop100:2181 \
--delete --topic first
需要server.properties
中設定delete.topic.enable=true
否則只是標記刪除。
- 傳送訊息
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-console-producer.sh \
--broker-list hadoop100:9092 --topic first
>hello
>world
- 消費訊息
[root@hadoop101 kafka-0.11.0.0]# bin/kafka-console-consumer.sh \
--bootstrap-server hadoop100:9092 --from-beginning --topic first
hello
world
--from-beginning
:會把主題中以往所有的資料都讀取出來。
- 檢視某個
Topic
的詳情
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic firs
- 修改分割槽數(修改的分割槽數只能大於之前的分割槽數)
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181
--alter --topic first --partitions 6
三、Kafka架構深入
3.1 Kafka工作流程
Kafka
中訊息是以topic
進行分類的,生產者生產訊息,消費者消費訊息,都是面向topic
的。
topic
是邏輯上的概念,而partition
是物理上的概念,每個partition
對應於一個log
檔案,該log
檔案中儲存的就是producer
生產的資料。Producer
生產的資料會被不斷追加到該log
檔案末端,且每條資料都有自己的offset
。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset
,以便出錯恢復時,從上次的位置繼續消費。
3.2 Kafka檔案儲存機制
由於生產者生產的訊息會不斷追加到log
檔案末尾,為防止log
檔案過大導致資料定位效率低下,Kafka
採取了分片和索引機制,將每個partition
分為多個segment
。每個segment
對應兩個檔案:.index
檔案和.log
檔案。這些檔案位於一個資料夾下,該資料夾的命名規則為:topic
名稱+分割槽序號。例如,first
這個topic
有三個分割槽,則其對應的資料夾為first-0
,first-1
,first-2
。
index
和log
檔案以當前segment
的第一條訊息的offset
命名,下圖為index
檔案和log
檔案的結構示意圖:
.index
檔案儲存大量的索引資訊,.log
檔案儲存大量的資料,索引檔案中的元資料指向對應資料檔案中message
的物理偏移地址。
3.3 Kafka生產者
① 分割槽策略
分割槽的原因:
- 方便在叢集中擴充套件,每個
Partition
可以通過調整以適應它所在的機器,而一個topic
又可以有多個Partition
組成,因此整個叢集就可以適應任意大小的資料了; - 可以提高併發,因為可以以
Partition
為單位讀寫了。
分割槽的原則:
我們需要將producer
傳送的資料封裝成一個ProducerRecord
物件。
- 指明
partition
的情況下,直接將指明的值直接作為partiton
值; - 沒有指明
partition
值但有key
的情況下,將key
的hash
值與topic
的partition
數進行取餘得到partition
值; - 既沒有
partition
值又沒有key
值的情況下,第一次呼叫時隨機生成一個整數(後面每次呼叫在這個整數上自增),將這個值與topic
可用的partition
總數取餘得到partition
值,也就是常說的round-robin
演算法。
② 資料可靠性保證
為保證producer
傳送的資料,能可靠的傳送到指定的topic
,topic
的每個partition
收到producer
傳送的資料後,都需要向producer
傳送ack
(acknowledgement
確認收到),如果producer
收到ack
,就會進行下一輪的傳送,否則重新發送資料。
1.副本資料同步策略
方案 | 優點 | 缺點 |
---|---|---|
半數以上完成同步,就傳送ack | 延遲低 | 選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本 |
全部完成同步,才傳送ack | 選舉新的leader時,容忍n臺節點的故障,需要n+1個副本 | 延遲高 |
Kafka
選擇了第二種方案,原因如下:
- 同樣為了容忍
n
臺節點的故障,第一種方案需要2n+1
個副本,而第二種方案只需要n+1
個副本,而Kafka
的每個分割槽都有大量的資料,第一種方案會造成大量資料的冗餘。 - 雖然第二種方案的網路延遲會比較高,但網路延遲對
Kafka
的影響較小。
2.ISR
採用第二種方案之後,設想以下情景:leader
收到資料,所有follower
都開始同步資料,但有一個follower
,因為某種故障,遲遲不能與leader
進行同步,那leader
就要一直等下去,直到它完成同步,才能傳送ack
。這個問題怎麼解決呢?
Leader
維護了一個動態的in-sync replica set (ISR)
,意為和Leader
保持同步的follower
集合。當ISR
中的follower
完成資料的同步之後,leader
就會給follower
傳送ack
。如果follower
長時間未向leader
同步資料,則該follower
將被踢出ISR
,該時間閾值由replica.lag.time.max.ms
引數設定。Leader
發生故障之後,就會從ISR
中選舉新的leader
。
3.ack應答機制
對於某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒必要等ISR
中的follower
全部接收成功。所以Kafka
為使用者提供了三種可靠性級別,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。
acks
引數配置:
- 0:
producer
不等待broker
的ack
,這一操作提供了一個最低的延遲,broker
一接收到還沒有寫入磁碟就已經返回,當broker
故障時有可能丟失資料; - 1:
producer
等待broker
的ack
,partition
的leader
落盤成功後返回ack
。如果在follower
同步成功之前leader
故障,那麼將會丟失資料;
- -1:
producer
等待broker
的ack
,partition
的leader
和follower
全部落盤成功後才返回ack
。但是如果在follower
同步完成後,broker
傳送ack
之前,leader
發生故障,那麼會造成資料重複。
4.故障處理細節
LEO:每個副本的最後一個offset
;
HW:所有副本中最小的LEO
。
- follower故障
follower
發生故障後會被臨時踢出ISR
,待該follower
恢復後,follower
會讀取本地磁碟記錄的上次的HW
,並將log
檔案高於HW
的部分擷取掉,從HW
開始向leader
進行同步。等該follower
的LEO
大於等於該Partition
的HW
,即follower
追上leader
之後,就可以重新加入ISR
了。 - leader故障
leader
發生故障之後,會從ISR
中選出一個新的leader
,之後,為保證多個副本之間的資料一致性,其餘的follower
會先將各自的log
檔案高於HW
的部分截掉,然後從新的leader
同步資料。
注意:這隻能保證副本之間的資料一致性,並不能保證資料不丟失或者不重複。
③ Exactly Once語義
對於某些比較重要的訊息,我們需要保證exactly once
語義,即保證每條訊息被髮送且僅被髮送一次。
在0.11
版本之後,Kafka
引入了冪等性機制(idempotent
),配合acks = -1
時的at least once
語義,實現了producer
到broker
的exactly once
語義。
idempotent
+ at least once
= exactly once
使用時,只需將enable.idempotence
屬性設定為true
,kafka
自動將acks
屬性設為-1。
3.4 Kafka消費者
① 消費方式
consumer
採用pull
(拉)模式從broker
中讀取資料。
push
(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker
決定的。它的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成consumer
來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull
模式則可以根據consumer
的消費能力以適當的速率消費訊息。
pull
模式不足之處是,如果kafka
沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,Kafka
的消費者在消費資料時會傳入一個時長引數timeout
,如果當前沒有資料可供消費,consumer
會等待一段時間之後再返回,這段時長即為timeout
。
② 分割槽分配策略
一個consumer group
中有多個consumer
,一個topic
有多個partition
,所以必然會涉及到partition
的分配問題,即確定那個partition
由哪個consumer
來消費。
Kafka
有兩種分配策略,一是round robin
(輪詢,預設),一是range
。range
分配是以topic
分配partition
,當有多個topic
時,可能會造成partition
分配不均勻
③ offset的維護
由於consumer
在消費過程中可能會出現斷電宕機等故障,consumer
恢復後,需要從故障前的位置的繼續消費,所以consumer
需要實時記錄自己消費到了哪個offset
,以便故障恢復後繼續消費。
Kafka 0.9
版本之前,consumer
預設將offset
儲存在Zookeeper
中,從0.9
版本開始,consumer
預設將offset
儲存在Kafka
一個內建的topic
中,該topic
為__consumer_offsets
。
3.5 Kafka 高效讀寫資料
①順序寫磁碟
Kafka
的producer
生產資料,要寫入到log
檔案中,寫的過程是一直追加到檔案末端,為順序寫。官網有資料表明,同樣的磁碟,順序寫能到到600M/s
,而隨機寫只有100k/s
。這與磁碟的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭定址的時間。
②零複製技術
3.6 Zookeeper在Kafka中的作用
Kafka
叢集中有一個broker
會被選舉為Controller
,負責管理叢集broker
的上下線,所有topic
的分割槽副本分配和leader
選舉等工作。Controller
的管理工作都是依賴於Zookeeper
的。
以下為partition
的leader
選舉過程:
23