1. 程式人生 > 實用技巧 >23.Kafka(一):概述

23.Kafka(一):概述

一、Kafka概述

Kafka是一個分散式的基於釋出/訂閱模式的訊息佇列,主要應用於大資料實時處理領域。

1.1 MQ應用場景和優缺點

https://hucheng.blog.csdn.net/article/details/102961102

1.2 訊息佇列的兩種模式

①點對點模式(一對一,消費者主動拉取資料,訊息收到後訊息清除)

訊息生產者生產訊息傳送到Queue中,然後訊息消費者從Queue中取出並且消費訊息。

訊息被消費以後,Queue中不再有儲存,所以訊息消費者不可能消費到已經被消費的訊息。Queue支援存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費。

②釋出/訂閱模式(一對多,消費者消費資料之後不會清除訊息)

訊息生產者(釋出)將訊息釋出到topic中,同時有多個訊息消費者(訂閱)消費該訊息。和點對點方式不同,釋出到topic的訊息會被所有訂閱者消費。

1.3 Kafka基礎架構


Provider: 訊息生產者,就是向kafka broker發訊息的客戶端。

Consumer: 訊息消費者,向kafka broker取訊息的客戶端 。

Consumer Group (CG): 消費者組由多個consumer組成。消費者組內每個消費者負責消費不同分割槽的資料,一個分割槽只能由一個消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。

Broker : 一臺kafka

伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic

Topic : 可以理解為一個佇列,生產者和消費者面向的都是一個topic

Partition: 為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列;

Replica: 副本,為保證叢集中的某個節點發生故障時,該節點上的partition資料不丟失,且kafka仍然能夠繼續工作,kafka提供了副本機制,一個topic的每個分割槽都有若干個副本,一個leader和若干個follower

leader: 每個分割槽多個副本的“主”,生產者傳送資料的物件,以及消費者消費資料的物件都是leader

follower: 每個分割槽多個副本中的“從”,實時從leader中同步資料,保持和leader資料的同步。leader發生故障時,某個follower會成為新的leader

二、Kafka快速入門

2.1 安裝部署

叢集規劃:

hadoop100 hadoop101 hadoop102
zk zk zk
kafka kafka kafka

叢集部署:

  1. 解壓安裝包
[root@hadoop100 software]# tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
  1. 修改解壓後的檔名稱
[root@hadoop100 module]# mv kafka_2.11-0.11.0.0/ kafka-0.11.0.0
  1. kafka目錄下建立logs資料夾
[root@hadoop100 kafka-0.11.0.0]# mkdir logs
  1. 修改配置檔案server.properties
  2. 配置環境變數
[root@hadoop100 module]# vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME= /opt/module/kafka-0.11.0.2
export PATH=$PATH:$KAFKA_HOME/bin

[root@hadoop100 module]# source /etc/profile
  1. 分發kafka安裝包和環境變數,並修改配置檔案中broke.id為1、2。
  2. 啟動叢集
[root@hadoop100 kafka-0.11.0.0]#  bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop101 kafka-0.11.0.0]#  bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop102 kafka-0.11.0.0]#  bin/kafka-server-start.sh -daemon config/server.properties
  1. 關閉叢集
[root@hadoop100 kafka-0.11.0.0]#  bin/kafka-server-stop.sh stop
[root@hadoop101 kafka-0.11.0.0]#  bin/kafka-server-stop.sh stop
[root@hadoop102 kafka-0.11.0.0]#  bin/kafka-server-stop.sh stop
  1. kafka群起指令碼
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
echo "========== $i ==========" 
ssh $i 'source /etc/profile&&/opt/module/kafka-0.11.0.2/bin/kafka-server-start.sh -daemon 
 /opt/module/kafka-0.11.0.2/config/server.properties &'
echo $?
done

2.2 Kafka命令列操作

  1. 檢視當前伺服器中的所有topic
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop100:2181 --list
  1. 建立topic
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop100:2181 \
--create --replication-factor 3 --partitions 1 --topic first

選項說明:
--topic:定義topic
--replication-factor :定義副本數
--partitions :定義分割槽數

  1. 刪除topic
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop100:2181 \
--delete --topic first

需要server.properties中設定delete.topic.enable=true否則只是標記刪除。

  1. 傳送訊息
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-console-producer.sh \
--broker-list hadoop100:9092 --topic first
>hello
>world
  1. 消費訊息
[root@hadoop101 kafka-0.11.0.0]# bin/kafka-console-consumer.sh \
--bootstrap-server hadoop100:9092 --from-beginning --topic first

hello
world

--from-beginning:會把主題中以往所有的資料都讀取出來。

  1. 檢視某個Topic的詳情
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic firs
  1. 修改分割槽數(修改的分割槽數只能大於之前的分割槽數)
[root@hadoop100 kafka-0.11.0.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181
 --alter --topic first --partitions 6

三、Kafka架構深入

3.1 Kafka工作流程


Kafka中訊息是以topic進行分類的,生產者生產訊息,消費者消費訊息,都是面向topic的。

topic是邏輯上的概念,而partition是物理上的概念,每個partition對應於一個log檔案,該log檔案中儲存的就是producer生產的資料。Producer生產的資料會被不斷追加到該log檔案末端,且每條資料都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。

3.2 Kafka檔案儲存機制


由於生產者生產的訊息會不斷追加到log檔案末尾,為防止log檔案過大導致資料定位效率低下,Kafka採取了分片索引機制,將每個partition分為多個segment。每個segment對應兩個檔案:.index檔案和.log檔案。這些檔案位於一個資料夾下,該資料夾的命名規則為:topic名稱+分割槽序號。例如,first這個topic有三個分割槽,則其對應的資料夾為first-0,first-1,first-2

indexlog檔案以當前segment的第一條訊息的offset命名,下圖為index檔案和log檔案的結構示意圖:

.index檔案儲存大量的索引資訊,.log檔案儲存大量的資料,索引檔案中的元資料指向對應資料檔案中message的物理偏移地址。

3.3 Kafka生產者

① 分割槽策略

分割槽的原因:

  1. 方便在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了;
  2. 可以提高併發,因為可以以Partition為單位讀寫了。

分割槽的原則:

我們需要將producer傳送的資料封裝成一個ProducerRecord物件。

  1. 指明partition的情況下,直接將指明的值直接作為partiton值;
  2. 沒有指明partition值但有key的情況下,將keyhash值與topicpartition數進行取餘得到partition值;
  3. 既沒有partition值又沒有key值的情況下,第一次呼叫時隨機生成一個整數(後面每次呼叫在這個整數上自增),將這個值與topic可用的partition總數取餘得到partition值,也就是常說的round-robin演算法。

② 資料可靠性保證

為保證producer傳送的資料,能可靠的傳送到指定的topictopic的每個partition收到producer傳送的資料後,都需要向producer傳送ackacknowledgement確認收到),如果producer收到ack,就會進行下一輪的傳送,否則重新發送資料。

1.副本資料同步策略

方案 優點 缺點
半數以上完成同步,就傳送ack 延遲低 選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本
全部完成同步,才傳送ack 選舉新的leader時,容忍n臺節點的故障,需要n+1個副本 延遲高

Kafka選擇了第二種方案,原因如下:

  1. 同樣為了容忍n臺節點的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分割槽都有大量的資料,第一種方案會造成大量資料的冗餘。
  2. 雖然第二種方案的網路延遲會比較高,但網路延遲對Kafka的影響較小。

2.ISR

採用第二種方案之後,設想以下情景:leader收到資料,所有follower都開始同步資料,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能傳送ack。這個問題怎麼解決呢?

Leader維護了一個動態的in-sync replica set (ISR),意為和Leader保持同步的follower集合。當ISR中的follower完成資料的同步之後,leader就會給follower傳送ack。如果follower長時間未向leader同步資料,則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms引數設定。Leader發生故障之後,就會從ISR中選舉新的leader

3.ack應答機制

對於某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒必要等ISR中的follower全部接收成功。所以Kafka為使用者提供了三種可靠性級別,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks引數配置:

  • 0:producer不等待brokerack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁碟就已經返回,當broker故障時有可能丟失資料;
  • 1:producer等待brokerackpartitionleader落盤成功後返回ack。如果在follower同步成功之前leader故障,那麼將會丟失資料
  • -1:producer等待brokerackpartitionleaderfollower全部落盤成功後才返回ack。但是如果在follower同步完成後,broker傳送ack之前,leader發生故障,那麼會造成資料重複

4.故障處理細節

LEO:每個副本的最後一個offset
HW:所有副本中最小的LEO

  1. follower故障
    follower發生故障後會被臨時踢出ISR,待該follower恢復後,follower會讀取本地磁碟記錄的上次的HW,並將log檔案高於HW的部分擷取掉,從HW開始向leader進行同步。等該followerLEO大於等於該PartitionHW,即follower追上leader之後,就可以重新加入ISR了。
  2. leader故障
    leader發生故障之後,會從ISR中選出一個新的leader,之後,為保證多個副本之間的資料一致性,其餘的follower會先將各自的log檔案高於HW的部分截掉,然後從新的leader同步資料。
    注意:這隻能保證副本之間的資料一致性,並不能保證資料不丟失或者不重複。

③ Exactly Once語義

對於某些比較重要的訊息,我們需要保證exactly once語義,即保證每條訊息被髮送且僅被髮送一次。

0.11版本之後,Kafka引入了冪等性機制(idempotent),配合acks = -1時的at least once語義,實現了producerbrokerexactly once語義。

idempotent + at least once = exactly once

使用時,只需將enable.idempotence屬性設定為truekafka自動將acks屬性設為-1。

3.4 Kafka消費者

① 消費方式

consumer採用pull(拉)模式從broker中讀取資料。

push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker決定的。它的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費訊息。

pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,Kafka的消費者在消費資料時會傳入一個時長引數timeout,如果當前沒有資料可供消費,consumer會等待一段時間之後再返回,這段時長即為timeout

② 分割槽分配策略

一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。

Kafka有兩種分配策略,一是round robin(輪詢,預設),一是rangerange分配是以topic分配partition,當有多個topic時,可能會造成partition分配不均勻

③ offset的維護

由於consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復後,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復後繼續消費。

Kafka 0.9版本之前,consumer預設將offset儲存在Zookeeper中,從0.9版本開始,consumer預設將offset儲存在Kafka一個內建的topic中,該topic__consumer_offsets

3.5 Kafka 高效讀寫資料

①順序寫磁碟

Kafkaproducer生產資料,要寫入到log檔案中,寫的過程是一直追加到檔案末端,為順序寫。官網有資料表明,同樣的磁碟,順序寫能到到600M/s,而隨機寫只有100k/s。這與磁碟的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭定址的時間。

②零複製技術

3.6 Zookeeper在Kafka中的作用

Kafka叢集中有一個broker會被選舉為Controller,負責管理叢集broker的上下線,所有topic的分割槽副本分配和leader選舉等工作。Controller的管理工作都是依賴於Zookeeper的。

以下為partitionleader選舉過程:

23