1. 程式人生 > 其它 >Kafka中Consumer Group

Kafka中Consumer Group

Consumer Group 是 Kafka 提供的可擴充套件且具有容錯性的消費者機制。

  1. Consumer Group 下可以有一個或多個 Consumer 例項。這裡的例項可以是一個單獨的程序,也可以是同一程序下的執行緒。在實際場景中,使用程序更為常見一些。
  2. Group ID 是一個字串,在一個 Kafka 叢集中,它標識唯一的一個 Consumer Group。
  3. Consumer Group 下所有例項訂閱的主題的單個分割槽,只能分配給組內的某個 Consumer 例項消費。這個分割槽當然也可以被其他的 Group 消費。

理想情況下,Consumer 例項的數量應該等於該 Group 訂閱主題的分割槽總數。

 設定大於總分割槽數的 Consumer 例項只會浪費資源,而沒有任何好處,因為同一個consumer group的consumer例項只能一個消費一個partition。

有幾個問題

1.針對 Consumer Group,Kafka 是怎麼管理位移的呢?

對於 Consumer Group ,Offset是一組 KV 對,Key 是分割槽,V 對應 Consumer 消費該分割槽的最新位移。

老版本的 Consumer Group 把位移儲存在 ZooKeeper 中,因為比較流行的提法是將伺服器節點做成無狀態的,這樣可以自由地擴縮容,實現超強的伸縮性,儲存的格式路徑是/kafka/consumers/consumer group名字/offsets/topic名字/partition,內容就是消費的offset值。

使用的java類是kafka.javaapi.consumer.ConsumerConnector

/kafka/consumers/zoo-consumer-group/offsets/my-topic/0

但是ZooKeeper 這類元框架其實並不適合進行頻繁的寫更新,而 Consumer Group 的位移更新卻是一個非常頻繁的操作。這種大吞吐量的寫操作會極大地拖慢 ZooKeeper 叢集的效能,所以新版的kafka不再使用zk,採用了將位移儲存在 Kafka 內部主題的方法。

這個Broker 端內部主題就是 __consumer_offsets。配置引數【bootstrap.servers】來消費。

使用的java類是org.apache.kafka.clients.consumer.KafkaConsumer

用kafka-consumer-offset-checker檢視

[root@localhost data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group --topic stable-test
[2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test-consumer-group stable-test 0 601808 601808 0 none
test-consumer-group stable-test 1 602826 602828 2 none
test-consumer-group stable-test 2 602136 602136 0 none

 


上面結果的說明:

Group : 消費者組
Topic : topic的名字
Pid : partition的ID
Offset : kafka消費者在對應分割槽上已經消費的訊息數【位置】
logSize : 已經寫到該分割槽的訊息數【位置】
Lag : 還有多少訊息未讀取(Lag = logSize - Offset)
Owner : 分割槽建立在哪個broker

 

2.如果兩個消費者 Topic 不一樣,但是 GroupName 一樣,會出現什麼現象?

RocketMQ 有這個問題,好像是部分訊息才會消費。

Kafka從上面的儲存格式來看,不會出現該問題,因為topic是分開的儲存offset的。

consumer.subscribe(Pattern.compile("sales-.*");或者consumer.subscribe(Set<String>);這樣就可以訂閱多個主題。

Kafka是沒影響的。