1. 程式人生 > >史上最全、最詳細的 kafka 學習筆記!

史上最全、最詳細的 kafka 學習筆記!

一、為什麼需要訊息系統

  • 1.解耦:

    允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

  • 2.冗餘:

    訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。許多訊息佇列所採用的"插入-獲取-刪除"正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢。

  • 3.擴充套件性:

    因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。

  • 4.靈活性 & 峰值處理能力:

    在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

  • 5.可恢復性:

    系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。

  • 6.順序保證:

    在大多使用場景下,資料處理的順序都很重要。大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。(Kafka 保證一個 Partition 內的訊息的有序性)

  • 7.緩衝:

    有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況。

  • 8.非同步通訊:

    很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。


二、kafka 架構

2.1 拓撲結構

如下圖:

圖.1

2.2 相關概念

如圖.1中,kafka 相關名詞解釋如下:

1.producer
  訊息生產者,釋出訊息到 kafka 叢集的終端或服務。
2.broker
  kafka 叢集中包含的伺服器。
3.topic
  每條釋出到 kafka 叢集的訊息屬於的類別,即 kafka 是面向 topic 的。
4.partition
  partition 是物理上的概念,每個 topic 包含一個或多個 partition
kafka 分配的單位是 partition
5.consumer
  從 kafka 叢集中消費訊息的終端或服務。
6.Consumer group
  high-level consumer API 中,每個 consumer 都屬於一個 consumer group,每條訊息只能被 consumer group 中的一個 Consumer 消費,但可以被多個 consumer group 消費。
7.replica
  partition 的副本,保障 partition 的高可用。
8.leader
  replica 中的一個角色, producer 和 consumer 只跟 leader 互動。
9.follower
  replica 中的一個角色,從 leader 中複製資料。
10.controller
  kafka 叢集中的其中一個伺服器,用來進行 leader election 以及 各種 failover
11.zookeeper
  kafka 通過 zookeeper 來儲存叢集的 meta 資訊。


2.3 zookeeper 節點

kafka 在 zookeeper 中的儲存結構如下圖所示:

圖.2

 

三、producer 釋出訊息

3.1 寫入方式

producer 採用 push 模式將訊息釋出到 broker,每條訊息都被 append 到 patition 中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)。

3.2 訊息路由

producer 傳送訊息到 broker 時,會根據分割槽演算法選擇將其儲存到哪一個 partition。其路由機制為:

  • 指定了 patition,則直接使用;

  • 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition

  •  patition 和 key 都未指定,使用輪詢選出一個 patition。

 

附上 java 客戶端分割槽原始碼,一目瞭然:

//建立訊息例項
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value{
     if (topic == null)
          throw new IllegalArgumentException("Topic cannot be null");
     if (timestamp != null && timestamp < 0)
          throw new IllegalArgumentException("Invalid timestamp " + timestamp);
     this.topic = topic;
     this.partition = partition;
     this.key = key;
     this.value = value;
     this.timestamp = timestamp;
}

//計算 patition,如果指定了 patition 則直接使用,否則使用 key 計算
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster{
     Integer partition = record.partition();
     if (partition != null) {
          List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
          int lastPartition = partitions.size() - 1;
          if (partition < 0 || partition > lastPartition) {
               throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
          }
          return partition;
     }
     return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

// 使用 key 選取 patition
public int partition(String topic, Object key, byte[] keyBytes, Object valuebyte[] valueBytes, Cluster cluster{
     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
     int numPartitions = partitions.size();
     if (keyBytes == null) {
          int nextValue = counter.getAndIncrement();
          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
          if (availablePartitions.size() > 0) {
               int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
               return availablePartitions.get(part).partition();
          } else {
               return DefaultPartitioner.toPositive(nextValue) % numPartitions;
          }
     } else {
          //對 keyBytes 進行 hash 選出一個 patition
          return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
}


3.3 寫入流程

 producer 寫入訊息序列圖如下所示:

圖.3

流程說明:

  1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader

  2.  producer 將訊息傳送給該 leader

  3.  leader 將訊息寫入本地 log

  4.  followers 從 leader pull 訊息,寫入本地 log 後 leader 傳送 ACK

  5. leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 傳送 ACK


3.4 producer delivery guarantee

 一般情況下存在三種情況:

  1. At most once 訊息可能會丟,但絕不會重複傳輸

  2.  At least one 訊息絕不會丟,但可能會重複傳輸

  3.  Exactly once 每條訊息肯定會被傳輸一次且僅傳輸一次

當 producer 向 broker 傳送訊息時,一旦這條訊息被 commit,由於 replication 的存在,它