1. 程式人生 > >Apache Kafka學習之Kafka基本原理

Apache Kafka學習之Kafka基本原理

       Kafka是一個使用Scala編寫的訊息系統,原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營資料處理管道(Pipeline)的基礎。現在它已被多家不同型別的公司作為多種型別的資料管道和訊息系統使用。

Kafka是一種分散式的,基於釋出/訂閱的訊息系統。

      Kafka使用zookeeper作為其分散式協調框架,很好的將訊息生產、訊息儲存、訊息消費的過程結合在一起。同時藉助zookeeper,kafka能夠生產者、消費者和broker在內的所以元件在無狀態的情況下,建立起生產者和消費者的訂閱關係,並實現生產者與消費者的負載均衡。

2、kafka的特性

(1)以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的訪問效能。

(2)高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上訊息的傳輸。

(3)支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個Partition內的訊息順序儲存和傳輸。

(4)同時支援離線資料處理(Offline)和實時資料處理(Online)。

(5)Scale out:支援線上水平擴充套件。無需停機即可擴充套件機器。

(6)支援定期刪除資料機制。可以按照時間段來刪除,也可以按照文件大小來刪除。

(7)Consumer採用pull的方式消費資料,消費狀態由Consumer控制,減輕Broker負擔。

3、Kafka架構

(1)Broker:和RabbitMQ中的Broker概念類似。一個kafka伺服器就是一個Broker,而一個kafka叢集包含一個或多個Broker。Broker會持久化資料到相應的Partition中,不會有cache壓力。

(2)Topic:主題。每條訊息都有一個類別,這個類別就叫做Topic。Kafka的Topic可以理解為RabbitMQ的Queue訊息佇列,相同類別的訊息被髮送到同一個Topic中,然後再被此Topic的Consumer消費。Topic是邏輯上的概念,而物理上的實現就是Partition。

(3)Partition:分割槽。分割槽是物理上的概念,每個Topic包含一個或者多個Partition,每個Partition都是一個有序佇列

。傳送給Topic的訊息經過分割槽演算法(可以自定義),決定訊息儲存在哪一個Partition當中。每一條資料都會被分配一個有序id:Offset。注意:kafka只保證按一個partition中的順序將訊息發給Consumer,不保證一個Topic的整體(多個partition間)的順序。

(4)Replication:備份。Replication是基於Partition而不是Topic的。每個Partition都有自己的備份,且分佈在不同的Broker上。

(5)Offset:偏移量。kafka的儲存檔案都是用offset來命名,用offset做名字的好處是方便查詢。例如你想找位於2049的位置,只要找到2048.log的檔案即可。當然the first offset就是00000000000.log。注意:每個Partition中的Offset都是各不影響的從0開始的有序數列。

(6)Producer:訊息生產者。

(7)Consumer:訊息消費者。Consumer採用pull的方式從Broker獲取訊息,並且Consumer要維護消費狀態,因此Kafaka系統中,業務重心一般都在Consumer身上,而不像RabbitMQ那樣Broker做了大部分的事情。

(8)Consumer Group:消費組。每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。每個Topic可以被多個Group訂閱,每個Group中可以有多個Consumer。傳送到Topic的一條訊息會被每個Group中的一個Consumer消費,多個Consumer之間將交錯消費整個Topic的訊息,實現負載均衡。

(9)Record:訊息。每一個訊息由一個Key、一個Value和一個時間戳構成。

Kafka內部結構圖(圖片源於網路)

Kafka拓撲結構圖(圖片源於網路)

4、Topic、Partition檔案儲存

4.1、Topic與Partition的關係

Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條訊息放進哪個queue裡。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個資料夾,該資料夾下儲存這個Partition的所有訊息和索引檔案。若建立topic1和topic2兩個topic,且分別有13個和19個分割槽,則整個叢集上會相應會生成共32個資料夾。partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。

4.2、Partition檔案儲存的特點

(1)每個partition目錄相當於一個巨型檔案被平均分配到多個大小相等segment資料檔案中。但每個segment file訊息數量不一定相等,這種特性方便old segment file快速被刪除。

(2)每個partiton只需要支援順序讀寫就行了,segment檔案生命週期由服務端配置引數決定。

(3)segment file組成:由2大部分組成,分別為index file(字尾“.index”)和data file(字尾“.log”),此2個檔案一一對應,成對出現。

(4)segment檔案命名規則:partition全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。

\

Segment file結構圖(圖片來源於網路)

以上述圖2中一對segment file檔案為例,說明segment中index和log檔案對應關係物理結構如下:

\

Index和log檔案對應圖(圖片來源於網路)

其中以索引檔案中元資料3,497為例,依次在資料檔案中表示第3個message(在全域性partition表示第368772個message)、以及該訊息的物理偏移地址為497。

4.3、在partition中如何通過offset查詢message

例如讀取offset=368776的message,需要通過下面2個步驟查詢。

(1)第一步查詢segment file

上圖為例,其中00000000000000000000.index表示最開始的檔案,起始偏移量(offset)為0.第二個檔案00000000000000368769.index的訊息量起始訊息為368770 = 368769 + 1.同樣,第三個檔案00000000000000737337.index的起始訊息為737338=737337 + 1,只要根據offset 進行二分查詢檔案列表,就可以快速定位到具體檔案。當offset=368776時定位到00000000000000368769.index|log。

(2)第二步通過segment file查詢message

通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元資料物理位置和00000000000000368769.log的物理偏移地址,然後再通過00000000000000368769.log順序查詢直到offset=368776為止。

4.4、Kafka叢集中Partition分佈規則

首先來看一條在Linux下建立topic的命令:

bin/kafka-topics.sh --create --zookeeper ip1:2181,ip2:2181,ip3:2181,ip4:2181 --replication-factor 2 --partitions 4 --topic test

此命令的意思是在四個Broker的kafka叢集上建立一個名為test的Topic,並且有4個分割槽2個備份(此處比較容易搞混,2個Replication表示Leader和Follower一共加起來有2個)。此時在四臺機器上面就有8個Partition,如圖所示。

\

Kafka叢集Partition分佈圖1(圖片來源於網路)

當叢集中新增2節點,Partition增加到6個時分佈情況如下:

\

Kafka叢集Partition分佈圖2(圖片來源於網路)

在Kafka叢集中,每個Broker都有均等分配Leader Partition機會。

上述圖Broker Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此迴圈迭代分配,多副本都遵循此規則。

副本分配演算法

(1)將所有n個Broker和待分配的i個Partition排序。

(2)將第i個Partition分配到第(i mod n)個Broker上。

(3)將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上

例如圖2中的第三個Partition:partition-2,將被分配到Broker3((3 mod 6)=3)上,partition-2的副本將被分配到Broker4上((3+1) mod 6=4)。

4.5、kafka檔案儲存特點

(1)Kafka把topic中一個parition大檔案分成多個小檔案段,通過多個小檔案段,就容易定期清除或刪除已經消費完檔案,減少磁碟佔用。可以設定segment檔案大小定期刪除訊息過期時間定期刪除

(2)通過索引資訊可以快速定位message。

(3)通過index元資料全部對映到memory,可以避免segment file的IO磁碟操作。

(4)通過索引檔案稀疏儲存,可以大幅降低index檔案元資料佔用空間大小。

4.6、Consumer和Partition的關係

對於多個Partition,多個Consumer

(1)如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許併發的,所以consumer數不要大於partition數。

(2)如果consumer比partition少,一個consumer會對應於多個partition,這裡要合理分配consumer數和partition數,否則會導致partition裡面的資料被取的不均勻。最好partiton數目是consumer數目的整數倍,所以partition數目很重要,比如取24,就很容易設定consumer數目。

(3)如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka只保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同

(4)增減consumer,broker,partition會導致rebalance,所以rebalance後consumer對應的partition會發生變化

(5)High-level介面中獲取不到資料的時候是會block的。

關於zookeeper中Offset初始值的問題:

Zookeeper中Offset的初始值預設是非法的,因此通過設定Consumer的引數auto.offset.reset來告訴Consumer讀取到Offset非法時該怎麼做。

auto.offset.reset有三個值:

(1)smallest : 自動把zookeeper中的offset設為Partition中最小的offset;

(2)largest : 自動把zookeeper中offset設為Partition中最大的offset;

(3)anything else: 丟擲異常;

auto.offset.reset預設值是largest,此種情況下如果producer先發送了10條資料到某個Partition,然後Consumer啟功後修改zookeeper中非法Offset值為Partition中的最大值9(Offset從0開始),這樣Consumer就忽略了這10條訊息。就算現在再次設定成smallest也讀取不到之前的10條資料了,因為此時Offset是合法的了。

所以,想要讀取之前的資料,就需要在一開始指定auto.offset.reset=smallest。

5、Replication副本同步機制

Replication是基於Partition而不是Topic的。每個Partition都有自己的備份,且分佈在不同的Broker上。這些Partition當中有一個是Leader,其他都是Follower。Leader Partition負責讀寫操作,Follower Partition只負責從Leader處複製資料,使自己與Leader保持一致。Zookeeper負責兩者間的故障切換(fail over,可以理解為Leader選舉)。

訊息複製延遲受最慢的Follower限制,Leader負責跟蹤所有Follower的狀態,如果Follower“落後”太多或者失效,Leader就將此Follower從Replication同步列表中移除,但此時Follower是活著的,並且一直從Leader拉取資料,直到差距小於replica.lag.max.messages值,然後重新加入同步列表。當一條訊息被所有的Follower儲存成功,此訊息才被認為是“committed”,Consumer才能消費這條訊息。這種同步方式就要求Leader和Follower之間要有良好的網路環境。

一個partition的follower落後於leader足夠多時,會被認為不在同步副本列表或處於滯後狀態。在Kafka-0.8.2.x中,副本滯後判斷依據是副本落後於leader最大訊息數量(replica.lag.max.messages)或replication響應Leader partition的最長等待時間(replica.lag.time.max.ms)。前者是用來檢測緩慢的副本,而後者是用來檢測失效或死亡的副本。假設replica.lag.max.messages設定為4,表明只要follower落後leader的訊息數小於4,就不會從同步副本列表中移除。replica.lag.time.max設定為500 ms,表明只要follower向leader傳送拉取資料請求時間間隔超過500 ms,就會被標記為死亡,並且會從同步副本列表中移除。

當Leader處於流量高峰時,比如一瞬間就收到了4條資料,此時所有Follower將被認為是“out-of-sync”並且從同步副本列表中移除,然後Follower拉取資料趕上Leader過後又重新加入同步列表,就這樣Follower頻繁在副本同步列表移除和重新加入之間來回切換。

即使只有一個replicas例項存活,仍然可以保證訊息的正常傳送和接收,只要zookeeper叢集存活即可(注意:不同於其他分散式儲存,比如hbase需要"多數派"存活才行)。

當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,因此需要選擇一個"up-to-date"的follower。kafka中leader選舉並沒有採用"投票多數派"的演算法,因為這種演算法對於"網路穩定性"/"投票參與者數量"等條件有較高的要求,而且kafka叢集的設計,還需要容忍N-1個replicas失效。對於kafka而言,每個partition中所有的replicas資訊都可以在zookeeper中獲得,那麼選舉leader將是一件非常簡單的事情。選擇follower時需要兼顧一個問題,就是新leader 所在的server伺服器上已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力。在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader。在整個叢集中,只要有一個replicas存活,那麼此partition都可以繼續接受讀寫操作。

6、Consumer均衡演算法

當一個Group中,有Consumer加入或者離開時,會觸發Partitions均衡。均衡的最終目的,是提升Topic的併發消費能力。

(1)假如topic1,具有如下partitions: P0,P1,P2,P3

(2)加入group中,有如下consumer: C0,C1

(3)首先根據partition索引號對partitions排序: P0,P1,P2,P3

(4)根據consumer.id排序: C0,C1

(5)計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

(6)然後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

通過此演算法,就能知道具體Consumer消費的是哪個分割槽中的資料。

7、Producer訊息路由機制

在kafka-Client-0.11.0.0.jar中,提供的有預設的KafkaProducer和DefaultPartitioner實現。其中DefaultPartitioner主要提供了Producer傳送訊息到分割槽的路由演算法,如果給定Key值,就通過Key的雜湊值和分割槽個數取餘來計算;如果沒有給定Key,就通過ThreadLocalRandom.current().nextInt()產生的隨機數與分割槽數取餘(其中涉及複雜步奏參考如下程式碼)。具體程式碼如下:

public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<string, atomicinteger=""> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<string,> configs) {}
    /**
     * 計算給定記錄的分割槽
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<partitioninfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<partitioninfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
    public void close() {}
}

我們也可以設定自己的Partition路由規則,需要繼承Partitioner類實現

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

8、kafka訊息投遞保證(delivery保證)

Kafka的訊息delivery保證主要有三種:

(1)At most once 最多一次。訊息可能會丟失,但絕不會重複傳輸。

(2)At least once 最少一次。訊息絕不會丟失,但可能會重複傳輸。

(3)Exactly once 正好一次。每條訊息正好被傳輸一次和消費一次。

8.1、Producer delivery保證

Producer的delivery保證可以通過引數request.required.acks設定來保證:

(1)request.required.acks=0。

相當於訊息非同步傳送。訊息一發送完畢馬上傳送下一條。由於不需要ack,可能會造成資料丟失,相當於實現了At most once。

(2)request.required.acks=1。

訊息傳送給Leader Partition,在Leader Partition確認訊息並ack 生產者過後才發下一條。

(3)request.required.acks=-1。

訊息傳送給Leader,在Leader收到所有Follower確認儲存訊息的ack後對producer進行ack才傳送下一條。

所以一條訊息從Producer到Broker至少是確保了At least once的,因為有Replication的存在,只要訊息到達Broker就不會丟失。如果ack出現問題,比如網路中斷,有可能會導致producer收不到ack而重複傳送訊息。Exactly once這種方式,沒有查到相關的實現。

第(3)種方式的具體步奏如下:

a. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader

b. producer 將訊息傳送給該 leader

c. leader 將訊息寫入本地 log

d. followers 從 leader pull 訊息,寫入本地 log 後向 leader 傳送 ACK

e. leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 傳送 ACK

8.2、Consumer delivery保證

Consumer從Broker拉取資料過後,可以選擇commit,此操作會在zookeeper中存下此Consumer讀取對應Partition訊息的Offset,以便下一次拉取資料時會從Partition的下一個Offset消費,避免重複消費。

同樣,Consumer可以通過設定引數enable.auto.commit=true來自動確認訊息,即Consumer一收到訊息立刻自動commit。如果只看訊息的讀取過程,kafka是確保了Exactly once的,但是實際情況中Consumer不可能讀取到資料就結束了,往往還需要處理讀取到的資料。因此Consumer處理訊息和commit訊息的順序就決定了delivery保證的類別。

(1)先處理後commit

這種方式實現了At least once。Consumer收到訊息先處理後提交,如果在處理完成後機器崩潰了,導致Offset沒有更新,Consumer下次啟動時又會重新讀取上一次消費的資料,實際上此訊息已經處理過了。

(2)先commit後處理

這種方式實現了At most once。Consumer收到訊息過後立刻commit,更新zookeeper上的Offset,然後再處理訊息。如果處理還未結束Consumer崩潰了,等Consumer再次啟動的時候會讀取Offset更新過後的下一條資料,這就導致了資料丟失。

9、High Level API和Low Level API

Kafka提供了兩種Consumer API,選用哪種API需要視具體情況而定。

9.1、High Level Consumer API

High Level Consumer API圍繞著Consumer Group這個邏輯概念展開,它遮蔽了每個Topic的每個Partition的Offset管理(自動讀取zookeeper中該Partition的last offset )、Broker失敗轉移以及增減Partition、Consumer時的負載均衡(當Partition和Consumer增減時,Kafka自動進行Rebalance)。

9.2、Low Level Consumer API

Low Level Consumer API,作為底層的Consumer API,提供了消費Kafka Message更大的控制,使用者可以實現重複讀取、跳讀等功能。

使用Low Level Consumer API,是沒有對Broker、Consumer、Partition增減進行處理,如果出現這些的增減時,需要自己處理負載均衡。

Low Level Consumer API提供更大靈活控制是以增加複雜性為代價的:

(1)Offset不再透明

(2)Broker自動失敗轉移需要處理

(3)增加Consumer、Partition、Broker需要自己做負載