Kafka基礎(七): Kafka 消費者
阿新 • • 發佈:2020-07-29
1 消費方式
consumer 採用 pull(拉)模式從 broker 中讀取資料。 push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息, 典型的表現就是拒絕服務以及網路擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費訊息。 pull 模式不足之處是,如果 kafka 沒有資料,消費者可能會陷入迴圈中,一直返回空資料。 針對這一點,Kafka 的消費者在消費資料時會傳入一個時長引數 timeout,如果當前沒有資料可供消費,consumer 會等待一段時間之後再返回,這段時長即為 timeout。2 分割槽分配策略
3 offset 的維護
由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢 復後繼續消費。exclude.internal.topics=false
2)讀取 offset
0.11.0.0 之前版本:
bin/kafka-console-consumer.sh --topic __consumer_offsets -- zookeeper hadoop102:0.11.0.0 之後版本(含)2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
bin/kafka-console-consumer.sh --topic __consumer_offsets -- zookeeper hadoop102:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm atter" --consumer.config config/consumer.properties --frombeginning
4 消費者組案例
1)需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費。 2)案例實操 (1)在 hadoop102、hadoop103 上修改/opt/module/kafka/config/consumer.properties 配置檔案中的 group.id 屬性為任意組名。[atguigu@hadoop103 config]$ vi consumer.properties
group.id=atguigu
(2)在 hadoop102、hadoop103 上分別啟動消費者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \ --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties [atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh -- bootstrap-server hadoop102:9092 --topic first --consumer.config config/consumer.properties(3)在 hadoop104 上啟動生產者
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \ --broker-list hadoop102:9092 --topic first >hello world(4)檢視 hadoop102 和 hadoop103 的接收者。 同一時刻只有一個消費者接收到訊息。