Kafka消費者讀取資料
轉載於:https://www.cnblogs.com/sodawoods-blogs/p/8969774.html
(1)Customer和Customer Group
(1)兩種常用的訊息模型
佇列模型(queuing)和釋出-訂閱模型(publish-subscribe)。
佇列的處理方式是一組消費者從伺服器讀取訊息,一條訊息只由其中的一個消費者來處理。
釋出-訂閱模型中,訊息被廣播給所有的消費者,接收到訊息的消費者都可以處理此訊息。
(2)Kafka的消費者和消費者組
Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記自己。 一個釋出在Topic上訊息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中
注意:當單個消費者無法跟上資料生成的速度,就可以增加更多的消費者分擔負載,每個消費者只處理部分partition的訊息,從而實現單個應用程式的橫向伸縮。但是不要讓消費者的數量多於partition的數量,此時多餘的消費者會空閒。此外,Kafka還允許多個應用程式從同一個Topic讀取所有的訊息,此時只要保證每個應用程式有自己的消費者組即可。
消費者組的概念就是:當有多個應用程式都需要從Kafka獲取訊息時,讓每個app對應一個消費者組,從而使每個應用程式都能獲取一個或多個Topic的全部訊息;在每個消費者組中,往消費者組中新增消費者來伸縮讀取能力和處理能力,消費者組中的每個消費者只處理每個Topic的一部分的訊息,每個消費者對應一個執行緒。
(3)執行緒安全
在同一個群組中,無法讓一個執行緒執行多個消費者,也無法讓多線執行緒安全地共享一個消費者。按照規則,一個消費者使用一個執行緒,如果要在同一個消費者組中執行多個消費者,需要讓每個消費者執行在自己的執行緒中。最好把消費者的邏輯封裝在自己的物件中,然後使用java的ExecutorService啟動多個執行緒,使每個消費者執行在自己的執行緒上,可參考https://www.confluent.io/blog
(2)Partition Rebalance分割槽再均衡
(1)消費者組中新新增消費者讀取到原本是其他消費者讀取的訊息
(2)消費者關閉或崩潰之後離開群組,原本由他讀取的partition將由群組裡其他消費者讀取
(3)當向一個Topic新增新的partition,會發生partition在消費者中的重新分配
以上三種現象會使partition的所有權在消費者之間轉移,這樣的行為叫作再均衡。
再均衡的優點:
給消費者組帶來了高可用性和伸縮性
再均衡的缺點:
(1)再均衡期間消費者無法讀取訊息,整個群組有一小段時間不可用
(2)partition被重新分配給一個消費者時,消費者當前的讀取狀態會丟失,有可能還需要去重新整理快取,在它重新恢復狀態之前會拖慢應用程式。
因此需要進行安全的再均衡和避免不必要的再均衡。
(3)建立Kafka消費者、訂閱主題、輪詢
Properties props = new Properties();props.put("bootstrap", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//1.建立消費者
KafkaConsuner<String, String> consumer = new KafkaConsumer<String, String>(props);
//2.訂閱Topic
//建立一個只包含單個元素的列表,Topic的名字叫作customerCountries
consumer.subscribe(Collections.singletonList("customerCountries"));
//支援正則表示式,訂閱所有與test相關的Topic
//consumer.subscribe("test.*");
//3.輪詢
//訊息輪詢是消費者的核心API,通過一個簡單的輪詢向伺服器請求資料,一旦消費者訂閱了Topic,輪詢就會處理所欲的細節,包括群組協調、partition再均衡、傳送心跳
//以及獲取資料,開發者只要處理從partition返回的資料即可。
try {
while (true) {//消費者是一個長期執行的程式,通過持續輪詢向Kafka請求資料。在其他執行緒中呼叫consumer.wakeup()可以退出迴圈
//在100ms內等待Kafka的broker返回資料.超市引數指定poll在多久之後可以返回,不管有沒有可用的資料都要返回
ConsumerRecord<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.debug(record.topic() + record.partition() + record.offset() + record.key() + record.value());
//統計各個地區的客戶數量,即模擬對訊息的處理
int updatedCount = 1;
updatedCount += custCountryMap.getOrDefault(record.value(), 0) + 1;
custCountryMap.put(record.value(), updatedCount);
//真實場景中,結果一般會被儲存到資料儲存系統中
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
//退出應用程式前使用close方法關閉消費者,網路連線和socket也會隨之關閉,並立即觸發一次再均衡
consumer.close();
}
(4)消費者的配置
1:fetch.min.bytes,指定消費者從broker獲取訊息的最小位元組數,即等到有足夠的資料時才把它返回給消費者
2:fetch.max.wait.ms,等待broker返回資料的最大時間,預設是500ms。fetch.min.bytes和fetch.max.wait.ms哪個條件先得到滿足,就按照哪種方式返回資料
3:max.partition.fetch.bytes,指定broker從每個partition中返回給消費者的最大位元組數,預設1MB
4:session.timeout.ms,指定消費者被認定死亡之前可以與伺服器斷開連線的時間,預設是3s
5:auto.offset.reset,消費者在讀取一個沒有偏移量或者偏移量無效的情況下(因為消費者長時間失效,包含偏移量的記錄已經過時並被刪除)該作何處理。預設是latest(消費者從最新的記錄開始讀取資料)。另一個值是 earliest(消費者從起始位置讀取partition的記錄)
6:enable.auto.commit,指定消費者是否自動提交偏移量,預設為true
7:partition.assignment.strategy,指定partition如何分配給消費者,預設是Range。Range:把Topic的若干個連續的partition分配給消費者。RoundRobin:把Topic的所有partition逐個分配給消費者
8:max.poll.records,單次呼叫poll方法能夠返回的訊息數量
(5)提交和偏移量
1、消費者為什麼要提交偏移量
當消費者崩潰或者有新的消費者加入,那麼就會觸發再均衡(rebalance),完成再均衡後,每個消費者可能會分配到新的分割槽,而不是之前處理那個,為了能夠繼續之前的工作,消費者需要讀取每個partition最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。
2、提交偏移量可能帶來的問題
case1:如果提交的偏移量小於客戶端處理的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息就會被重複處理。
case2:如果提交的偏移量大於客戶端處理的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息將會丟失。
3、提交偏移量的方式
(1)自動提交 Automatic Commit
enable.auto.commit設定成true(預設為true),那麼每過5s,消費者自動把從poll()方法接收到的最大的偏移量提交。提交的時間間隔由auto.commit.interval.ms控制,預設是5s
自動提交的優點是方便,但是可能會重複處理訊息
(2)提交當前偏移量 Commit Current Offset
將enable.auto.commit設定成false,讓應用程式決定何時提交偏移量。commitSync()提交由poll()方法返回的最新偏移量,所以在處理完所有訊息後要確保呼叫commitSync,否則會有訊息丟失的風險。commitSync在提交成功或碰到無法恢復的錯誤之前,會一直重試。如果發生了再均衡,從最近一批訊息到發生再均衡之間的所有訊息都會被重複處理。
不足:broker在對提交請求作出迴應之前,應用程式會一直阻塞,會限制應用程式的吞吐量
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", record.topic(),
record.partition(), record.offset(), record.key(),
record.value());
}
try {
consumer.commitSync();//處理完當前批次的訊息,在輪詢更多的訊息之前,呼叫commitSync方法提交當前批次最新的訊息
} catch (CommitFailedException e) {
log.error("commit failed", e);//只要沒有發生不可恢復的錯誤,commitSync方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日誌裡
}
}
(3)非同步提交
非同步提交的commitAsync,只管傳送提交請求,無需等待broker響應。commitAsync提交之後不進行重試,假設要提交偏移量2000,這時候發生短暫的通訊問題,伺服器接收不到提交請求,因此也就不會作出響應。與此同時,我們處理了另外一批訊息,併成功提交了偏移量3000,。如果commitAsync重新嘗試提交2000,那麼它有可能在3000之後提交成功,這個時候如果發生再均衡,就會出現重複訊息。
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", record.topic(),
record.partition(), record.offset(), record.key(),
record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {//在broker作出響應後執行回撥函式,回撥經常被用於記錄提交錯誤或生成度量指標
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
log.error("Commit Failed for offsets {}", offsets, e);
}
}});
}
(4)同步和非同步組合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。
因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量。
try {while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", record.topic(),
record.partition(), record.offset(), record.key(),
record.value());
}
consumer.commitAsync();//如果一切正常,我們使用commitAsync來提交,這樣速度更快,而且即使這次提交失敗,下次提交很可能會成功
} catch (CommitFailedException e) {
log.error("commit failed", e);
} finally {
try {
consumer.commitSync();//關閉消費者前,使用commitSync,直到提交成成功或者發生無法恢復的錯誤
} finally {
consumer.close();
}
}
(5)提交特定的偏移量
消費者API允許呼叫commitSync()和commitAsync()方法時傳入希望提交的partition和offset的map,即提交特定的偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//用於跟蹤偏移量的mapint count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", record.topic(),
record.partition(), record.offset(), record.key(),
record.value());//模擬對訊息的處理
//在讀取每條訊息後,使用期望處理的下一個訊息的偏移量更新map裡的偏移量。下一次就從這裡開始讀取訊息
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, “no matadata”));
if (count++ % 1000 == 0) {//每處理1000條訊息就提交一次偏移量,在實際應用中,可以根據時間或者訊息的內容進行提交
consumer.commitAsync(currentOffsets, null);
}
}
}
(6)再均衡監聽器
在為消費者分配新的partition或者移除舊的partition時,可以通過消費者API執行一些應用程式程式碼,在使用subscribe()方法時傳入一個ConsumerRebalanceListener例項。
ConsumerRebalanceListener需要實現的兩個方法
1:public void onPartitionRevoked(Collection<TopicPartition> partitions)方法會在再均衡開始之前和消費者停止讀取訊息之後被呼叫。如果在這裡提交偏移量,下一個接管partition的消費者就知道該從哪裡開始讀取了。
2:public void onPartitionAssigned(Collection<TopicPartition> partitions)方法會在重新分配partition之後和消費者開始讀取訊息之前被呼叫。
下面的例子演示如何在失去partition的所有權之前通過onPartitionRevoked()方法來提交偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//用於跟蹤偏移量的mapprivate class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//如果發生再均衡,要在即將失去partition所有權時提交偏移量。
//注意:(1)提交的是最近處理過的偏移量,而不是批次中還在處理的最後一個偏移量。因為partition有可能在我們還在處理訊息時被撤回。
//(2)我們要提交所有分割槽的偏移量,而不只是即將市區所有權的分割槽的偏移量。因為提交的偏移量是已經處理過的,所以不會有什麼問題。
//(3)呼叫commitSync方法,確保在再均衡發生之前提交偏移量
consumer.commitSync(currentOffsets);
}
}
try{
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", record.topic(),
record.partition(), record.offset(), record.key(),
record.value());//模擬對訊息的處理
//在讀取每條訊息後,使用期望處理的下一個訊息的偏移量更新map裡的偏移量。下一次就從這裡開始讀取訊息
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, “no matadata”));
}
consumer.commitAsync(currentOffsets, null);
} catch(WakeupException e) {
//忽略異常,正在關閉消費者
} catch (Exception e) {
log.error("unexpected error", e);
} finally {
try{
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
}
}