訊息佇列Kafka高可靠性原理深度解讀上篇
1 概述
Kakfa起初是由LinkedIn公司開發的一個分散式的訊息系統,後成為Apache的一部分,它使用Scala編寫,以可水平擴充套件和高吞吐率而被廣泛使用。目前越來越多的開源分散式處理系統如Cloudera、Apache Storm、Spark等都支援與Kafka整合。
Kafka憑藉著自身的優勢,越來越受到網際網路企業的青睞,唯品會也採用Kafka作為其內部核心訊息引擎之一。Kafka作為一個商業級訊息中介軟體,訊息可靠性的重要性可想而知。如何確保訊息的精確傳輸?如何確保訊息的準確儲存?如何確保訊息的正確消費?這些都是需要考慮的問題。本文首先從Kafka的架構著手,先了解下Kafka的基本原理,然後通過對kakfa的
2 Kafka體系架構
如上圖所示,一個典型的Kafka體系架構包括若干Producer(可以是伺服器日誌,業務資料,頁面前端產生的page view等等),若干broker(Kafka支援水平擴充套件,一般broker數量越多,叢集吞吐率越高),若干Consumer (Group),以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。Producer使用push(推)模式將訊息釋出到broker,Consumer使用pull(拉)模式從broker訂閱並消費訊息。
名詞解釋:
2.1 Topic & Partition
一個topic可以認為一個一類訊息,每個topic將被分成多個partition,每個partition在儲存層面是append log檔案。任何釋出到此partition的訊息都會被追加到log檔案的尾部,每條訊息在檔案中的位置稱為offset(偏移量),offset為一個long型的數字,它唯一標記一條訊息。每條訊息都被append到partition中,是順序寫磁碟,因此效率非常高(經驗證,順序寫磁碟效率比隨機寫記憶體還要高,這是Kafka高吞吐率的一個很重要的保證)。
每一條訊息被髮送到broker中,會根據partition規則選擇被儲存到哪一個partition。如果partition規則設定的合理,所有訊息可以均勻分佈到不同的partition裡,這樣就實現了水平擴充套件。(如果一個topic對應一個檔案,那這個檔案所在的機器I/O將會成為這個topic的效能瓶頸,而partition解決了這個問題)。在建立topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數量(如下所示),當然可以在topic建立之後去修改partition的數量。
1 2 3 4 |
#
The default number
of log partitions per topic. More partitions allow greater
#
parallelism for consumption,
but this will
also result in more files across
#
the brokers.
num.partitions= 3
|
在傳送一條訊息時,可以指定這個訊息的key,producer根據這個key和partition機制來判斷這個訊息傳送到哪個partition。partition機制可以通過指定producer的partition.class這一引數來指定,該class必須實現kafka.producer.Partitioner介面。
有關Topic與Partition的更多細節,可以參考下面的“Kafka檔案儲存機制”這一節。
3 高可靠性儲存分析
Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。通過調節其副本相關引數,可以使得Kafka在效能和可靠性之間運轉的遊刃有餘。Kafka從0.8.x版本開始提供partition級別的複製,replication的數量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。
這裡先從Kafka檔案儲存機制入手,從最底層瞭解Kafka的儲存細節,進而對其的儲存有個微觀的認知。之後通過Kafka複製原理和同步方式來闡述巨集觀層面的概念。最後從ISR,HW,leader選舉以及資料可靠性和永續性保證等等各個維度來豐富對Kafka相關知識點的認知。
3.1 Kafka檔案儲存機制
Kafka中訊息是以topic進行分類的,生產者通過topic向Kafka broker傳送訊息,消費者通過topic讀取資料。然而topic在物理層面又能以partition為分組,一個topic可以分成若干個partition,那麼topic以及partition又是怎麼儲存的呢?partition還可以細分為segment,一個partition物理上由多個segment組成,那麼這些segment又是什麼呢?下面我們來一一揭曉。
為了便於說明問題,假設這裡只有一個Kafka叢集,且這個叢集只有一個Kafka broker,即只有一臺物理機。
在這個Kafka broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs,以此來設定Kafka訊息檔案儲存目錄;
與此同時建立一個topic:topic_zzh_test;
partition的數量為4($KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_vms_test –replication-factor 4)
。那麼我們此時可以在/tmp/kafka-logs目錄中可以看到生成了4個目錄:
1 2 3 4 |
drwxr-xr-x 2 root
root 4096 Apr 10 16 : 10 topic_zzh_test- 0
drwxr-xr-x 2 root
root 4096 Apr 10 16 : 10 topic_zzh_test- 1
drwxr-xr-x 2 root
root 4096 Apr 10 16 : 10 topic_zzh_test- 2
drwxr-xr-x 2 root
root 4096 Apr 10 16 : 10 topic_zzh_test- 3
|
在Kafka檔案儲存中,同一個topic下有多個不同的partition,每個partiton為一個目錄;
partition的名稱規則為:topic名稱+有序序號,第一個序號從0開始計,最大的序號為partition數量減1,
partition是實際【物理上】的概念,而topic是【邏輯上】的概念。
上面提到partition還可以細分為segment,這個segment又是什麼?如果就以partition為最小儲存單位,我們可以想象當Kafka producer不斷髮送訊息,必然會引起partition檔案的無限擴張,這樣對於訊息檔案的維護以及已經被消費的訊息的清理帶來嚴重的影響,所以這裡以segment為單位又將partition細分。每個partition(目錄)相當於一個巨型檔案被平均分配到多個大小相等的segment(段)資料檔案中(每個segment 檔案中訊息數量不一定相等)這種特性也方便old segment的刪除,即方便已被消費的訊息的清理,提高磁碟的利用率。每個partition只需要支援順序讀寫就行,segment的檔案生命週期由服務端配置引數(log.segment.bytes,log.roll.{ms,hours}等若干引數)決定。
segment檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為segment索引檔案和資料檔案。這兩個檔案的命令規則為:partition全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值,數值大小為64位,20位數字字元長度,沒有數字用0填充,如下:
1 2 3 4 5 6 |
00000000000000000000 .index
00000000000000000000 .log
00000000000000170410 .index
00000000000000170410 .log
00000000000000239430 .index
00000000000000239430 .log
|
以上面的segment檔案為例,展示出segment:00000000000000170410的“.index”檔案和“.log”檔案的對應的關係,如下圖:
如上圖,“.index”索引檔案儲存大量的元資料,“.log”資料檔案儲存大量的訊息,索引檔案中的元資料指向對應資料檔案中message的物理偏移地址。其中以“.index”索引檔案中的元資料[3, 348]為例,在“.log”資料檔案表示第3個訊息,即在全域性partition中表示170410+3=170413個訊息,該訊息的物理偏移地址為348。
那麼如何從partition中通過offset查詢message呢?
以上圖為例,讀取offset=170418的訊息,首先查詢segment檔案,其中00000000000000000000.index為最開始的檔案,第二個檔案為00000000000000170410.index(起始偏移為170410+1=170411),而第三個檔案為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了第二個檔案之中。其他後續檔案可以依次類推,以其實偏移量命名並排列這些檔案,然後根據二分查詢法就可以快速定位到具體檔案位置。其次根據00000000000000170410.index檔案中的[8,1325]定位到00000000000000170410.log檔案中的1325的位置進行讀取。
要是讀取offset=170418的訊息,從00000000000000170410.log檔案中的1325的位置進行讀取,那麼怎麼知道何時讀完本條訊息,否則就讀到下一條訊息的內容了?
這個就需要聯絡到訊息的物理結構了,訊息都具有固定的物理結構,包括:offset(8 Bytes)、訊息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等欄位,可以確定一條訊息的大小,即讀取到哪裡截止。
3.2 複製原理和同步方式
Kafka中topic的每個partition有一個預寫式的日誌檔案,雖然partition可以繼續細分為若干個segment檔案,但是對於上層應用來說可以將partition看成最小的儲存單元(一個有多個segment檔案拼接的“巨型”檔案),每個partition都由一些列有序的、不可變的訊息組成,這些訊息被連續的追加到partition中。
上圖中有兩個新名詞:HW和LEO。這裡先介紹下LEO,LogEndOffset的縮寫,表示每個partition的log最後一條Message的位置。HW是HighWatermark的縮寫,是指consumer能夠看到的此partition的位置,這個涉及到多副本的概念,這裡先提及一下,下節再詳表。
為了提高訊息的可靠性,Kafka 【每個】 topic 的 partition 有 N 個 副本(replicas)--------(副本個數)
其中 N (大於等於1) 是topic的 複製因子(replica fator)的個數。 -----------------------(複製因子個數),
Kafka通過多副本機制實現故障自動轉移,當Kafka叢集中一個broker失效情況下仍然保證服務可用。在Kafka中發生複製時確保partition的日誌能有序地寫到其他節點上,
N個replicas中,其中一個replica為leader,其他都為follower,
leader處理partition的所有讀寫請求,與此同時,follower會被動定期地去複製leader上的資料。
如下圖所示,例如 Kafka叢集中有4個broker, 某topic有3個partition, 且複製因子即副本個數也為3:
Kafka提供了資料複製演算法保證,如果leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的訊息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追趕leader資料。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步佇列,具體可參考下節)中所有follower滯後的狀態。當producer傳送一條訊息到broker後,leader寫入訊息並複製到所有follower。訊息提交之後才被成功複製到所有的同步副本。訊息複製延遲受最慢的follower限制,重要的是快速檢測慢副本,如果follower“落後”太多或者失效,leader將會把它從ISR中刪除。
3.3 ISR
上節我們涉及到ISR (In-Sync Replicas),這個是指副本同步佇列。副本數對Kafka的吞吐率是有一定的影響,但極大的增強了可用性。預設情況下Kafka的replica數量為1,即每個partition都有一個唯一的leader,為了確保訊息的可靠性,通常應用中將其值(由broker的引數offsets.topic.replication.factor指定)大小設定為大於1,比如3。 所有的副本(replicas)統稱為Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步資料有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支援replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。
Kafka 0.10.x版本後移除了replica.lag.max.messages引數,只保留了replica.lag.time.max.ms作為ISR中副本管理的引數。為什麼這樣做呢?replica.lag.max.messages表示當前某個副本落後leaeder的訊息數量超過了這個引數的值,那麼leader就會把follower從ISR中刪除。假設設定replica.lag.max.messages=4,那麼如果producer一次傳送至broker的訊息數量都小於4條時,因為在leader接受到producer傳送的訊息之後而follower副本開始拉取這些訊息之前,follower落後leader的訊息數不會超過4條訊息,故此沒有follower移出ISR,所以這時候replica.lag.max.message的設定似乎是合理的。但是producer發起瞬時高峰流量,producer一次傳送的訊息超過4條時,也就是超過replica.lag.max.messages,此時follower都會被認為是與leader副本不同步了,從而被踢出了ISR。但實際上這些follower都是存活狀態的且沒有效能問題。那麼在之後追上leader,並被重新加入了ISR。於是就會出現它們不斷地剔出ISR然後重新迴歸ISR,這無疑增加了無謂的效能損耗。而且這個引數是broker全域性的。設定太大了,影響真正“落後”follower的移除;設定的太小了,導致follower的頻繁進出。無法給定一個合適的replica.lag.max.messages的值,故此,新版本的Kafka移除了這個引數。
注:ISR中包括:leader和follower。
上面一節還涉及到一個概念,即HW。HW俗稱高水位,HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO作為HW,consumer最多隻能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。對於leader新寫入的訊息,consumer不能立刻消費,leader會等待該訊息被所有ISR中的replicas同步後更新HW,此時訊息才能被consumer消費。這樣就保證瞭如果leader所在的broker失效,該訊息仍然可以從新選舉的leader中獲取。對於來自內部broKer的讀取請求,沒有HW的限制。
下圖詳細的說明了當producer生產訊息至broker後,ISR以及HW和LEO的流轉過程:
由此可見,Kafka的複製機制既不是完全的同步複製,也不是單純的非同步複製。事實上,同步複製要求所有能工作的follower都複製完,這條訊息才會被commit,這種複製方式極大的影響了吞吐率。而非同步複製方式下,follower非同步的從leader複製資料,資料只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有複製完,落後於leader時,突然leader宕機,則會丟失資料。而Kafka的這種使用ISR的方式則很好的均衡了確保資料不丟失以及吞吐率。
Kafka的ISR的管理最終都會反饋到Zookeeper節點上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個地方會對這個Zookeeper的節點進行維護:
-
Controller來維護:Kafka叢集中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀態管理,也會執行類似於重分配partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知所有的replicas。
-
leader來維護:leader有單獨的執行緒定期檢測ISR中follower是否脫離ISR, 如果發現ISR變化,則會將新的ISR的資訊返回到Zookeeper的相關節點中。
3.4 資料可靠性和永續性保證
當producer向leader傳送資料時,可以通過request.required.acks引數來設定資料可靠性的級別:
-
1(預設):這意味著producer在ISR中的leader已成功收到的資料並得到確認後傳送下一條message。如果leader宕機了,則會丟失資料。
-
0:這意味著producer無需等待來自broker的確認而繼續傳送下一批訊息。這種情況下資料傳輸效率最高,但是資料可靠性確是最低的。
-
-1:producer需要等待ISR中的所有follower都確認接收到資料後才算一次傳送完成,可靠性最高。但是這樣也不能保證資料不丟失,比如當ISR中只有leader時(前面ISR那一節講到,ISR中的成員由於某些情況會增加也會減少,最少就只剩一個leader),這樣就變成了acks=1的情況。
相關推薦
訊息佇列Kafka高可靠性原理深度解讀上篇
1 概述 Kakfa起初是由LinkedIn公司開發的一個分散式的訊息系統,後成為Apache的一部分,它使用Scala編寫,以可水平擴充套件和高吞吐率而被廣泛使用。目前越來越多的開源分散式處理系統如Cloudera、Apache Storm、Spark等都支援與Kafka整合。 Kafka憑藉著自身
分散式訊息佇列kafka原理簡介
kafka原理簡介 Kafka是由LinkedIn開發的一個分散式的訊息系統,使用Scala編寫,它以可水平擴充套件和高吞吐率而被廣泛使用。目前越來越多的開源分散式處理系統如Cloudera、Apache Storm、Spark都支援與Kafka整合
如何保證訊息佇列的高可用
RabbitMQ的高可用性 RabbitMQ是基於主從做高可用性的,有三種模式:單機模式,普通叢集模式,映象叢集模式 單機模式: demo級別 普通叢集模式: 在多臺機器上啟動rabbitmq例項,每個機器啟動一個。 但是你建立的
基於Docker搭建分散式訊息佇列Kafka
本文基於Docker搭建一套單節點的Kafka訊息佇列,Kafka依賴Zookeeper為其管理叢集資訊,雖然本例不涉及叢集,但是該有的元件都還是會有,典型的kafka分散式架構如下圖所示。本例搭建的示例包含Zookeeper + Kafka + Kafka-manger mark &
訊息佇列處理高併發
用mq來將耗時比較長或者耗費資源的請求排隊,非同步處理,減輕伺服器壓力增加穩定性。如果是高併發的實時請求,我個人覺得不適用這個方案。如果是為了高併發,我覺得應該朝解決高併發的方向考慮。叢集、分散式、動靜分離、資料庫讀寫分離之類的。web的話,只能客戶端頁面輪訓處理結果。因為,據我個人瞭解啊,現
Spark Streaming實時流處理筆記(4)—— 分散式訊息佇列Kafka
1 Kafka概述 和訊息系統類似 1.1 訊息中介軟體 生產者和消費者 1.2 Kafka 架構和概念 producer:生產者(生產饅頭) consumer:消費者(吃饅頭) broker:籃子 topic : 主題,給饅頭帶一個標籤,(
Kafka工作流程-KafkaCluster和Kafka 高可靠性儲存
1.KafkaCluster 在使用 Kafka 低階消費者時,可以通過 KafkaCluster 類實現 offset 向 ZooKeeper 的提交 和獲取。 Kafka 協議非常簡單,只有六個核心客戶端請求 API: 元資料(Met
kafka(01)——分散式訊息佇列kafka概述
kafka是什麼? Apache Kafka是一個開源訊息系統,由Scala寫成。是由Apache軟體基金會開發的一個開源訊息系統專案。 Kafka最初是由LinkedIn開發,並於2011年初開源。 該專案的目標是為處理實時資料提供一個統一、高通量、低等待的
【圖文詳細 】Kafka訊息佇列——Kafka 的各種 Shell 操作
1、啟動叢集每個節點的程序: 2、建立 topic 3、檢視已經建立的所有 kafka topic 4、檢視某個指定的 kafka topic 的詳細資訊: 4、開啟生產者模擬生成資料:
【圖文詳細 】Kafka訊息佇列——Kafka 的各種 API 操作
7.1、Kafka 的 API 分類 1、The Producer API 允許一個應用程式釋出一串流式的資料到一個或者多個 Kafka Topic。 2、The Consumer API 允許一個應用程式訂閱一個或多個 Topi
【圖文詳細 】Kafka訊息佇列——kafka 叢集部署
5.1、Kafka 初體驗 單機 Kafka 試玩 官網網址:http://kafka.apache.org/quickstart 中文官網:http://kafka.apachecn.org/quickstart.html 5.2、叢集部署的基本流程總結&n
【圖文詳細 】Kafka訊息佇列——Kafka的核心元件
4.1、kafka的核心元件概述 Kafka 是 LinkedIn 用於日誌處理的分散式訊息佇列,同時支援離線和線上日誌處理。 Kafka 對訊息儲存時根據 Topic 進行歸類: 傳送訊息者就是 Producer,訊息的釋出描述為 Producer
【圖文詳細 】Kafka訊息佇列——Kafka的應用場景
3.1、訊息系統 Kafka 很好地替代了傳統的 message broker(訊息代理)。Message Brokers 可用於各種場合(如 將資料生成器與資料處理解耦,緩衝未處理的訊息等)。與大多數訊息系統相比,Kafka 擁有 更好的吞吐量、內建分割槽、具有複製和容錯的功能,這使它成為
【圖文詳細 】Kafka訊息佇列——Kafka的優點
2、Kafka的優點 1、解耦:在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。訊息系統在處理過程中間 插入了一個隱含的、基於資料的介面層,兩邊的處理過程都要實現這一介面。這允許你獨立 的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。 2、冗餘:有些
【圖文詳細 】Kafka訊息佇列——Kafka是什麼
1.1、Kafka的概述 在流式計算中,Kafka一般用於資料的快取,Storm通過消費Kafka的資料進行計算。 經典架構:Flume + Kafka + Storm/SparkStreaming + Redis Apache Kafka最初由LinkedIn開發的基於
分散式訊息佇列Kafka
概述 Kafka是Apache旗下,由LinkedIn公司開發,Scala語言編寫的訊息佇列。Kafka是一種分散式的,基於釋出/訂閱的訊息系統,能夠高效並實時的吞吐資料,以及通過分散式叢集及資料複製冗餘機制(副本冗餘機制)實現資料的安全。 特點 1 高吞吐量 Kaf
分散式訊息佇列-Kafka
我們的資料,常見的情況下,是源源不斷的產生的,有時候會產生大量的資料,但是資料的接受方,可能一下無法處理那麼大的併發量。所以,一般採用的方式是訊息佇列的方式。 在大資料的領域裡,我們可能不僅僅要考慮資料的平穩過渡問題,我們還要考慮不同的型別的資料的接受和處理問題
如何保證訊息佇列的高可用和冪等性以及資料丟失,順序一致性
如何保證訊息佇列的高可用和冪等性以及資料丟失,順序一致性 <!-- more --> RabbitMQ的高可用性 RabbitMQ是比較有代表性的,因為是基於主從做高可用性的,我們就以他為例子講解第一種MQ的高可用性怎麼實現。 rabbitmq有三種模式: 單機模式 普通叢集模
PHP中利用redis實現訊息佇列處理高併發請求
將請求存入redis 為了模擬多個使用者的請求,使用一個for迴圈替代 //redis資料入隊操作 $redis = new Redis(); $redis->connect('127.0.0.1',6379); for($i=0;$i<50;$i++){
PHP中利用redis實現訊息佇列處理高併發請求--簡潔程式碼實現效果
將請求存入redis 為了模擬多個使用者的請求,使用一個for迴圈替代 //redis資料入隊操作 $redis = new Redis(); $redis->connect('127.0.