1. 程式人生 > >kafka消息通信原理學習(1)

kafka消息通信原理學習(1)

coord 例子 eba alt sub 序列化 strac system str

關於 Topic 和 Partition:

  Topic:

在 kafka 中,topic 是一個存儲消息的邏輯概念,可以認為是一個消息集合。每條消息發送到 kafka 集群的消息都有一個類別。物理上來說,不同的 topic 的消息是分開存儲的,每個 topic 可以有多個生產者向它發送消息,也可以有多個消費者去消費其中的消息。

技術分享圖片

  Partition:

  每個 topic 可以劃分多個分區(每個 Topic 至少有一個分區),同一 topic 下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset保證消息在分區內的順序,offset 的順序不跨分區,即 kafka只保證在同一個分區內的消息是有序的。下圖中,對於名字為 test 的 topic,做了 3 個分區,分別是p0、p1、p2.

? 每一條消息發送到 broker 時,會根據 partition 的規則選擇存儲到哪一個 partition。如果 partition 規則設置合理,那麽所有的消息會均勻的分布在不同的partition中,這樣就有點類似數據庫的分庫分表的概念,把數據做了分片處理。

技術分享圖片

  Topic&Partition 的存儲:

  Partition 是以文件的形式存儲在文件系統中,比如創建一個名為 firstTopic 的 topic,其中有 3 個 partition,那麽在kafka 的數據目錄(/tmp/kafka-log)中就有 3 個目錄,firstTopic-0~3,命名規則是<topic_name>-<partition_id>,創建3個分區的topic:

sh kafka-topics.sh --create --zookeeper 192.168.254.135:2181 --replication-factor 1 --partitions 3 --topic firstTopic

kafka 消息分發策略:

  消息是 kafka 中最基本的數據單元,在 kafka 中,一條消息由 key、value 兩部分構成,在發送一條消息時,我們可以指定這個 key,那麽 producer 會根據 key 和 partition 機制來判斷當前這條消息應該發送並存儲到哪個 partition 中。我們可以根據需要進行擴展 producer 的 partition 機制。

  我們可以通過如下代碼來實現自己的分片策略:

public class MyPartition implements Partitioner {//實現Partitioner接口

    private Random random=new Random();
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //獲得分區列表 List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic); int partitionNum=0; if(key==null){ partitionNum=random.nextInt(partitionInfos.size()); //隨機分區 }else{ partitionNum=Math.abs((key.hashCode())%partitionInfos.size()); } System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum); return partitionNum; //指定發送的分區值 } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }

  然後基於之前的代碼在producer上需要在消息發送端增加配置:指定自己的partiton策略

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPartition");

消息默認的分發機制:

  默認情況下,kafka 采用的是 hash 取模的分區算法。如果Key 為 null,則會隨機分配一個分區。這個隨機是在這個參數”metadata.max.age.ms”的時間範圍內隨機選擇一個。對於這個時間段內,如果 key 為 null,則只會發送到唯一的分區。這個值值哦默認情況下是 10 分鐘更新一次。關 於 Metadata ,簡單理解就是Topic/Partition 和 broker 的映射關系,每一個 topic 的每一個 partition,需要知道對應的 broker 列表是什麽,leader是誰、follower 是誰。這些信息都是存儲在 Metadata 這個類裏面。

消費端如何消費指定的分區:

  通過下面的代碼,就可以消費指定該 topic 下的 0 號分區。其他分區的數據就無法接收。

//消費指定分區的時候,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費指定的分區
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消費原理:

  在實際生產過程中,每個 topic 都會有多個 partitions,多個 partitions 的好處在於,一方面能夠對 broker 上的數據進行分片有效減少了消息的容量從而提升 io 性能。另外一方面,為了提高消費端的消費能力,一般會通過多個consumer 去消費同一個 topic ,也就是消費端的負載均衡機制,也就是我們接下來要了解的,在多個 partition 以及多個 consumer 的情況下,消費者是如何消費消息的?kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 樣 的 consumer ,這些consumer 屬於一個 consumer group,組內的所有消費者協調在一起來消費訂閱主題的所有分區。當然每一個分區只能由同一個消費組內的 consumer 來消費,那麽同一個consumer group 裏面的 consumer 是怎麽去分配該消費哪個分區裏的數據的呢?舉個簡單的例子就是如果存在的分區輸,即partiton的數量於comsumer數量一致的時候,每個comsumer對應一個分區,如果comsumer數量多於分區,那麽多出來的數量的comsumer將不工作,相反則是其中將會有comsumer消費多個分區。

  分區分配策略:

  在 kafka 中,存在兩種分區分配策略,一種是 Range(默認)、另 一 種 另 一 種 還 是 RoundRobin ( 輪 詢 )。 通 過comsumer的配置partition.assignment.strategy 這個參數來設置。

  Range strategy(範圍分區): 

  Range 策略是對每個主題而言的,首先對同一個主題裏面的分區按照序號進行排序,並對消費者按照字母順序進行排序。假設我們有 10 個分區,3 個消費者,排完序的分區將會是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是C1-0, C2-0, C3-0。然後將 partitions 的個數除於消費者線程的總數來決定每個消費者線程消費幾個分區。如果除不盡,那麽前面幾個消費者線程將會多消費一個分區。比如我們有 10 個分區,3 個消費者線程, 10 / 3 = 3,而且除不盡,那麽消費者線程 C1-0 將會多消費一個分區,所以最後分區分配的結果看起來是這樣的:

  C1-0 將消費 0, 1, 2, 3 分區

  C2-0 將消費 4, 5, 6 分區

  C3-0 將消費 7, 8, 9 分區

假如我們有 11 個分區,那麽最後分區分配的結果看起來是這樣的:

  C1-0 將消費 0, 1, 2, 3 分區

  C2-0 將消費 4, 5, 6, 7 分區

  C3-0 將消費 8, 9, 10 分區

假如我們有 2 個主題(T1 和 T2),分別有 10 個分區,那麽最後分區分配的結果看起來是這樣的:

  C1-0 將消費 T1 主題的 0, 1, 2, 3 分區以及 T2 主題的 0, 1, 2, 3 分區

  C2-0 將消費 T1 主題的 4, 5, 6 分區以及 T2 主題的 4, 5, 6 分區

  C3-0 將消費 T1 主題的 7, 8, 9 分區以及 T2 主題的 7, 8, 9 分區

可以看出,C1-0 消費者線程比其他消費者線程多消費了 2 個分區,這就是 Range strategy 的一個很明顯的弊端.

  RoundRobin strategy(輪詢分區):

  輪詢分區策略是把所有 partition 和所有 consumer 線程都列出來,然後按照 hashcode 進行排序。最後通過輪詢算法分配 partition 給消費線程。如果所有 consumer 實例的訂閱是相同的,那麽 partition 會均勻分布。假如按照 hashCode 排序完的 topic&partitions 組依次為 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者線程排序為 C1-0, C1-1, C2-0, C2-1,最後分區分配的結果為:

  C1-0 將消費 T1-5, T1-2, T1-6 分區;

  C1-1 將消費 T1-3, T1-1, T1-9 分區;

  C2-0 將消費 T1-0, T1-4 分區;

  C2-1 將消費 T1-8, T1-7 分區;

  使用輪詢分區策略必須滿足兩個條件

    1. 每個主題的消費者實例具有相同數量的流

    2. 每個消費者訂閱的主題必須是相同的

  什麽時候會觸發這個策略呢?當出現以下幾種情況時,kafka 會進行一次分區分配操作,也就是 kafka consumer 的 rebalance。

  1. 同一個 consumer group 內新增了消費者。

  2. 消費者離開當前所屬的 consumer group,比如主動停機或者宕機。

  3. topic 新增了分區(也就是分區數量發生了變化)。

  4.消費者主動取消訂閱topic

  kafka consuemr 的 rebalance 機制規定了一個 consumer group 下的所有 consumer 如何達成一致來分配訂閱 topic的每個分區。而具體如何執行分區策略,就是前面提到過的兩種內置的分區策略。而 kafka 對於分配策略這塊,提供了可插拔的實現方式, 也就是說,除了這兩種之外,我們還可以創建自己的分配機制。可以通過繼承 AbstractPartitionAssignor 抽象類實現 assign 來做到。

  誰來執行 Rebalance 以及管理 consumer 的 group 呢?

  Kafka 提供了一個角色:coordinator(協調員) 來執行對於 consumer group 的管理,當 consumer group 的第一個 consumer 啟動的時候,它會去和 kafka server(broker) 確定誰是它們組的 coordinator。之後該 group 內的所有成員都會和該 coordinator 進行協調通信。consumer group 如何確定自己的 coordinator 是誰呢? 消費 者 向 kafka 集 群 中 的 任 意 一 個 broker 發 送 一 個GroupCoordinatorRequest 請求,服務端會返回一個負載最 小 的 broker 節 點 的 id , 並 將 該 broker 設 置 為coordinator。在 rebalance 之前,需要保證 coordinator 是已經確定好了的,整個 rebalance 的過程分為兩個步驟 ,一個是JoinGroup 的過程,在這個過程之後會進入一個Synchronizing Group State 階段。那麽這兩個階段都做了什麽呢?

  JoinGroup 的過程:

  表示加入到 consumer group 中,在這一步中,所有的成員都會向 coordinator 發送 joinGroup 的請求。一旦所有成員都發送了 joinGroup 請求,那麽 coordinator 會選擇一個 consumer 擔任 leader 角色,並把組成員信息和訂閱信息發送消費者。下圖就是描述了這麽一個過程,並且請求與響應中攜帶的一些重要的信息。

技術分享圖片

  protocol_metadata: 序列化後的消費者的訂閱信息

  leader_id: 消費組中的消費者,coordinator 會選擇一個座位 leader,對應的就是 member_id

  member_metadata 對應消費者的訂閱信息

  members:consumer group 中全部的消費者的訂閱信息

  generation_id:年代信息,類似於 zookeeper 的時候的 epoch 是一樣的,對於每一輪 rebalance ,generation_id 都會遞增。主要用來保護 consumer group。隔離無效的 offset 提交。也就是上一輪的      consumer 成員無法提交 offset 到新的 consumer group 中。

  Synchronizing Group State 階段:

  進入了 Synchronizing Group State階段,主要邏輯是向 GroupCoordinator 發 送SyncGroupRequest 請求,並且處理 SyncGroupResponse響應,簡單來說,就是 leader 將消費者對應的 partition 分配方案同步給 consumer group 中的所有 consumer,每個消費者都會向 coordinator 發送 syncgroup 請求,不過只有 leader 節點會發送分配方案,其他消費者只是打打醬油而已。當 leader 把方案發給 coordinator 以後,coordinator 會把結果設置到 SyncGroupResponse 中。這樣所有成員都知道自己應該消費哪個分區。

技術分享圖片

  member_assignment :在syncGroup發送請求的時候,只有leader角色的comsumer才會去發送這個信息,而其他消費端是空的。然後會通過coordinator去分發給各個comsumer。

  ? consumer group 的分區分配方案是在客戶端執行的!Kafka 將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性

offset :

  每個 topic可以劃分多個分區(每個 Topic 至少有一個分區),同一topic 下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset 保證消息在分區內的順序,offset 的順序不跨分區,即 kafka 只保證在同一個分區內的消息是有序的; 對於應用層的消費來說,每次消費一個消息並且提交以後,會保存當前消費到的最近的一個 offset。那麽 offset 保存在哪裏?

  offset 在哪裏維護?

  在 kafka 中,提供了一個__consumer_offsets_* 的一個topic , 把 offset 信 息 寫 入 到 這 個 topic 中 。__consumer_offsets——按保存了每個 consumer group某一時刻提交的 offset 信息。__consumer_offsets 默認有50 個分區。可以在 /tmp/kafka-logs/ 下查看。那麽如何查看對應的 consumer_group 保存在哪個分區中呢?

  通過Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由 於 默 認 情 況 下groupMetadataTopicPartitionCount 有 50 個分區,計算得到的結果為:4, 意味著當前的 consumer_group 的位移信息保存在__consumer_offsets 的第 4個分區,執行如下命令,可以查看當前 consumer_goup 中的offset 位移信息

sh kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 4 --broker-list 192.168.254.135:9092,192.168.254.136:9092,192.168.254.137:9092 --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

  執行語句可以看到如下結果:

技術分享圖片

  這個意思就是 KafkaConsumerDemo1 消費組在 testTopic 中現在的 offsets 現在是 111.

消息的存儲:

  首先我們需要了解的是,kafka 是使用日誌文件的方式來保存生產者和發送者的消息,每條消息都有一個 offset 值來表示它在分區中的偏移量。Kafka 中存儲的一般都是海量的消息數據,為了避免日誌文件過大,Log 並不是直接對應在一個磁盤上的日誌文件,而是對應磁盤上的一個目錄,這個目錄的明明規則是<topic_name>_<partition_id>比如創建一個名為 firstTopic 的 topic,其中有 3 個 partition,那麽在 kafka 的數據目錄(/tmp/kafka-log,這裏可以通過server.properties中的log.dirs=/tmp/kafka-logs去修改)中就有 3 個目錄,firstTopic-0~3多個分區在集群中的分配 如果我們對於一個 topic,在集群中創建多個 partition,那麽 partition 是如何分布的呢?

1.將所有 N Broker 和待分配的 i 個 Partition 排序
2.將第 i 個 Partition 分配到第(i mod n)個 Broker 上

技術分享圖片

  結合前面講的消息分發策略,就應該能明白消息發送到 broker 上,消息會保存到哪個分區中,並且消費端應該消費哪些分區的數據了。

kafka消息通信原理學習(1)