Kafka Exactly-Once
Kafka is a high-performance messaging middleware that supports real-time, batch, and stream processing, and it is now used by many companies in web-scale applications. This article shows how to build client programs with the Kafka API, and demonstrates clients with three delivery semantics: at-most-once, at-least-once, and exactly-once.
First, install Kafka on your local machine (see the official quick start). The rest of this article assumes Kafka is already installed and running, with ZooKeeper on its default port 2181 and Kafka on its default port 9092. Once Kafka is running, create a topic named "normal-topic" with 2 partitions using the following command:
bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
Check that the topic was created:
bin/kafka-topics --list --topic normal-topic --zookeeper localhost:2181
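To also inspect the partition and replica details of the topic, the describe sub-command can be used (assuming the same Kafka command-line tooling as above):
bin/kafka-topics --describe --topic normal-topic --zookeeper localhost:2181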
With these prerequisites in place, the next step is to build the Kafka clients.
1. Producer
The producer sends messages to a topic; the consumer reads messages from the topic and processes them. The producer code is as follows:
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerExample {
public static void main(String[] str) throws InterruptedException, IOException {
System.out.println("Starting ProducerExample ...");
sendMessages();
}
private static void sendMessages() throws InterruptedException, IOException {
Producer<String, String> producer = createProducer();
sendMessages(producer);
// Flush and close the producer so all buffered messages are sent before the program exits.
producer.flush();
producer.close();
}
private static Producer<String, String> createProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
// batch.size is the maximum batch size in bytes (kept tiny here so batches are sent quickly)
props.put("batch.size", 10);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(props);
}
private static void sendMessages(Producer<String, String> producer) {
String topic = "normal-topic";
int partition = 0;
long record = 1;
for (int i = 1; i <= 10; i++) {
producer.send(
new ProducerRecord<String, String>(topic, partition, Long.toString(record),Long.toString(record++)));
}
}
}
2. Consumer
There are several ways a consumer can register with Kafka (a short sketch contrasting them follows this list):
>1. subscribe: when a consumer registers via subscribe, Kafka balances partitions across the group (adding or removing topics also triggers a rebalance). This method has two overloads:
(a) Two arguments: a topic and a listener. When a consumer registers this way, Kafka notifies it through the listener during a rebalance, and the listener is where offsets can be managed manually (manual offset management is required for exactly-once, at least in the current version).
(b) A single topic argument.
>2. assign: with this method, Kafka performs no rebalancing.
The at-most-once and at-least-once examples below use method 1(b); exactly-once can be done in two ways, 1(a) and 2.
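As a quick side-by-side of these registration styles, here is a minimal sketch. The RegistrationStyles class and its helper methods are illustrative only; it assumes the normal-topic created above, a KafkaConsumer configured like the ones in the examples below, and the MyConsumerRebalancerListener defined later in the exactly-once section.
import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RegistrationStyles {

    // 1(b): subscribe with only the topic; Kafka assigns partitions and rebalances the group.
    static void subscribeTopicOnly(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("normal-topic"));
    }

    // 1(a): subscribe with a rebalance listener; Kafka calls the listener when partitions are
    // revoked or assigned, which is where offsets can be managed manually (see the exactly-once example).
    static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("normal-topic"),
                new MyConsumerRebalancerListener(consumer));
    }

    // 2: assign specific partitions directly; no group rebalancing is performed.
    static void assignPartition(KafkaConsumer<String, String> consumer) {
        consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));
    }
}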
At-most-once (zero or one delivery)
At-most-once is the Kafka consumer's default behavior; the consumer is configured as follows:
>1. Set 'enable.auto.commit' to true.
>2. Set 'auto.commit.interval.ms' to a small value.
>3. Do not call consumer.commitSync() from the consumer; Kafka then auto-commits offsets at the configured interval.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class AtMostOnceConsumer {
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting AtMostOnceConsumer ...");
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer();
// Subscribe to all partitions in the topic. 'assign' could be used here
// instead of 'subscribe' to subscribe to specific partition.
consumer.subscribe(Arrays.asList("normal-topic"));
processRecords(consumer);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
// Set this property, if auto commit should happen.
props.put("enable.auto.commit", "true");
// Auto commit interval, kafka would commit offset at this interval.
props.put("auto.commit.interval.ms", "101");
// max.partition.fetch.bytes caps the data fetched per partition, which indirectly
// limits the number of records returned by each poll
props.put("max.partition.fetch.bytes", "135");
// Set this if you want to always read from beginning.
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
lastOffset = record.offset();
}
System.out.println("lastOffset read: " + lastOffset);
process();
}
}
private static void process() throws InterruptedException {
// create some delay to simulate processing of the message.
Thread.sleep(20);
}
}
At-least-once (one or more deliveries)
>1. Set 'enable.auto.commit' to false, or set 'enable.auto.commit' to true with a very large 'auto.commit.interval.ms'.
>2. After processing the messages, the consumer calls consumer.commitSync().
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class AtLeastOnceConsumer {
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting AutoOffsetGuaranteedAtLeastOnceConsumer ...");
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer();
// Subscribe to all partitions in the topic. 'assign' could be used here
// instead of 'subscribe' to subscribe to specific partition.
consumer.subscribe(Arrays.asList("normal-topic"));
processRecords(consumer);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
// Set this property, if auto commit should happen.
props.put("enable.auto.commit", "true");
// Make the auto commit interval large enough that auto commit effectively never happens;
// we control the offset commit via consumer.commitSync() after processing the messages.
props.put("auto.commit.interval.ms", "2147483647");
// max.partition.fetch.bytes caps the data fetched per partition, which indirectly
// limits the number of messages returned by each poll
props.put("max.partition.fetch.bytes", "135");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
lastOffset = record.offset();
}
System.out.println("lastOffset read: " + lastOffset);
process();
// Below call is important to control the offset commit. Do this call after you
// finish processing the business process.
consumer.commitSync();
}
}
private static void process() throws InterruptedException {
// create some delay to simulate processing of the record.
Thread.sleep(20);
}
}
Exactly-once
The example below demonstrates Kafka's exactly-once semantics with a consumer registered via the subscribe method. Exactly-once requires manual offset management; set it up as follows:
1. Set enable.auto.commit = false.
2. Do not call consumer.commitSync() after processing messages.
3. Register the consumer to the desired topic via subscribe.
4. Implement the ConsumerRebalanceListener interface and call consumer.seek(topicPartition, offset) so that reading starts from the stored offset of that topic and partition.
5. Store the offset together with the processed message so the two are written atomically; a transactional mechanism is recommended.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ExactlyOnceDynamicConsumer {
private static OffsetManager offsetManager = new OffsetManager("storage2");
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting ManualOffsetGuaranteedExactlyOnceReadingDynamicallyBalancedPartitionConsumer ...");
readMessages();
}
/**
* Read messages from normal-topic, managing offsets manually via the external store.
*/
private static void readMessages() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer();
// Manually controlling offset but register consumer to topics to get dynamically assigned partitions.
// Inside MyConsumerRebalancerListener use consumer.seek(topicPartition,offset) to control offset
consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
processRecords(consumer);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg3";
props.put("group.id", consumeGroup);
// Below is a key setting to turn off the auto commit.
props.put("enable.auto.commit", "false");
props.put("heartbeat.interval.ms", "2000");
props.put("session.timeout.ms", "6001");
// Control maximum data on each poll, make sure this value is bigger than the maximum single record size
props.put("max.partition.fetch.bytes", "140");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
}
}
}
}
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
public class MyConsumerRebalancerListener implements ConsumerRebalanceListener {
private OffsetManager offsetManager = new OffsetManager("storage2");
private Consumer<String, String> consumer;
public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
this.consumer = consumer;
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
}
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
}
}
}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class OffsetManager {
private String storagePrefix;
public OffsetManager(String storagePrefix) {
this.storagePrefix = storagePrefix;
}
/**
* Overwrite the offset for the topic in an external storage.
*
* @param topic - Topic name.
* @param partition - Partition of the topic.
* @param offset - offset to be stored.
*/
void saveOffsetInExternalStore(String topic, int partition, long offset) {
try {
FileWriter writer = new FileWriter(storageName(topic, partition), false);
BufferedWriter bufferedWriter = new BufferedWriter(writer);
bufferedWriter.write(offset + "");
bufferedWriter.flush();
bufferedWriter.close();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* @return the last offset + 1 for the provided topic and partition.
*/
long readOffsetFromExternalStore(String topic, int partition) {
try {
// Use try-with-resources so the offset file handle is released after reading.
try (Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)))) {
return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
}
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
private String storageName(String topic, int partition) {
return storagePrefix + "-" + topic + "-" + partition;
}
}
The example shown here stores offsets in files. To store them in a database instead, modify the OffsetManager class so that offsets are read from and written to the database.
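As a rough illustration of such a database-backed variant (the DbOffsetManager class, the kafka_offsets table and its columns, and the per-call JDBC connection handling are all assumptions made for this sketch, not part of the original example):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DbOffsetManager {

    private final String jdbcUrl;

    public DbOffsetManager(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Overwrite the stored offset for the topic/partition.
    // Uses MySQL-style REPLACE INTO; adapt the upsert syntax for other databases.
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        String sql = "REPLACE INTO kafka_offsets (topic, part, offset_value) VALUES (?, ?, ?)";
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, topic);
            ps.setInt(2, partition);
            ps.setLong(3, offset);
            ps.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Return the next offset to read (last stored offset + 1), or 0 if nothing is stored yet.
    long readOffsetFromExternalStore(String topic, int partition) {
        String sql = "SELECT offset_value FROM kafka_offsets WHERE topic = ? AND part = ?";
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, topic);
            ps.setInt(2, partition);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) + 1 : 0;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
For the stored offset and the processed result to be truly atomic, as recommended in step 5 above, both writes should happen in the same database transaction rather than on separate connections.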