Kafka新版消費者API示例(二)
kafka手動提交策略提供了更加靈活的管理方式,在某些場景我們需要對消費偏移量有更精準的管理。以保證訊息不被重複消費以及訊息不丟失。
Kafka提供兩種手動提交方式:
1.非同步提交(commitAsync):
非同步模式下,提交失敗也不會嘗試提交。消費者執行緒不會被阻塞,因為非同步操作,可能在提交偏移量操作結果未返回時就開始下一次拉取操作。
2.同步提交(CommitSync):
同步模式下,提交失敗時一直嘗試提交,直到遇到無法重試才結束。同步方式下,消費者執行緒在拉取訊息時會被阻塞,直到偏移量提交操作成功或者在提交過程中發生錯誤。
實現手動提交前需要在建立消費者時關閉自動提交,設定enable.auto.commit=false。
由於非同步提交不會等消費偏移量提交成功後再拉取下一次訊息,因此非同步提交提供了一個偏移量提交回調方法commitAsync(OffsetCommitCallback callback)。提交偏移量完成之後會回撥OffsetCommitCallback介面的onComplete()方法
示例程式碼:
package com.simon.kafka.consumer.newconsumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.util.*; /** * Created by Simon on 2018/11/5. */ public class KafkaConsumerAsync { public static void main(String[] args) throws InterruptedException { // 1、準備配置檔案 String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094"; Properties props = new Properties(); //kafka連線資訊 props.put("bootstrap.servers",kafkas); //消費者組id props.put("group.id", "test_group"); //是否自動提交offset props.put("enable.auto.commit", "false"); //在沒有offset的情況下采取的拉取策略 props.put("auto.offset.reset", "none"); //自動提交時間間隔 props.put("auto.commit.interval.ms", "1000"); //設定一次fetch請求取得的資料最大為1k props.put("fetch.max.bytes", "1024"); //key反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "test"; // 2、建立KafkaConsumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 3、訂閱資料,不給定監聽器 consumer.subscribe(Collections.singleton(topic)); try{ //最少處理100條 int minCommitSize = 100; //定義計數器 int icount = 0; // 4、獲取資料 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value()); icount++; } Thread.sleep(5000); //在業務邏輯處理成功後提交offset if(icount >= minCommitSize){ //滿足最少消費100條,再進行非同步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception == null){ System.out.println("commit success"); }else { //提交失敗,對應處理 System.out.println("commit failed"); } } }); //計數器歸零 icount = 0 ; } } }catch (Exception e){ e.printStackTrace(); }finally { //關閉連線 consumer.close(); } } }
以時間戳查詢訊息:
Kafka在0.10.1.1版本上增加了時間戳索引檔案。Kafka消費者API提供了一個offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,引數為一個map物件,key為待查詢的分割槽,value為待查詢的時間戳。會返回一個大於等於該事件戳的第一條訊息對應的偏移量和時間戳。若待查詢分割槽不存在,會一直阻塞。
示例:
將kafka-client的maven依賴改為1.0.0 。在0.10.0.1中無法引入OffsetAndTimestamp類
<!--引入kafka-clients--> <!--<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.1</version> </dependency>--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency>
程式碼:
package com.simon.kafka.consumer.newconsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Created by Simon on 2018/11/5.
*/
public class KafkaConsumerTimestamps {
public static void main(String[] args) throws InterruptedException {
// 1、準備配置檔案
String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
Properties props = new Properties();
//kafka連線資訊
props.put("bootstrap.servers",kafkas);
//消費者組id
props.put("group.id", "test_group");
//客戶端id
props.put("client.id", "test_group");
//是否自動提交offset
props.put("enable.auto.commit", "true");
//在沒有offset的情況下采取的拉取策略
props.put("auto.offset.reset", "none");
//自動提交時間間隔
props.put("auto.commit.interval.ms", "1000");
//設定一次fetch請求取得的資料最大為1k
props.put("fetch.max.bytes", "1024");
//key反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
String topic = "test";
// 2、建立KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3、訂閱主題
TopicPartition topicPartition = new TopicPartition(topic,0);
consumer.assign(Collections.singleton(topicPartition));
try{
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
// 設定查詢12 小時之前訊息的偏移量
timestampsToSearch.put(topicPartition, (System.currentTimeMillis() - 12 * 3600 * 1000));
// 會返回時間大於等於查詢時間的第一個偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
// 用for 輪詢,當然由於本例是查詢的一個分割槽,因此也可以用if 處理
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
// 若查詢時間大於時間戳索引檔案中最大記錄索引時間,
// 此時value 為空,即待查詢時間點之後沒有新訊息生成
offsetTimestamp = entry.getValue();
if (null != offsetTimestamp) {
// 重置消費起始偏移量
consumer.seek(topicPartition, entry.getValue().offset());
}
}
while (true) {
//4.輪詢拉取訊息
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
//關閉連線
consumer.close();
}
}
}
由於叢集環境已選型為kafka0.10.0.1,本次無法按指定時間戳拉取,報錯資訊為不支援當前broker版本。
速度控制:
應用場景中我們可能需要暫停某些分割槽消費,先消費其他分割槽,當達到某個條件再恢復該分割槽消費。
Kafka提供兩種方法用於速度控制的方法:
1.pause(Collection<TopicPartition> partitions):暫停某些分割槽在拉取操作時返回資料給客戶端
//無返回值
consumer.pause(Collections.singleton(topicPartition));
2.resume(Collection<TopicPartition> partitions):恢復某些分割槽向客戶端返回資料
//無返回值
consumer.resume(Collections.singleton(topicPartition));