Kafka入門三:幾種消費方式
1.消費位移確認
Kafka消費者消費位移確認有自動提交與手動提交兩種策略。在建立KafkaConsumer物件時,通過引數enable.auto.commit設定,true表示自動提交(預設)。自動提交策略由消費者協調器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒執行一次偏移量的提交。手動提交需要由客戶端自己控制偏移量的提交。
(1)自動提交。在建立一個消費者時,預設是自動提交偏移量,當然我們也可以顯示設定為自動。例如,我們建立一個消費者,該消費者自動提交偏移量
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("client.id", "test"); props.put("enable.auto.commit", true);// 顯示設定偏移量自動提交 props.put("auto.commit.interval.ms", 1000);// 設定偏移量提交時間間隔 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 建立消費者 consumer.subscribe(Arrays.asList("test"));// 訂閱主題
(2)手動提交。在有些場景我們可能對消費偏移量有更精確的管理,以保證訊息不被重複消費以及訊息不被丟失。假設我們對拉取到的訊息需要進行寫入資料庫處理,或者用於其他網路訪問請求等等複雜的業務處理,在這種場景下,所有的業務處理完成後才認為訊息被成功消費,這種場景下,我們必須手動控制偏移量的提交。
Kafka 提供了非同步提交(commitAsync)及同步提交(commitSync)兩種手動提交的方式。兩者的主要區別在於同步模式下提交失敗時一直嘗試提交,直到遇到無法重試的情況下才會結束,同時,同步方式下消費者執行緒在拉取訊息時會被阻塞,直到偏移量提交操作成功或者在提交過程中發生錯誤。而非同步方式下消費者執行緒不會被阻塞,可能在提交偏移量操作的結果還未返
回時就開始進行下一次的拉取操作,在提交失敗時也不會嘗試提交。
實現手動提交前需要在建立消費者時關閉自動提交,即設定enable.auto.commit=false。然後在業務處理成功後呼叫commitAsync()或commitSync()方法手動提交偏移量。由於同步提交會阻塞執行緒直到提交消費偏移量執行結果返回,而非同步提交併不會等消費偏移量提交成功後再繼續下一次拉取訊息的操作,因此非同步提交還提供了一個偏移量提交回調的方法commitAsync(OffsetCommitCallback callback)。當提交偏移量完成後會回撥OffsetCommitCallback 介面的onComplete()方法,這樣客戶端根據回撥結果執行不同的邏輯處理。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("fetch.max.bytes", 1024);// 為了便於測試,這裡設定一次fetch 請求取得的資料最大值為1KB,預設是5MB
props.put("enable.auto.commit", false);// 設定手動提交偏移量
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.subscribe(Arrays.asList("test"));
try {
int minCommitSize = 10;// 最少處理10 條訊息後才進行提交
int icount = 0 ;// 訊息計算器
while (true) {
// 等待拉取訊息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// 簡單打印出訊息內容,模擬業務處理
System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record. partition(), record.offset(), record.key(),record.value());
icount++;
}
// 在業務邏輯處理成功後提交偏移量
if (icount >= minCommitSize){
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (null == exception) {
// TODO 表示偏移量成功提交
System.out.println("提交成功");
} else {
// TODO 表示提交偏移量發生了異常,根據業務進行相關處理
System.out.println("發生了異常");
}
}
});
icount=0; // 重置計數器
}
}
} catch(Exception e){
// TODO 異常處理
e.printStackTrace();
} finally {
consumer.close();
}
3.5以時間戳查詢訊息
Kafka 在0.10.1.1 版本增加了時間戳索引檔案,因此我們除了直接根據偏移量索引檔案查詢訊息之外,還可以根據時間戳來訪問訊息。consumer-API 提供了一個offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,該方法入參為一個Map 物件,Key 為待查詢的分割槽,Value 為待查詢的時間戳,該方法會返回時間戳大於等於待查詢時間的第一條訊息對應的偏移量和時間戳。需要注意的是,若待查詢的分割槽不存在,則該方法會被一直阻塞。
假設我們希望從某個時間段開始消費,那們就可以用offsetsForTimes()方法定位到離這個時間最近的第一條訊息的偏移量,在查到偏移量之後呼叫seek(TopicPartition partition, long offset)方法將消費偏移量重置到所查詢的偏移量位置,然後呼叫poll()方法長輪詢拉取訊息。例如,我們希望從主題“stock-quotation”第0 分割槽距離當前時間相差12 小時之前的位置開始拉取訊息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 顯示設定偏移量自動提交
props.put("auto.commit.interval.ms", 1000);// 設定偏移量提交時間間隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
try {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();
// 構造待查詢的分割槽
TopicPartition partition = new TopicPartition("stock-quotation", 0);
// 設定查詢12 小時之前訊息的偏移量
timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));
// 會返回時間大於等於查詢時間的第一個偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
// 這裡依然用for 輪詢,當然由於本例是查詢的一個分割槽,因此也可以用if 處理
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
// 若查詢時間大於時間戳索引檔案中最大記錄索引時間,
// 此時value 為空,即待查詢時間點之後沒有新訊息生成
offsetTimestamp = entry.getValue();
if (null != offsetTimestamp) {
// 重置消費起始偏移量
consumer.seek(partition, entry.getValue().offset());
}
}
while (true) {
// 等待拉取訊息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records){
// 簡單打印出訊息內容
System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
3.6消費速度控制
提供 pause(Collection<TopicPartition> partitions)和resume(Collection<TopicPartition>
partitions)方法,分別用來暫停某些分割槽在拉取操作時返回資料給客戶端和恢復某些分割槽向客戶端返回資料操作。通過這兩個方法可以對消費速度加以控制,結合業務使用。