1. 程式人生 > 其它 >Apache Kafka分散式流處理平臺及大廠面試寶典v3.0.0

Apache Kafka分散式流處理平臺及大廠面試寶典v3.0.0

Apache Kafka是一個開源的分散式事件流平臺,使用Scala和Java混合編寫,Kafka最初由Linkedin公司開發,2011年貢獻給了Apache基金會併成為頂級開源專案。訊息佇列就是用於資料生產方和消費方解耦合的中介軟體。顧名思義,主體就是一個佇列的形式收集訊息,資料在消費端按照FIFO的原則被消費。近幾天連續學習兩個Apache的開源專案,今天我們又來學習另外一個Apache頂級開源專案Kafka,可以見得Apache在開源世界的絕對大佬地位。Kafka是一個基於Zookeeper協調的支援分割槽(partition)、多副本(replica)的分散式訊息系統,最大特性是可以實時處理大量資料以滿足各種需求場景,常用於大資料場景訊息流中介軟體;其他訊息佇列有ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ,目前比較主流訊息中介軟體是Kafka、RocketMQ和RabbitMQ。

概述

**本人部落格網站 **IT小神 www.itxiaoshen.com

定義

Apache Kafka官網地址 http://kafka.apache.org/ 最新版本為 3.0.0

Apache Kafka是一個開源的分散式事件流平臺,使用Scala和Java混合編寫,Kafka最初由Linkedin公司開發,2011年貢獻給了Apache基金會併成為頂級開源專案。訊息佇列就是用於資料生產方和消費方解耦合的中介軟體。顧名思義,主體就是一個佇列的形式收集訊息,資料在消費端按照FIFO的原則被消費。

Apache Kafka主要以Java 8、11和17原始碼構建及測試,但Java 8支援從Apache Kafka 3.0開始就已棄用,並將在Apache Kafka 4.0中被移除。

近幾天連續學習兩個Apache的開源專案,今天我們又來學習另外一個Apache頂級開源專案Kafka,可以見得Apache在開源世界的絕對大佬地位。Kafka是一個基於Zookeeper協調的支援分割槽(partition)、多副本(replica)的分散式訊息系統,最大特性是可以實時處理大量資料以滿足各種需求場景,常用於大資料場景訊息流中介軟體;其他訊息佇列有ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ,目前比較主流訊息中介軟體是Kafka、RocketMQ和RabbitMQ。

核心能力

  • 高吞吐量
    • Kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒,即使儲存TB量級訊息也能有穩定的效能。
  • 可擴充套件
    • Kafka叢集支援動態擴充套件,可彈性伸縮將生產叢集規模擴大到上千個代理和數十萬個分割槽。
  • 持久化
    • 將資料流安全地儲存在分散式、持久、容錯的叢集中。
  • 高可用
    • 分散式、分割槽、複製和容錯,支援資料備份,在可用性區域上有效地擴充套件叢集或者跨地理區域連線獨立的叢集。

應用場景

Kafka作為一個訊息中介軟體最基本的用途為解耦、非同步、削峰、訊息通訊,下面為其常用的場景:

  • 日誌收集:可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer。
  • 訊息系統:分散式釋出、訂閱訊息系統,訊息佇列將訊息生產者和訂閱者分離,實現應用解耦和快取訊息等。
  • 流量削峰:在應用前端以訊息佇列接收請求,當請求超過佇列長度,直接不處理重定向至一個靜態頁面,來達到削峰的目的,比如用於秒殺活動,因為秒殺活動高度集中使用者訪問導致流量暴增,可能導致應用系統雪崩。
  • 流式處理:結合主流的流式處理框架如Flink、Spark Streaming、Storm。
  • 使用者活動跟蹤:kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後消費者通過訂閱這些topic來做實時的監控分析,亦可儲存到資料庫。
  • 運營指標:kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告。

安裝

Kafka官方下載地址 http://kafka.apache.org/downloads

我們這裡規劃部署192.168.50.34、192.168.50.35、192.168.50.36共3個節點的Kafka叢集,當然是需要有基本JDK環境,Kafka部署還需依賴ZooKeeper,剛好上一篇文章我們也非常愉快的學習和部署Zookeeper叢集,直接拿來使用,奧利給!

#官網下載
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
#解壓
tar -zxvf kafka_2.13-3.0.0.tgz 
#進入kafka目錄下建立logs資料夾
cd kafka_2.13-3.0.0
mkdir logs
#並修改config目錄的server.properties
vim config/server.properties

修改server.properties配置檔案內容如下:

// 作為當前機器在叢集中的唯一標識,和zookeeper的myid性質一樣
broker.id=1
// 監聽地址和埠號,預設是9092
listeners=PLAINTEXT://192.168.50.34:9092
// 訊息存放的目錄
log.dirs=/home/commons/kafka_2.13-3.0.0/logs
// zookeeper叢集地址
zookeeper.connect=192.168.50.34:2181,192.168.50.35:2181,192.168.50.36:2181
#然後將上整個kafka_2.13-3.0.0拷貝到另外兩臺192.168.50.35、192.168.50.36上的相同目錄(我們這裡是指/home/commons),配置檔案主要修改broker.id和listeners,broker.id唯一那我們就順序編號為2和3,listeners中的host就分配與主機地址對應即可,修改完成後我們可以在kafka根目錄下分別啟動三臺kafka server,後臺常駐方式帶上引數 -daemon
./bin/kafka-server-start.sh -daemon config/server.properties
#啟動後可以通過ps或者jps檢查程序資訊和logs下的日誌檔案如server.log
ps -ef | grep kafka
#停止kafka server
./bin/kafka-server-stop.sh

叢集簡單測試

  • 建立、列出所有topic、檢視指定的topic
#3.0版本官方使用-–bootstrap-server替代–-zookeeper,建立名稱為mytopic1的Topic,指定分割槽數為1,分割槽的副本數為1
bin/kafka-topics.sh --create --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --replication-factor 1 --partitions 1 --topic mytopic1
#建立名稱為mytopic2的Topic,指定分割槽數為2,分割槽的副本數為2
bin/kafka-topics.sh --create --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --replication-factor 2 --partitions 2 --topic mytopic2
#列出所有的topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092
#檢視指定topic
$bin/kafka-topics.sh --describe --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1
$bin/kafka-topics.sh --describe --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic2
  • 增加Topic的partition數量和檢視指定檢視 topic 指定分割槽 offset 的最大值或最小值和刪除topic
#將mytopic1的分割槽數量擴充到5個
bin/kafka-topics.sh --alter --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --partitions 5 --topic mytopic1
#time 為 -1 時表示最大值,為 -2 時表示最小值:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic1 --time -1 --broker-list 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --partitions 0 
  • 刪除topic
#刪除mytopic2
kafka-topics.sh --delete --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic2 
  • 生產和消費訊息
#往mytopic1裡生產訊息,這裡可以使用--broker-list也可以使用--bootstrap-server
./bin/kafka-console-producer.sh --broker-list 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1
#從頭開始消費訊息
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --from-beginning
#從尾部開始消費訊息,從尾部開始取資料,必需要指定分割槽:如果需要取指定個數訊息可以加上如--max-messages 5的引數 
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --offset latest --partition 1

傳送多條訊息,最終在該分割槽下收到分發的該分割槽下的訊息

  • 指定消費者組
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 -group mygroup1 --from-beginning
  • 檢視消費者組列表
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --list
  • 檢視消費者組詳情和其他
#消費者組詳情./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --group mygroup1 --describe#刪除消費者組./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --group mygroup1 --delete#平衡Leader,--partition:指定需要重新分配leader的partition編號./bin/kafka-leader-election.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --partition=2 --election-type preferred#此外還有kafka自帶壓測工具./bin/kafka-producer-perf-test.sh --topic mytopic1 --num-records 1000 --record-size 1 --throughput 1000 --producer-props bootstrap.servers=192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 

架構原理面試寶典

Kafka的架構和組成

一個典型的 Kafka 體系架構包括若干 Producer、若干 Broker、若干 Consumer,以及一個 ZooKeeper 叢集,如下圖所示。其中 ZooKeeper 是 Kafka 用來負責叢集元資料的管理、控制器的選舉等操作的。Producer 將訊息傳送到 Broker,Broker 負責將收到的訊息儲存到磁碟中,而 Consumer 負責從 Broker 訂閱並消費訊息。

  • Broker:服務代理節點,Kafka 叢集包含一個或多個伺服器,這種伺服器被稱為 broker。對於 Kafka 而言,Broker 可以簡單地看作一個獨立的 Kafka 服務節點或 Kafka 服務例項。大多數情況下也可以將 Broker 看作一臺 Kafka 伺服器,前提是這臺伺服器上只部署了一個 Kafka 例項。一個或多個 Broker 組成了一個 Kafka 叢集。
  • Topic:每條釋出到 Kafka 叢集的訊息都有一個類別,這個類別被稱為 Topic, Topic屬於邏輯概念,也即是物理上不一樣 Topic 的訊息分開儲存但邏輯上一個 Topic 的訊息雖然保存於一個或多個 broker 上,但使用者只需指定訊息的 Topic 便可生產或消費資料而沒必要關心資料存於何處。
  • Partition:Partition 屬於物理概念,Topic的分割槽,每一個 Topic 包含一個或多個 Partition,分割槽的作用是做負載,提高kafka的吞吐量,同一個topic在不同的分割槽的資料是不重複的,partition的表現形式就是一個一個的資料夾。分割槽主要實現擴充套件和提高併發,以partition作為讀寫單位,可以同時多個生產者消費者讀寫。
  • Replication:每一個分割槽可以有一個主分割槽(Leader)和多個副本(Follower),副本的作用是做備份。當主分割槽故障的時候會選擇一個副本成為新的Leader。在kafka中副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分割槽也只可能存放一個副本(包括自己)。
  • Producer: 生產者,也就是傳送訊息的一方。生產者負責建立訊息,然後將其投遞到 Kafka 中,負責釋出訊息到 Kafka broker。
  • Consumer:消費者,也就是接收訊息的一方,向 Kafka broker 讀取訊息的客戶端,消費者連線到 Kafka 上並接收訊息,進而進行相應的業務邏輯處理。
  • Consumer Group:每一個 Consumer 屬於一個特定的 Consumer Group(可為每一個 Consumer 指定 group name,若不指定 group name 則屬於預設的 group),我們可以將多個消費者組成一個消費者組,在kafka的設計中同一個分割槽的資料只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分割槽的資料,這也是為了提高kafka的吞吐量。
  • Message:每一條傳送的訊息主體,也即是訊息內容。
  • Zookeeper:kafka叢集依賴zookeeper來儲存叢集的的元資訊,來保證系統的可用性,管理broker上下線、Topic的分割槽和副本分配、leader選舉、consumer動態擴縮容和觸發負載均衡,維護消費者關係和每個partition分割槽的消費資訊;broker啟動將ip、埠註冊儲存Zookeeper,儲存topic和partition。

Kafka的四個核心API

  • Producer API:生產者API允許應用程式將一組記錄釋出到一個或多個Kafka Topic中。
  • Consumer AIP:消費者API允許應用程式訂閱一個或多個Topic,並處理向他們傳輸的記錄流。
  • Streams API:流API允許應用程式充當流處理器,從一個或者多個Topic中消費輸入流,並將輸出流生成為一個或多個輸出主題,從而將輸入流有效地轉換為輸出流。
  • Connector API:聯結器API允許構建和執行可重用的生產者或消費者,這些生產者或消費者將Kafka Topic連線到現有的應用程式或資料系統。例如:連線到關係資料庫的聯結器可以捕獲對錶的修改資訊。

訊息佇列的模式?

  • 點對點模式

    • 訊息生產者傳送訊息到訊息佇列中,然後訊息消費者從佇列中取出並且消費訊息,訊息被消費後,佇列中不在儲存。所以訊息消費者不可能消費到已經被消費的訊息;佇列支援存在多個消費者,但是對於一個訊息而言,只會有一個消費者可以消費;如果想發給多個消費者,則需要多次傳送該條訊息。
  • 釋出/訂閱模式(一對多,消費者消費資料之後不會清除訊息)

    • 訊息生產者將訊息釋出到topic中,同時有多個訊息消費者(訂閱)消費該訊息,和點對點的方式不同,釋出到topic的訊息會被所有的訂閱者消費;但是訊息佇列不是儲存系統資料保留是期限的,例如Kafka預設是7天;kafka就是這種模式的,分為兩種方式
      • 一種是是消費者去主動去消費(拉取)訊息,而不是生產者推送訊息給消費者。
      • 另外一種就是生產者主動推送訊息給消費者,類似公眾號。

Kafka的儲存模型

  • Kafka中訊息是以Topic進行分類的,生產者生產訊息,消費者消費訊息,都是面向Topic的。而Topic在物理上的儲存是分割槽儲存的,即按Partition分散式儲存。
  • 每個Partition中的資料又是順序寫入log檔案中進行儲存。這樣會出現分割槽log檔案過大,導致的讀取效能下降的問題。所以Kafka將log檔案切分成了segment每個segment由 .log資料儲存檔案 和 .index索引檔案 和 .timeindex檔案組成。其中.log用於儲存訊息本身的資料內容,.index儲存訊息在檔案中的位置(包括訊息的邏輯offset和物理儲存offset),.timeindex*儲存訊息建立時間和對應邏輯地址的對映關係。詳細的結構如下圖所示:
  • 每個log檔案和index檔案的命名就是 檔案中起始資料的偏移量,一個segment中由index定位到對應log檔案中執行資料的原理如下圖:
  • Kafka將分割槽拆分成多個段是為了控制儲存的檔案大小,如果整個分割槽只儲存為一個檔案,那隨著分割槽裡訊息的增多,檔案也將越來越大,最後不可控制。而如果每個訊息都儲存為一個檔案,那檔案數量又將變得巨大,同樣容易失去控制。所以kafka採用段設計方式,控制了每個檔案的大小和檔案的數量。同時可以很方便地通過作業系統mmap機制對映到記憶體中,提高寫入和讀取效率。還有另一個好處是當系統要清除過期資料時可以直接將過期的段檔案刪除即可。但是這裡也會有一個問題,如果每個訊息都要在index檔案中儲存位置資訊,那麼index檔案也很容易變得很大,這樣又會減弱上文所說的好處。所以在kafka中,index設計為稀疏索引來降低index的檔案大小,這樣,index檔案儲存的實際內容為:該段訊息在訊息佇列中的相對offset和在log檔案中的物理偏移量對映的稀疏記錄。那麼多少條訊息會在index中儲存一條記錄呢?這個可以通過系統配置來進行設定。索引記錄固定為8個位元組大小,分別為4個位元組的相對offset(訊息在partition中全域性offset減去該segment的起始offset),4個位元組的訊息具體儲存檔案的物理偏移量。
  • index檔案中根據需要查詢的offset根據儲存起始偏移量(檔名)的相對偏移量,定位到log中資料真實的位置。(類似HBase中採用LSM日誌結構合併樹設計思想,順序寫入HFile,磁頭定址次數少,順序讀/寫效能好),Kafka不會在消費者拉取完訊息後馬上就清理訊息,而是會儲存段檔案一段時間,直到其過期再標記為可清理,由後臺程式定期進行清理。這種機制使得消費者可以重複消費訊息,滿足更靈活的需求
  • 查詢機制:建立在offset為有序的基礎上,利用segment+有序offset+稀疏索引+二分查詢+順序查詢等多種手段來高效的查詢資料。
  • 儲存策略:無論訊息是否被消費,kafka都會儲存所有的訊息。那對於舊資料有什麼刪除策略呢?
    • 基於時間,預設配置是168小時(7天)。
    • 基於大小,預設配置是1073741824。
  • 需要注意的是kafka讀取特定訊息的時間複雜度是O(1),所以刪除過期的檔案並不會提高kafka的效能。

說說Kafka消費者端使用的技術點?

  • 消費模式

    • Kafka的消費模式是poll模式,就是每個消費者按照自己的消費能力在Broker中讀取資料。從 Broker 主動拉取資料,需要維護一個長輪詢,針對這一點, Kafka 的消費者在消費資料時會傳入一個時長引數 timeout,如果當前沒有資料可供消費,Consumer 會等待一段時間之後再返回,使用這個機制解決了沒有新的資料消費者就在迴圈中空轉開銷問題。
  • 分割槽分配策略

    • 如果Consumer組中Consumer數量>Topic Partition數量根據Random-Robin策略(實際是個輪詢)或者Range(劃分範圍,一個範圍內的給到一個消費者)對Partition進行分配。此後這個消費者組中的消費者消費的Partition就被定下來了。所以在實際的應用中,建議消費者組的consumer的數量與partition的數量一致
  • Offset 維護

    • comsumer需要記錄已經消費到的偏移量以便故障或者後續繼續消費。在0.9版本後Kafka將這些資訊都儲存在一個內建的Topic(__comsumer_offset),預設5s自動提交一次,而此前的版本是儲存在ZK中的,這樣改進目的是:
      • 優化效率,減輕ZK壓力。
      • 可以自己實現偏移量維護。

Kafka傳送資料的流程和訊息結構?

  • Producer在寫入資料的時候永遠的找leader,不會直接將資料寫入follower,傳送的流程如下圖所示。
  • 訊息寫入leader後,follower是主動的去leader進行同步的。producer採用push模式將資料釋出到broker,每條訊息追加到分割槽中,順序寫入磁碟,所以保證同一分割槽內的資料是有序的。
  • Message結構:上面說到log檔案就實際是儲存message的地方,我們在producer往kafka寫入的也是一條一條的message,將生產者傳送資料封裝為ProducerRecord物件,包括topic、partition、時間戳、key、value、header等;封裝訊息結構包含訊息體、訊息大小、offset、壓縮型別等。
    • offset:offset是一個佔8byte的有序id號,它可以唯一確定每條訊息在parition內的位置。
    • 訊息大小:訊息大小佔用4byte,用於描述訊息的大小。
    • 訊息體:訊息體存放的是實際的訊息資料(被壓縮過),佔用的空間根據具體的訊息而異。

Kafka資料分割槽的目的和原則?

  • 資料會寫入到不同的分割槽,分割槽的主要目的是:
    • 增加讀寫的吞吐量。
    • 方便擴充套件,因為一個topic可以有多個partition,所以我們可以通過擴充套件機器去輕鬆的應對日益增長的資料量。
    • 提高併發,以partition為讀寫單位,可以多個消費者同時消費資料,提高了訊息的處理效率。
  • 分割槽原則:如果某個topic有多個partition,kafka的producer選擇分割槽情況如下:
    • partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
    • 如果沒有指定partition,但是設定了資料的key,則會根據key的值hash出一個partition。
    • 如果既沒指定partition,又沒有設定key,則會通過random-robin演算法輪詢得到分割槽(第一個得到一個隨機數,後續的在此基礎上自增)。

Kafka如何保證訊息不丟失或者可靠性?

為保證 Producer 傳送的資料,能可靠地傳送到指定的 Topic,Topic 的每個 Partition 收到 Producer 傳送的資料後,都需要向 Producer 傳送 ACK 確認收到,如果 Producer 收到 ACK,就會進行下一輪的傳送,否則重新發送資料。

  • 副本資料同步策略

    • 何時傳送 ACK?確保有 Follower 與 Leader 同步完成,Leader 再發送 ACK,這樣才能保證 Leader 掛掉之後,能在 Follower 中選舉出新的 Leader 而不丟資料。
    • 多少個 Follower 同步完成後傳送 ACK?全部 Follower 同步完成,再發送 ACK。
  • ISR機制

    • 如果採用第二種方案,所有 Follower 完成同步,Producer 才能繼續傳送資料,設想有一個 Follower 因為某種原因出現故障,那 Leader 就要一直等到它完成同步。這個問題怎麼解決?
      • Leader維護了一個動態的 in-sync replica set(ISR,同步副本的作用),只需要這個列表的中的follower和leader同步,當ISR中的follower完成資料的同步之後,leader就會給生產者傳送ack。
      • 當 ISR 集合中的 Follower 完成資料的同步之後,Leader 就會給 Follower 傳送 ACK。
      • 如果 Follower 長時間未向 Leader 同步資料,則該 Follower 將被踢出 ISR 集合,該時間閾值由 replica.lag.time.max.ms 引數設定。Leader 發生故障後,就會從 ISR 中選舉出新的 Leader。
      • Leader和follower(ISR)落盤才會返回ack,會有資料重複現象,如果在leader已經寫完成,且follower同步完成,但是在返回ack的出現故障,則會出現資料重複現象;極限情況下,這個也會有資料丟失的情況,比如follower和leader通訊都很慢,所以ISR中只有一個leader節點,這個時候,leader完成落盤,就會返回ack,如果此時leader故障後,就會導致丟失資料
  • ACK 應答機制

    • 對於某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒必要等 ISR 中的 Follower 全部接受成功。
    • 為保證生產者傳送的資料能可靠的傳送到指定的topic,Kafka在Topic的每個partition收到生產者傳送的資料後,都需要向生產者傳送ack(確認收到),如果生產者收到ack,就會進行下一輪的傳送,否則重新發送資料。也即是核心是通過ACK應答機制,在生產者向佇列寫入資料的時候可以設定引數來確認是否kafka接收到資料,這個引數可設定的值為如下:
      • 0代表producer往叢集傳送資料不需要等到叢集的返回,不確保訊息傳送成功。安全性最低但是效率最高。
      • 1代表producer往叢集傳送資料只要leader應答就可以傳送下一條,只確保leader傳送成功。
      • -1代表all代表producer往叢集傳送資料需要所有的follower都完成從leader的同步才會傳送下一條,確保leader傳送成功和所有的副本都完成備份。安全性最高,但是效率最低,這裡的all是指ISR列表中。
  • 如果往不存在的topic寫資料,kafka會自動建立topic,分割槽和副本的數量根據預設配置都是1。

  • 生產者:同步傳送,或者通過傳送的callback來實現,producer.send(msg,callback)。

  • 消費者:需要考慮先更新offset還是先做消費處理,先做消費則可能引出重複消費。

Kafka Broker節點故障時處理細節?

  • 首先了解下LEO(每個副本最大的 Offset)和HW(消費者能見到的最大的 Offset,ISR 佇列中最小的 LEO)。
  • Follower 故障:Follower 發生故障後會被臨時踢出 ISR 集合,待該 Follower 恢復後,Follower 會 讀取本地磁碟記錄的上次的 HW(high watermark,ISR中所有副本中結尾offset的最小值),並將 log 檔案高於 HW 的部分擷取掉,從 HW 開始向 Leader 進行同步資料操作。等該 Follower 的 LEO 大於等於該 Partition 的 HW,即 Follower 追上 Leader 後,就可以重新加入 ISR 了。
  • Leader 故障:Leader 發生故障後,會從 ISR 中選出一個新的 Leader,之後,為保證多個副本之間的資料一致性,其餘的 Follower 會先將各自的 log 檔案高於 HW 的部分資料截掉,重新向新的leader同步。
  • 注意:這隻能保證副本之間的資料一致性,並不能保證資料不丟失或者不重複。

Kafka如何保證訊息不重複消費?

  • 將伺服器的 ACK 級別設定為 -1,可以保證 Producer 到 Server 之間不會丟失資料,即 At Least Once 語義,但是會出現資料重複(at least once)。
  • 將伺服器 ACK 級別設定為 0,可以保證生產者每條訊息只會被髮送一次,即 At Most Once 語義,但是不能保證資料不丟失(at most once)。
  • 對於一些非常重要的資訊,比如交易資料,下游資料消費者要求資料既不重複也不丟失,即 Exactly Once 語義。在0.11版本後,引入冪等性解決kakfa叢集內部的資料重複,在0.11版本之前,在消費者處自己做處理:Producer 不論向 Server 傳送多少重複資料,Server 端都只會持久化一條。即:At Least Once + 冪等性 = Exactly Once。
  • 如果啟用了冪等性,只需要將 Producer 的引數中 enable.idompotence 設定為 true 即可,且ack預設就是-1,kafka就會為每個生產者分配一個pid,併為每條訊息分配Sequence Number,如果pid、partition、seqnumber三者一樣,也即是對 <PID,Partition,SeqNumber> 做快取,當具有相同主鍵的訊息提交時,Broker kafka認為是重複資料只會持久化一條;但是如果生產者掛掉後,也會出現有資料重複的現象;所以冪等性解決在單次會話的單個分割槽的資料重複,但是在分割槽間或者跨會話的是資料重複的是無法解決的。

Kafka如何保證消費資料的一致性?

  • 消費資料的一致性主要通過HW來保證。
  • LEO:指每個follower的最大的offset。
  • HW(高水位):指消費者能見到的最大的offset,LSR佇列中最小的LEO,也就是說消費者只能看到1~6的資料,後面的資料看不到,也消費不了,避免leader掛掉後,比如當前消費者消費8這條資料後,leader掛了,此時比如f2成為leader,f2根本就沒有9這條資料,那麼消費者就會報錯,所以設計了HW這個引數,只暴露最少的資料給消費者,避免上面的問題。

Kafka如何處理訊息積壓?

增加新topic,增大分割槽,將原來topic資料消費轉移到這個新topic,然後開多個消費者去處理新topic。

Kafka如何保證訊息順序消費?

  • Kafka不能保證訊息的全域性有序,只能保證訊息在partition內有序,因為消費者消費訊息是在不同的partition中隨機的。如要保證可以使用一個topic、一個partition、一個消費者且內部單執行緒處理實現訊息順序消費,但一般不建議這樣做消費效能較低。
  • 另外思路是採用N個queue,將相同key的資料寫入同一個queue,N個執行緒每個執行緒處理一個queue。

說說Kafka的事務?

Kafka事務在0.11版本後引入,主要解決的是Producer在Exactly Once語義上跨分割槽跨會話的精準一次寫入,要麼成功要麼失敗。

  • Producer事務(斷點續傳)

    • 為了實現跨分割槽跨會話的事務,每個Producer需要一個全域性唯一的Transaction ID,並將Producer獲得的PID和Transaction ID繫結。這樣當Producer重啟後就可以通過正在進行的Transaction ID獲得原來的PID。
    • 為了管理Transaction,Kafka引入了一個新的元件Transaction Coordinator(事務排程器)。Producer就是通過和Transaction Coordinator互動獲得繫結的PID和對應的任務狀態。Transaction Coordinator還負責將事務所有寫入Kafka的一個內部Topic,這樣即使整個服務重啟,由於事務狀態得到儲存,進行中的事務狀態可以得到恢復,從而繼續進行。
  • 注意:Kakfa事務回滾不會直接去刪除訊息,而是將訊息對Consumer不可見。

  • Consumer事務

    • Kafka對Consumer的事務較弱,一般是通過Consumer端自己實現精確一次性消費(將消費過程和提交offset作為一個原子操作實現)。

Kafka如何通過ZooKeeper來進行選舉和狀態更新?

  • 首先Kafka叢集啟動時,會從Broker中選舉一個Controller(分散式鎖實現搶先建立臨時節點的broker當選),負責管理叢集所有Broker的上下線(監聽zk的/brokers/ids/節點)、所有Topic分割槽副本分配、leader選舉等工作。
  • 當某個Broker掛了以後,Controller監聽到臨時節點/brokers/ids/中的變化,從ZK各個分割槽狀態資訊中獲取ISR(此時去除了掛掉節點所有的Partition,失去leader的Partition重新選舉leader),並完成ZooKeeper各個分割槽狀態更新。

Kafka高效讀寫的保證?

  • Kafka高效能設計包括多分割槽、順序讀寫、page cache、預讀、記憶體對映-零拷貝、無鎖offset 管理機制、NIO、壓縮、批量讀寫、高效能序列化(二進位制)等。

  • 順序讀寫:如同HBase順序寫HFile檔案一樣,Kafka順序寫log檔案寫入磁碟效率極高(據Kafka官網文件說比隨機寫快6000倍)。

  • 使用作業系統的Page Cache來快取要寫入的資料,好處在於:

    • 寫入前可以做一些優化,提高磁碟寫入效能。
    • 快取也可以用於資料被讀取,當資料寫入與讀取速率相近的情況下,可以直接記憶體讀取。
    • Page Cache非JVM記憶體,不會影響JVM,導致GC的增加。同時,Kafka節點宕機,資料還在此機器快取。
  • 零拷貝機制

    • 正常情況下,先把資料讀到核心空間,在從核心空間把資料讀到使用者空間,然後在調作業系統的io介面寫到核心空間,最終在寫到硬碟中。磁碟 -> 核心page cache -> 應用快取 -> 核心page cache -> 磁碟的過程。
    • 如果資料只是單純的拷貝而不需要修改,那麼拷貝到應用快取的步驟完全是多餘的。所以Kafka利用了作業系統提供的零拷貝機制,來減少不需要的系統呼叫和資料拷貝次數,直接在核心空間流轉io流,所以kafka的效能非常高。

本篇主要是以Kafka的基礎理論、架構原理和麵試為主,後續我們再分享Kafka有關 API 以及事務、攔截器、監控等高階篇,已達到Kafka實戰程式設計和應用的目的。