
Mastering the Three Semantics of the Kafka Consumer in One Article


This article uses the Kafka 0.9 client as its example to walk through how the Kafka client is used, covering the consumer's three delivery semantics (at-most-once, at-least-once, and exactly-once) as well as how to use the producer.

(1) Create the topic

bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
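
To verify that the topic was created with the expected partition count and replication factor, it can be described with the same CLI (assuming a local ZooKeeper, as above):

bin/kafka-topics --zookeeper localhost:2181 --describe --topic normal-topic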

(2) Producer

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to complete sending of the messages before program exit.
        Thread.sleep(20);
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender batches up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(
                new ProducerRecord<String, String>(topic, partition, Long.toString(record), Long.toString(record++)));
        }
    }
}

(3) Consumer

A consumer can register with Kafka in more than one way:

subscribe: with this method, a rebalance of the consumers within the group is triggered when topics or partitions are added, or when consumers join or leave the group.

assign: consumers registered this way never take part in a rebalance.

Either approach can implement all three consumption semantics; the concrete API usage is shown below, and a minimal sketch contrasting the two registration styles follows.
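
As a quick illustration (a minimal sketch, not from the original article; it assumes a KafkaConsumer<String, String> configured with the same properties as the consumers shown later):

import java.util.Arrays;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RegistrationStyles {
    static void register(KafkaConsumer<String, String> consumer, boolean dynamic) {
        if (dynamic) {
            // subscribe: partitions are assigned by the group coordinator and rebalanced
            // when topics/partitions or group membership change.
            consumer.subscribe(Arrays.asList("normal-topic"));
        } else {
            // assign: the consumer owns a fixed set of partitions; no rebalancing happens.
            consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));
        }
    }
}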

  1. At-most-once Kafka Consumer

At-most-once is the default behavior of the Kafka consumer. The simplest way to configure a consumer this way is:

1). Set enable.auto.commit to true.

2). Set auto.commit.interval.ms to a fairly low value.

3). Do not call consumer.commitSync().

With this configuration, the Kafka client has a background thread that commits offsets at the specified interval.

However, this setup can actually result in either of two delivery semantics:

a. At-most-once

The offset has already been committed, but the messages are still being processed when the consumer crashes. On restart it resumes from the last committed offset, so some of the messages that were in flight are lost.

b. At-least-once

The consumer has finished processing, but the offset has not been committed yet when the consumer crashes; on restart it consumes and processes the same messages again. Because auto.commit.interval.ms is set to a low value, the probability of this happening is reduced.

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {
    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtMostOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions of the topic. 'assign' could be used here
        // instead of 'subscribe' to consume from specific partitions.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Auto commit interval; Kafka commits offsets at this interval.
        props.put("auto.commit.interval.ms", "101");
        // Caps the data fetched per partition on each poll, which indirectly limits
        // the number of records returned by each poll.
        props.put("max.partition.fetch.bytes", "135");
        // Set this if you want to always read from the beginning.
        // props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the message.
        Thread.sleep(20);
    }
}

  2. At-least-once Kafka Consumer

Implementing at-least-once semantics is also simple:

1). Disable automatic offset commits: either set enable.auto.commit to false, or (as the code below does) leave it true but set auto.commit.interval.ms so large that the auto commit never actually fires.

2). After the messages have been processed, manually call consumer.commitSync().

In other words, after processing the records returned by each poll, you call the synchronous commit method consumer.commitSync() yourself. It is also recommended to make the processing inside the consumer idempotent, so that reprocessing a message does not produce duplicate results (a minimal idempotence sketch follows the code below). Duplicate processing happens when the consumer has finished processing the messages and written the results to the sink (possibly only partially), but crashes before the offset is committed; on restart, the same messages are consumed and processed again.

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtLeastOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions of the topic. 'assign' could be used here
        // instead of 'subscribe' to consume from specific partitions.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Auto commit is left enabled, but ...
        props.put("enable.auto.commit", "true");
        // ... the auto commit interval is set so large that auto commit never actually happens;
        // the offset commit is controlled via consumer.commitSync() after processing the messages.
        props.put("auto.commit.interval.ms", "999999999999");
        // Caps the data fetched per partition on each poll, which indirectly limits
        // the number of records returned by each poll.
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
            // The call below controls the offset commit. Do it only after the
            // business processing has finished.
            consumer.commitSync();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the record.
        Thread.sleep(20);
    }
}
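
As recommended above, idempotent processing acts as a safety net against such duplicates. Below is a minimal sketch (not part of the original article; IdempotentProcessor and its in-memory map are illustrative only) that remembers the highest offset already applied per topic partition and skips anything at or below it. In a real system, that bookkeeping would live in the same store as the results.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentProcessor {
    // Highest offset already applied, keyed by "topic-partition".
    private final Map<String, Long> applied = new HashMap<>();

    public void processOnce(ConsumerRecord<String, String> record) {
        String key = record.topic() + "-" + record.partition();
        Long last = applied.get(key);
        if (last != null && record.offset() <= last) {
            return; // Already processed in a previous run; skip the duplicate.
        }
        // ... apply the record to the result store here ...
        applied.put(key, record.offset());
    }
}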

  3. Implementing Exactly-once with subscribe

Implementing exactly-once with subscribe is straightforward. The idea is as follows:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Subscribe to the topic with subscribe.

4). Implement a ConsumerRebalanceListener and, inside the listener, call consumer.seek(topicPartition, offset) so that consumption starts from the offset stored for that topic/partition.

5). While processing messages, keep track of the offset of every message, and store the offset together with the processing result as one atomic transaction. With a traditional relational database this is easy; for stores such as HDFS or NoSQL, the usual way to achieve it is to write the offset into the same row (or record) as the result. A sketch of the relational-database variant follows this list.

6). Implement idempotence as an extra layer of protection.
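
As an illustration of step 5, here is a minimal sketch of the relational-database variant (not part of the original article; the DataSource wiring and the results / consumer_offsets tables are hypothetical): the processing result and the offset are written in one JDBC transaction, so they are persisted or rolled back together.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.DataSource;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TransactionalSink {
    private final DataSource dataSource; // hypothetical JDBC DataSource

    public TransactionalSink(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Stores the processing result and the offset in one database transaction,
    // so they are either both persisted or both rolled back.
    public void saveAtomically(ConsumerRecord<String, String> record, String result) throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement res = conn.prepareStatement(
                     "INSERT INTO results(msg_key, result) VALUES (?, ?)");
                 PreparedStatement off = conn.prepareStatement(
                     "UPDATE consumer_offsets SET last_offset = ? WHERE topic = ? AND partition_id = ?")) {
                res.setString(1, record.key());
                res.setString(2, result);
                res.executeUpdate();
                off.setLong(1, record.offset());
                off.setString(2, record.topic());
                off.setInt(3, record.partition());
                off.executeUpdate();
                conn.commit();
            } catch (SQLException e) {
                conn.rollback();
                throw e;
            }
        }
    }
}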

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExactlyOnceDynamicConsumer {
    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Offsets are controlled manually, but the consumer still subscribes to the topic to get
        // dynamically assigned partitions. Inside MyConsumerRebalancerListener,
        // consumer.seek(topicPartition, offset) controls which offset to read from.
        consumer.subscribe(Arrays.asList("normal-topic"),
                new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // Below is the key setting that turns off auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Controls the maximum data fetched per partition on each poll; make sure this value
        // is bigger than the maximum size of a single message.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalancerListener implements ConsumerRebalanceListener {
    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Before partitions are revoked, persist the current position of each one.
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    // When partitions are assigned, seek each one to the offset kept in external storage.
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * The partition offsets are stored in an external storage. In this case, in the local file system
 * where the program runs.
 */
public class OffsetManager {
    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}

  4. Implementing Exactly-once with assign

Implementing exactly-once with assign is just as straightforward. The idea is as follows:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Register the consumer with Kafka by calling assign.

4). On startup, call consumer.seek(topicPartition, offset) to position the consumer at the stored offset.

5). While processing messages, keep track of the offset of every message, and store the offset together with the processing result as one atomic transaction, exactly as in the subscribe case above (easy with a traditional database; with HDFS or NoSQL stores, keep the offset in the same row as the result).

6). Implement idempotence as an extra layer of protection.

The code is as follows:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {
    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition =
                registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Use seek to go to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // Below is the key setting that turns off auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Controls the maximum data fetched per partition on each poll; make sure this value
        // is bigger than the maximum size of a single message.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Manually listens to a specific topic partition. If you are looking for an example of how to
     * dynamically listen to partitions while manually controlling offsets, see
     * ExactlyOnceDynamicConsumer.java.
     */
    private static TopicPartition registerConsumerToSpecificPartition(
            KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Process data and store the offset in the external store. Best practice is to do these
     * operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

[End]
