kafka消費者客戶端
Kafka消費者
1.1 消費者與消費者組
消費者與消費者組之間的關係
每一個消費者都隸屬於某一個消費者組,一個消費者組可以包含一個或多個消費者,每一條訊息只會被消費者組中的某一個消費者所消費。不同消費者組之間訊息的消費是互不干擾的。
為什麼會有消費者組的概念
消費者組出現主要是出於兩個目的:
(1) 使整體的消費能力具備橫向的伸縮性。可以適當增加消費者組中消費者的數量,來提高整體的消費能力。但是每一個分割槽至多被消費者組的中一個消費者所消費,因此當消費者組中消費者數量超過分割槽數時,多出的消費者不會分配到任何一個分割槽。當然這是預設的分割槽分配策略,可通過partition.assignment.strategy進行配置。
(2) 實現訊息消費的隔離。不同消費者組之間訊息消費互不干擾,從而實現釋出訂閱這種訊息投遞模式。
注意:
消費者隸屬的消費者組可以通過group.id進行配置。消費者組是一個邏輯上的概念,但消費者並不是一個邏輯上的概念,它可以是一個執行緒,也可以是一個程序。同一個消費者組內的消費者可以部署在同一臺機器上,也可以部署在不同的機器上。
1.2 消費者客戶端開發
一個正常的消費邏輯需要具備以下幾個步驟:
配置消費者客戶端引數及建立相應的消費者例項。
訂閱主題
拉取訊息並消費
提交消費位移
關閉消費者例項
public class KafkaConsumerAnalysis {public static final String brokerList="node112:9092,node113:9092,node114:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
return prop;
}
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic() + ", partition =" + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
consumer.close();
}
}
}
1.2.1 訂閱主題和分割槽
先來說一下消費者訂閱訊息的粒度:一個消費者可以訂閱一個主題、多個主題、或者多個主題的特定分割槽。主要通過subsribe和assign兩個方法實現訂閱。
(1)訂閱一個主題:
public void subscribe(Collection<String> topics),當集合中有一個主題時。
(2)訂閱多個主題:
public void subscribe(Collection<String> topics),當集合中有多個主題時。
public void subscribe(Pattern pattern),通過正則表示式實現消費者主題的匹配。通過這種方式,如果在訊息消費的過程中,又添加了新的能夠匹配到正則的主題,那麼消費者就可以消費到新新增的主題。 consumer.subscribe(Pattern.compile("topic-.*"));
(3)多個主題的特定分割槽
public void assign(Collection<TopicPartition> partitions),可以實現訂閱某些特定的主題分割槽。TopicPartition包括兩個屬性:topic(String)和partition(int)。
如果事先不知道有多少分割槽該如何處理,KafkaConsumer中的partitionFor方法可以獲得指定主題分割槽的元資料資訊:
public List<PartitionInfo> partitionsFor(String topic)
PartitionInfo的屬性如下:
public class PartitionInfo {
private final String topic;//主題
private final int partition;//分割槽
private final Node leader;//分割槽leader
private final Node[] replicas;//分割槽的AR
private final Node[] inSyncReplicas;//分割槽的ISR
private final Node[] offlineReplicas;//分割槽的OSR
}
因此也可以通過這個方法實現某個主題的全部訂閱。
需要指出的是,subscribe(Collection)、subscirbe(Pattern)、assign(Collection)方法分別代表了三種不同的訂閱狀態:AUTO_TOPICS、AUTO_PATTREN和USER_ASSIGN,這三種方式是互斥的,消費者只能使用其中一種,否則會報出IllegalStateException。
subscirbe方法可以實現消費者自動再平衡的功能。多個消費者的情況下,可以根據分割槽分配策略自動分配消費者和分割槽的關係,當消費者增加或減少時,也能實現負載均衡和故障轉移。
如何實現取消訂閱:
consumer.unsubscribe()
1.2.2 反序列化
KafkaProducer端生產訊息進行序列化,同樣消費者就要進行相應的反序列化。相當於根據定義的序列化格式的一個逆序提取資料的過程。
import com.gdy.kafka.producer.Company;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CompanyDeserializer implements Deserializer<Company> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public Company deserialize(String topic, byte[] data) {
if(data == null) {
return null;
}
if(data.length < 8) {
throw new SerializationException("size of data received by Deserializer is shorter than expected");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLength = buffer.getInt();
byte[] nameBytes = new byte[nameLength];
buffer.get(nameBytes);
int addressLen = buffer.getInt();
byte[] addressBytes = new byte[addressLen];
buffer.get(addressBytes);
String name,address;
try {
name = new String(nameBytes,"UTF-8");
address = new String(addressBytes,"UTF-8");
}catch (UnsupportedEncodingException e) {
throw new SerializationException("Error accur when deserializing");
}
return new Company(name, address);
}
@Override
public void close() {
}
}
實際生產中需要自定義序列化器和反序列化器時,推薦使用Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具來包裝。
1.2.3 訊息消費
Kafka中訊息的消費是基於拉模式的,kafka訊息的消費是一個不斷輪旋的過程,消費者需要做的就是重複的呼叫poll方法。
public ConsumerRecords<K, V> poll(final Duration timeout)
這個方法需要注意的是,如果消費者的緩衝區中有可用的資料,則會立即返回,否則會阻塞至timeout。如果在阻塞時間內緩衝區仍沒有資料,則返回一個空的訊息集。timeout的設定取決於應用程式對效應速度的要求。如果應用執行緒的位移工作是從Kafka中拉取資料並進行消費可以將這個引數設定為Long.MAX_VALUE。
每次poll都會返回一個ConsumerRecords物件,它是ConsumerRecord的集合。對於ConsumerRecord相比於ProducerRecord多了一些屬性:
private final String topic;//主題
private final int partition;//分割槽
private final long offset;//偏移量
private final long timestamp;//時間戳
private final TimestampType timestampType;//時間戳型別
private final int serializedKeySize;//序列化key的大小
private final int serializedValueSize;//序列化value的大小
private final Headers headers;//headers
private final K key;//key
private final V value;//value
private volatile Long checksum;//CRC32校驗和
另外我們可以按照分割槽維度對訊息進行消費,通過ConsumerRecords.records(TopicPartiton)方法實現。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Set<TopicPartition> partitions = records.partitions();
for (TopicPartition tp : partitions) {
for (ConsumerRecord<String, String> record : records.records(tp)) {
System.out.println(record.partition() + " ," + record.value());
}
}
另外還可以按照主題維度對訊息進行消費,通過ConsumerRecords.records(Topic)實現。
for (String topic : topicList) {
for (ConsumerRecord<String, String> record : records.records(topic)) {
System.out.println(record.partition() + " ," + record.value());
}
}
1.2.4 消費者位移提交
首先要 明白一點,消費者位移是要做持久化處理的,否則當發生消費者崩潰或者消費者重平衡時,消費者消費位移無法獲得。舊消費者客戶端是將位移提交到zookeeper上,新消費者客戶端將位移儲存在Kafka內部主題_consumer_offsets中。
KafkaConsumer提供了兩個方法position(TopicPatition)和commited(TopicPartition)。
public long position(TopicPartition partition)-----獲得下一次拉取資料的偏移量
public OffsetAndMetadata committed(TopicPartition partition)-----給定分割槽的最後一次提交的偏移量。
還有一個概念稱之為lastConsumedOffset,這個指的是最後一次消費的偏移量。
在kafka提交方式有兩種:自動提交和手動提交。
(1)自動位移提交
kafka預設情況下采用自動提交,enable.auto.commit的預設值為true。當然自動提交併不是沒消費一次訊息就進行提交,而是定期提交,這個定期的週期時間由auto.commit.intervals.ms引數進行配置,預設值為5s,當然這個引數生效的前提就是開啟自動提交。
自動提交會造成重複消費和訊息丟失的情況。重複消費很容易理解,因為自動提交實際是延遲提交,因此很容易造成重複消費,然後訊息丟失是怎麼產生的?
(2)手動位移提交
開始手動提交的需要配置enable.auto.commit=false。手動提交消費者偏移量,又可分為同步提交和非同步提交。
同步提交:
同步提交很簡單,呼叫commitSync() 方法:
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//consume message
consumer.commitSync();
}
}
這樣,每消費一條訊息,提交一個偏移量。當然可用過快取訊息的方式,實現批量處理+批量提交:
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBaches) {
for (ConsumerRecord<String, String> record : records) {
//consume message
}
consumer.commitSync();
buffer.clear();
}
}
還可以通過public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)這個方法實現按照分割槽粒度進行同步提交。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
for (ConsumerRecord record : partitionRecords) {
//consume message
}
long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(tp,new OffsetAndMetadata(lastConsumerOffset+1)));
}
}
非同步提交:
commitAsync非同步提交的時候消費者執行緒不會被阻塞,即可能在提交偏移量的結果還未返回之前,就開始了新一次的拉取資料操作。非同步提交可以提升消費者的效能。commitAsync有三個過載:
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback )
對照同步提交的方法引數,多了一個Callback回撥引數,它提供了一個非同步提交的回撥方法,當消費者位移提交完成後回撥OffsetCommitCallback的onComplement方法。以第二個方法為例:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//consume message
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e == null) {
System.out.println(offsets);
}else {
e.printStackTrace();
}
}
});
1.2.5 控制和關閉消費
kafkaConsumer提供了pause()和resume() 方法分別實現暫停某些分割槽在拉取操作時返回資料給客戶端和恢復某些分割槽向客戶端返回資料的操作:
public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)
優雅停止KafkaConsumer退出消費者迴圈的方式:
(1)不要使用while(true),而是使用while(isRunning.get()),isRunning是一個AtomicBoolean型別,可以在其他地方呼叫isRunning.set(false)方法退出迴圈。
(2)呼叫consumer.wakup()方法,wakeup方法是KafkaConsumer中唯一一個可以從其他執行緒裡安全呼叫的方法,會丟擲WakeupException,我們不需要處理這個異常。
跳出迴圈後一定要顯示的執行關閉動作和釋放資源。
1.2.6 指定位移消費
KafkaConsumer可通過兩種方式實現實現不同粒度的指定位移消費。第一種是通過auto.offset.reset引數,另一種通過一個重要的方法seek。
(1)auto.offset.reset
auto.offset.reset這個引數總共有三種可配置的值:latest、earliest、none。如果配置不在這三個值當中,就會丟擲ConfigException。
latest:當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,消費新產生的該分割槽下的資料
earliest:當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,從頭開始消費
none:topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset或位移越界,則丟擲NoOffsetForPartitionException異常
訊息的消費是通過poll方法進行的,poll方法對於開發者來說就是一個黑盒,無法精確的掌控消費的起始位置。即使通過auto.offsets.reset引數也只能在找不到位移或者位移越界的情況下粗粒度的從頭開始或者從末尾開始。因此,Kafka提供了另一種更細粒度的消費掌控:seek。
(2)seek
seek可以實現追前消費和回溯消費:
public void seek(TopicPartition partition, long offset)
可以通過seek方法實現指定分割槽的消費位移的控制。需要注意的一點是,seek方法只能重置消費者分配到的分割槽的偏移量,而分割槽的分配是在poll方法中實現的。因此在執行seek方法之前需要先執行一次poll方法獲取消費者分配到的分割槽,但是並不是每次poll方法都能獲得資料,所以可以採用如下的方法。
consumer.subscribe(topicList);
Set<TopicPartition> assignment = new HashSet<>();
while(assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();//獲取消費者分配到的分割槽,沒有獲取返回一個空集合
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); //重置指定分割槽的位移
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//consume record
}
如果對未分配到的分割槽執行了seek方法,那麼會報出IllegalStateException異常。
在前面我們已經提到,使用auto.offsets.reset引數時,只有當消費者分配到的分割槽沒有提交的位移或者位移越界時,才能從earliest消費或者從latest消費。seek方法可以彌補這一中情況,實現任意情況的從頭或從尾部消費。
Set<TopicPartition> assignment = new HashSet<>();
while(assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);//獲取指定分割槽的末尾位置
for (TopicPartition tp : assignment) {
consumer.seek;
}
與endOffset對應的方法是beginningOffset方法,可以獲取指定分割槽的起始位置。其實kafka已經提供了一個從頭和從尾消費的方法。
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)
還有一種場景是這樣的,我們並不知道特定的消費位置,卻知道一個相關的時間點。為解決這種場景遇到的問題,kafka提供了一個offsetsForTimes()方法,通過時間戳來查詢分割槽消費的位移。
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
}
//獲得指定分割槽指定時間點的消費位移
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
由於seek方法的存在,使得消費者的消費位移可以儲存在任意的儲存介質中,包括DB、檔案系統等。
1.2.7 消費者的再均衡
再均衡是指分割槽的所屬權從一個消費者轉移到另一消費者的行為,它為消費者組具備高可用和伸縮性提高保障。不過需要注意的地方有兩點,第一是消費者發生再均衡期間,消費者組中的消費者是無法讀取訊息的。第二點就是消費者發生再均衡可能會引起重複消費問題,所以一般情況下要儘量避免不必要的再均衡。
KafkaConsumer的subscribe方法中有一個引數為ConsumerRebalanceListener,我們稱之為再均衡監聽器,它可以用來在設定發生再均衡動作前後的一些準備和收尾動作。
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
onPartitionsRevoked方法會在再均衡之前和消費者停止讀取訊息之後被呼叫。可以通過這個回撥函式來處理消費位移的提交,以避免重複消費。引數partitions表示再均衡前分配到的分割槽。
onPartitionsAssigned方法會在再均衡之後和消費者消費之間進行呼叫。引數partitons表示再均衡之後所分配到的分割槽。
consumer.subscribe(topicList);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(topicList, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);//提交偏移量
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//do something
}
});
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//process records