Kafka Consumer: Client Development
A normal consumption flow involves the following steps:
- Configure the consumer parameters
- Create a consumer instance
- Subscribe to topics
- Poll messages and consume them
- Commit the consumption offsets
- Close the consumer instance
Example code:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author: masheng
 * @description: Kafka consumer client example
 * @date: 2020/07/27 22:12
 */
public class KafkaConsumerAnalyze {
    private static final String TOPIC = "topic_test";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String GROUP_ID = "group_test";
    private static final AtomicBoolean isRunning = new AtomicBoolean(true);

    /*
     * Description: initialize the configuration
     * @author: masheng
     * @time: 2020/7/27
     * @return: java.util.Properties
     */
    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.test");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // 1. Configure the consumer parameters
        Properties properties = initConfig();
        // 2. Create a consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 3. Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC));
        try {
            while (isRunning.get()) {
                // 4. Poll messages and consume them
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6. Close the consumer instance
            consumer.close();
        }
    }
}
1. Parameter configuration
Four parameters are required:
- bootstrap.servers
- group.id: the name of the consumer group
- key.deserializer
- value.deserializer
2. Subscribing to topics and partitions
A consumer can subscribe to one or more topics. If subscribe() is called several times in a row, the last call takes effect. Topics can also be subscribed to via a regular expression.
Option 1: the subscribe() method
Option 2: the assign() method, which takes the exact collection of partitions to consume; its source is shown below, followed by a usage sketch of all three subscription styles
public void assign(Collection<TopicPartition> partitions) {
    acquireAndEnsureOpen();
    try {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        } else if (partitions.isEmpty()) {
            this.unsubscribe();
        } else {
            Set<String> topics = new HashSet<>();
            for (TopicPartition tp : partitions) {
                String topic = (tp != null) ? tp.topic() : null;
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                topics.add(topic);
            }
            // make sure the offsets of topic partitions the consumer is unsubscribing from
            // are committed since there will be no following rebalance
            this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
            this.subscriptions.assignFromUser(new HashSet<>(partitions));
            metadata.setTopics(topics);
        }
    } finally {
        release();
    }
}
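As a minimal sketch of the three subscription styles (it reuses initConfig() from the first example; the topic name and partition number are placeholders, and the single-argument subscribe(Pattern) overload assumes a 2.x client):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.regex.Pattern;

public class SubscribeModes {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());

        // Option 1a: subscribe to an explicit topic list; a later call replaces this one
        consumer.subscribe(Arrays.asList("topic_test"));

        // Option 1b: subscribe by regex; matching topics created later are picked up too
        consumer.unsubscribe(); // subscribe() and assign() styles must not be mixed
        consumer.subscribe(Pattern.compile("topic_.*"));

        // Option 2: assign explicit partitions; no group rebalancing applies
        consumer.unsubscribe();
        consumer.assign(Collections.singletonList(new TopicPartition("topic_test", 0)));

        consumer.close();
    }
}
```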
The partitionsFor() method of KafkaConsumer queries the metadata of a given topic:
public List<PartitionInfo> partitionsFor(String topic) {
return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}
PartitionInfo holds the partition metadata of a topic:
public class PartitionInfo {
    private final String topic;           // topic name
    private final int partition;          // partition id
    private final Node leader;            // node hosting the leader replica
    private final Node[] replicas;        // the partition's AR set
    private final Node[] inSyncReplicas;  // the ISR set
    private final Node[] offlineReplicas; // the OSR set
}
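As a hedged sketch, partitionsFor() can be combined with assign() to manually take every partition of a topic (again reusing initConfig() from the first example; the topic name is a placeholder):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.List;

public class AssignAllPartitions {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        List<TopicPartition> partitions = new ArrayList<>();
        // look up the partition metadata, then assign every partition of the topic
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic_test");
        if (partitionInfos != null) {
            for (PartitionInfo info : partitionInfos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions);
    }
}
```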
Unsubscribing:
Use the unsubscribe() method.
Summary:
Subscribing with subscribe() gives the consumer automatic rebalancing within its group; subscribing to partitions with assign() does not.
3. Deserialization
The built-in deserializers include ByteBufferDeserializer, ByteArrayDeserializer, BytesDeserializer, DoubleDeserializer, FloatDeserializer, IntegerDeserializer, LongDeserializer, ShortDeserializer, StringDeserializer, and so on. They all implement the Deserializer interface, which declares three methods:
// configure the deserializer (called once with the consumer configs)
void configure(Map<String, ?> configs, boolean isKey);
// perform the actual deserialization
T deserialize(String topic, byte[] data);
// close the deserializer
void close();
The StringDeserializer class looks like this:
public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("deserializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
// converts the byte[] payload into a String
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
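To show how the interface is implemented in practice, here is a hedged sketch of a custom deserializer; the Greeting type and its wire format (a raw UTF-8 string) are illustrative assumptions, not part of the original text:

```java
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

// a hypothetical value type, used only for this illustration
class Greeting {
    final String text;
    Greeting(String text) { this.text = text; }
}

public class GreetingDeserializer implements Deserializer<Greeting> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public Greeting deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // wire-format assumption: the payload is a plain UTF-8 string
        return new Greeting(new String(data, StandardCharsets.UTF_8));
    }

    @Override
    public void close() {
        // nothing to do
    }
}
```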
4. Message consumption
Messages are fetched with the poll() method.
Each message the consumer receives is of type ConsumerRecord:
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic; // topic name
private final int partition; // partition id
private final long offset; // offset within the partition
private final long timestamp; // timestamp
private final TimestampType timestampType; // timestamp type: CreateTime or LogAppendTime
private final int serializedKeySize; // size of the key after serialization
private final int serializedValueSize; // size of the value after serialization
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum; // CRC32 checksum
}
poll() returns a ConsumerRecords object, which represents the set of messages obtained by one poll and contains zero or more ConsumerRecord instances.
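A hedged sketch of the common ways to walk a ConsumerRecords set (overall count and the per-partition view):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.List;

public class RecordsWalkthrough {
    // prints the fetched set partition by partition
    static void handle(ConsumerRecords<String, String> records) {
        System.out.println("fetched " + records.count() + " records");
        for (TopicPartition tp : records.partitions()) {
            // records(tp) returns this partition's records in offset order
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            for (ConsumerRecord<String, String> record : tpRecords) {
                System.out.println(tp + " @" + record.offset() + ": " + record.value());
            }
        }
    }
}
```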
5. Committing offsets
In the new consumer client, consumption offsets are stored in Kafka's internal topic __consumer_offsets.
If the consumer has consumed the message at position x, the offset it needs to commit is x+1, the position of the next message to fetch.
KafkaConsumer provides the position(TopicPartition) and committed(TopicPartition) methods to obtain the consuming position and the committed offset respectively.
The producer below seeds the demo topic; a consumer-side sketch that reads position() and committed() follows it:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAnalyze {
    private static final String TOPIC = "topic_test";
    private static final String BROKER_LIST = "localhost:9092";

    /*
     * Description: initialize the configuration
     * @author: masheng
     * @time: 2020/7/27
     * @return: java.util.Properties
     */
    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        // acks can be "all", "0" or "1"; "all" returns success only after all in-sync
        // replicas have replicated the record, which gives the highest latency
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // 1. Configure the producer parameters
        Properties properties = initConfig();
        // 2. Create a producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Build the message to send
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello,Kafka!");
        try {
            // 4. Send the message
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5. Close the producer instance
            producer.close();
        }
    }
}
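A minimal consumer-side sketch, assuming topic_test has a single partition 0 that gets assigned to this consumer (initConfig() is reused from the first example):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;

public class OffsetPositionDemo {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        consumer.subscribe(Arrays.asList("topic_test"));
        TopicPartition tp = new TopicPartition("topic_test", 0);
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (!records.isEmpty()) {
            consumer.commitSync();
            // position(): the offset of the next message to fetch
            long position = consumer.position(tp);
            // committed(): the last committed offset for the partition
            OffsetAndMetadata committed = consumer.committed(tp);
            // right after a sync commit, both values are lastConsumedOffset + 1
            System.out.println("position = " + position + ", committed = " + committed.offset());
        }
        consumer.close();
    }
}
```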
By default, Kafka commits consumption offsets automatically. This is controlled by the consumer parameter enable.auto.commit, which defaults to true; the commit happens periodically, every 5 seconds by default, as configured by auto.commit.interval.ms. Automatic committing can cause both duplicate consumption and lost messages. For example, if the consumer crashes right after an automatic commit, it resumes from the last committed offset and re-consumes everything processed since then (duplicates); conversely, if offsets are committed before the processing thread has actually finished the messages, a crash loses them.
In production, manual committing should be used instead. It comes in two forms, synchronous and asynchronous, corresponding to the commitSync() and commitAsync() methods.
Synchronous commit:
consumer.commitSync()
Processing can be batched: buffer the fetched messages and commit in bulk once enough have accumulated, as in the sketch below.
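A minimal sketch of the buffer-then-commit pattern, assuming enable.auto.commit has been set to false in the reused initConfig() and using an illustrative batch threshold:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchCommitSync {
    private static final int MIN_BATCH_SIZE = 200; // illustrative threshold

    public static void main(String[] args) {
        // assumption: initConfig() additionally sets enable.auto.commit=false
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        consumer.subscribe(Arrays.asList("topic_test"));
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= MIN_BATCH_SIZE) {
                // process the whole batch, then commit it with one sync call
                for (ConsumerRecord<String, String> record : buffer) {
                    System.out.println(record.value());
                }
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}
```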
Asynchronous commit:
consumer.commitAsync() additionally accepts a callback that is invoked when the commit completes.
If an asynchronous commit fails, blindly retrying it can cause duplicate consumption, because a later commit carrying a larger offset may already have succeeded. One remedy is to maintain a monotonically increasing sequence number, bumped on every commit. When a failed commit is about to be retried, compare its sequence value with the current one: if it is smaller, a larger offset has already been committed and the retry should be dropped; if they are equal, the retry is safe. A sketch follows.
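A hedged sketch of the sequence-number idea (retrying with a sync commit inside the callback is one possible choice, not the only one):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

public class RetrySafeAsyncCommit {
    // bumped before every async commit; identifies the newest commit attempt
    private static final AtomicLong sequence = new AtomicLong(0);

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        consumer.subscribe(Arrays.asList("topic_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (records.isEmpty()) {
                continue;
            }
            // ... process records ...
            final long seqAtCommit = sequence.incrementAndGet();
            consumer.commitAsync((offsets, exception) -> {
                if (exception == null) {
                    return; // commit succeeded
                }
                // retry only if no newer commit has been issued in the meantime
                if (seqAtCommit == sequence.get()) {
                    consumer.commitSync(offsets); // fall back to a sync retry
                }
            });
        }
    }
}
```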
6. Pausing and stopping consumption
The pause() and resume() methods make poll() stop returning data to the client for the given partitions, and resume doing so, respectively; a short sketch follows.
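A small sketch of the pause/resume calls (topic and partition are placeholders; paused() reports the currently paused set):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Set;

public class PauseResumeDemo {
    // temporarily stop fetching from partition 0 of topic_test, then resume
    static void throttle(KafkaConsumer<String, String> consumer, boolean busy) {
        Set<TopicPartition> tp = Collections.singleton(new TopicPartition("topic_test", 0));
        if (busy) {
            consumer.pause(tp);  // poll() keeps the assignment but returns no data for tp
        } else {
            consumer.resume(tp); // data for tp flows again
        }
        System.out.println("currently paused: " + consumer.paused());
    }
}
```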
Exiting the consume loop:
Either test a flag such as isRunning.get() in the loop and flip the boolean from elsewhere, or call KafkaConsumer's wakeup() method, which makes poll() throw a WakeupException and thereby breaks out of the loop.
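A minimal shutdown sketch using wakeup(); the shutdown-hook trigger is an assumption for the example:

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;

public class GracefulShutdown {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        consumer.subscribe(Arrays.asList("topic_test"));
        // wakeup() is the one KafkaConsumer method that is safe to call from another thread
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                records.forEach(record -> System.out.println(record.value()));
            }
        } catch (WakeupException e) {
            // expected on shutdown; simply fall through to close()
        } finally {
            consumer.close();
        }
    }
}
```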
7. Seeking to a specific offset
The seek() method takes a partition and an offset. It can only reset the consuming position of partitions that are already assigned to this consumer, and partition assignment happens during calls to poll(). In other words, poll() must be executed at least once, until an assignment has been obtained, before seek() can reset the position, as in the sketch below.
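A sketch of the poll-until-assigned pattern; the target offset 10 is an arbitrary demo value:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Set;

public class SeekDemo {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        consumer.subscribe(Arrays.asList("topic_test"));
        Set<TopicPartition> assignment = consumer.assignment();
        // keep polling until the coordinator has assigned us partitions
        while (assignment.isEmpty()) {
            consumer.poll(100);
            assignment = consumer.assignment();
        }
        // now seek() is legal: restart every assigned partition at offset 10
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, 10);
        }
    }
}
```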
8. Rebalancing
A rebalance transfers ownership of a partition from one consumer to another. As a rule, unnecessary rebalances should be avoided.
The rebalance listener, ConsumerRebalanceListener, declares two methods:
// called before the rebalance starts, after the consumer has stopped fetching;
// a good place to commit offsets and avoid unnecessary duplicate consumption
void onPartitionsRevoked(Collection<TopicPartition> partitions);
// called after partitions have been reassigned, before the consumer starts fetching
void onPartitionsAssigned(Collection<TopicPartition> partitions);
The following example goes a long way toward preventing duplicate consumption:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class MessageConsumer {
    private static final String TOPIC = "education-info";
    private static final String BROKER_LIST = "localhost:9092";
    private static KafkaConsumer<String, String> kafkaConsumer = null;
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    static {
        Properties properties = initConfig();
        kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // subscribe with a rebalance listener and commit offsets synchronously in its
        // callback, so the commit is guaranteed to complete before the rebalance
        kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // called before the rebalance starts, after the consumer stops fetching
                kafkaConsumer.commitSync(currentOffsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            }
        });
    }

    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
        // disable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        try {
            while (true) {
                // poll in a loop; 100 ms is how long to wait for the broker to return data
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                // process each record, tracking the next offset to commit per partition
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        System.out.println(record.value());
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                // during normal consumption, commit offsets asynchronously
                kafkaConsumer.commitAsync(currentOffsets, null);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // on the way out, make one final synchronous commit
                kafkaConsumer.commitSync();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}
9. Multi-threaded consumption
KafkaConsumer is not thread-safe. It defines an internal acquire() method that checks whether more than one thread is operating on the consumer; acquire() and release() appear in pairs and act as a lightweight lock (acquire() does not block; it throws a ConcurrentModificationException as soon as concurrent access is detected).
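As a rough, timing-dependent illustration: if a second thread calls poll() while the first poll is still in flight, acquire() fails fast:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

public class NotThreadSafeDemo {
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConsumerAnalyze.initConfig());
        consumer.subscribe(Arrays.asList("topic_test"));
        Thread first = new Thread(() -> consumer.poll(1000)); // holds the consumer
        Thread second = new Thread(() -> {
            try {
                consumer.poll(1000);
            } catch (java.util.ConcurrentModificationException e) {
                // acquire() detected a second thread and failed fast
                System.out.println("KafkaConsumer is not safe for multi-threaded access");
            }
        });
        first.start();
        Thread.sleep(100); // let the first poll grab the consumer; timing is illustrative
        second.start();
    }
}
```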
One way to implement it (a single polling thread handing record batches to a pool of handlers):
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.*;

public class MultiConsumerThreadDemo {
    private static final String TOPIC = "topic_test";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String GROUP_ID = "group_test";
    // shared state for committing: each RecordHandler stores the offsets it has finished
    // processing in this map, and KafkaConsumerThread commits them after each poll();
    // note this scheme can still lose messages if a handler fails after its offsets
    // were recorded
    private static final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // offsets are committed manually from the shared map, so disable auto-commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumerThread consumerThread = new KafkaConsumerThread(properties, TOPIC,
                Runtime.getRuntime().availableProcessors());
        consumerThread.start();
    }

    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> consumer;
        private final ExecutorService executorService;
        private final int threadNumber;

        public KafkaConsumerThread(Properties properties, String topic, int threadNumber) {
            this.consumer = new KafkaConsumer<>(properties);
            this.consumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = threadNumber;
            this.executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L,
                    TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordHandler(records));
                    }
                    // commit whatever the handlers have finished so far
                    synchronized (offsets) {
                        if (!offsets.isEmpty()) {
                            consumer.commitSync(offsets);
                            offsets.clear();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }

        private class RecordHandler implements Runnable {
            private final ConsumerRecords<String, String> records;

            private RecordHandler(ConsumerRecords<String, String> records) {
                this.records = records;
            }

            @Override
            public void run() {
                // process the records partition by partition
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
                    // ... process tpRecords ...
                    long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
                    // record the next offset to commit, never moving it backwards
                    synchronized (offsets) {
                        if (!offsets.containsKey(tp)) {
                            offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                        } else {
                            long position = offsets.get(tp).offset();
                            if (position < lastConsumedOffset + 1) {
                                offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                            }
                        }
                    }
                }
            }
        }
    }
}