kafka 從分割槽任意位置、分割槽開頭、分割槽末尾開始消費資料
最近就kafka消費者消費資料時,消費者提交的offset與同事們有一些分歧和討論,這裡記錄一下自己的研究。
我們知道redis和kafka都可以作為訊息佇列使用,都可以完成釋出訂閱功能,但是kafka相較於redis可以實現訂閱訊息的儲存,可以實現訂閱訊息的任意位置消費,更重要的時kafka訂閱訊息是可以儲存到磁碟上的,而redis訂閱訊息是無法儲存磁碟的。
(1)消費者消費資料時加入一個消費者分組之後,可以通過 subscribe函式訂閱某個topic,這時這個消費者進入brokers的group management管理機制,同一個分片只能被一個分組中的消費者消費,如果同一個分片希望被多個消費者消費,需要將多個消費者放入到不同的消費者分組中。
//訂閱指定的topic
consumer.subscribe(Arrays.asList(topic));
(2)還有一種消費資料的方式是可以通過assign函式指定要消費的分割槽資料,這種方式可以指定從分割槽的任意位置開始消費資料,當然這種//消費者指定要消費的分割槽,指定分割槽之後消費者崩潰之後 不會引發分割槽reblance
consumer.assign(list);
消費資料的方式,如果消費者奔潰之後,不會引發分割槽reblance,也就是說assign的consumer不會擁有kafka的group management機制。
我們上面說過,同一個分片只能由消費者分組中的同一個消費者進行消費,假設當消費者A使用assign指定分割槽進行消費時,如果這時消費者A使用的分組group B,是通過subscribe訂閱了這個主題的分片時,由於消費者A不加入group management,它相當於一個獨立的臨時消費者,這時消費者A也是可以正常消費的,看起來就是一個分片被一個消費者組中的多個消費者消費一樣。
(3)我們還可以配置如下屬性auto.offset.reset來,設定消費者從分割槽的開頭或者末尾進行消費資料。當然這也是有條件的。
//一般配置earliest 或者latest 值
props.put("auto.offset.reset", "latest");
我把上述三種情況的消費者不同使用方式下,消費者提交offset的情況進行了歸總和說明:
早在kafka0.8.2.2版本的時候,kafka已經支援訊息offset存在brokers中,只不過預設是將offset儲存到zookeeper中。kafka現在最新發布的版本都是預設將資料儲存到brokers中。我的程式碼示例是使用了kafka0.10.0.0版本,當我們這裡通過assign函式分配指定的分割槽時
下面是我的測試程式碼,有興趣的同學可以檢視和驗證上述結論:
/**
*
* @author yujie.wang
* kafka生產者示例程式碼
*/
public class Producer_Sample {
//kafka叢集機器
private static final String KAFKA_HOSTS = "10.4.30.151:9092,10.4.30.151:9093,10.4.30.151:9094";
//topic名稱
private static final String TOPIC = "my-replicated-topic_2";
public static void main(String[] args) {
// TODO Auto-generated method stub
Producer_Sample producer = new Producer_Sample();
producer.producer_send(TOPIC);
System.out.println("end");
}
/**
* 生產者生產資料
* 傳送訊息是非同步進行,一旦訊息被儲存到分割槽快取中,send方法就返回
* 一旦訊息被接收 就會呼叫callBack
* @param topic
*/
public void producer_send(String topic){
Properties props = new Properties();
//kafka叢集機器
props.put("bootstrap.servers", KAFKA_HOSTS);
//生產者傳送的資料需要等待主分片和其副本都儲存才發回確認訊息
props.put("acks", "all");
//生產者傳送失敗後的確認訊息
props.put("retries", 0);
//生產者 每個分割槽快取大小 16K
props.put("batch.size", 16384);
//生產者傳送分割槽快取中資料前停留時間
props.put("linger.ms", 1);
//生產者可用快取總量大小 32M
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
for(int i = 220; i < 230; i++){
//傳送訊息是非同步進行,一旦訊息被儲存到分割槽快取中,send方法就返回
// producer.send(new ProducerRecord<String, String>("my-replicated-topic_1", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<String, String>(topic, "call___"+Integer.toString(i+20), "call___"+Integer.toString(i)),
new Call());
System.out.println("send return I: "+ i);
}
producer.close();
}
/**
*訊息被儲存之後的回撥方法
*/
class Call implements Callback{
@Override
public void onCompletion(RecordMetadata recordmetadata,
Exception exception) {
// TODO Auto-generated method stub
System.out.println("callBack: "+ recordmetadata.checksum() + " recordmetadata content : "+recordmetadata.toString());
}
}
}
/**
* @author yujie.wang
* kafka消費者示例,包含隨機位置消費和最多一次消費方式
* 消費者提交消費資料offset 分為自動提交和手動控制提交
*
* 這份程式碼示例中包含了 多種從kafka的任意位置獲取資料的方式
*/
public class Consumer_Sample {
//kafka叢集機器
private static final String KAFKA_HOSTS = "10.4.30.151:9092,10.4.30.151:9093,10.4.30.151:9094";
//topic名稱
private static final String TOPIC = "my-replicated-topic_2";
public static void main(String[] args) {
// TODO Auto-generated method stub
Consumer_Sample consumer = new Consumer_Sample();
//從分割槽的末尾 或者已存在groupid的請情況下從未消費位置開始消費資料
consumer.consumerSubscribe("true", TOPIC);
// 通過實現ConsumerRebalanceListener介面 進而時間任意位置的消費
consumer.consumerSubscribeImplListener("true", TOPIC);
//從指定的分割槽 開始位置seekToBeginning 或者任意位置seek消費資料
consumer.consumerAssin("true", TOPIC);
//通過配置屬性auto.offset.reset 來設定消費者從分割槽開頭或者末尾進行消費,但是需要使用一定條件的group Id
consumer.consumerAutoOffsetReset("true", TOPIC);
System.out.println("consumer end");
}
/**
* 直接通過訂閱一個指定分割槽來消費資料
* (1)如果該groupId消費者分組下 有消費者提交過offset,則從 當前提交的offset位置開始消費資料
* (2)如果該groupId消費者分組下 沒有有消費者提交過offset,則從 當前log新增的最後位置(也就是資料的末尾)開始消費資料
* @param isAutoCommitBool
* @param topic
*/
public void consumerSubscribe(final String isAutoCommitBool, final String topic){
Properties props = new Properties();
//配置kafka叢集機器
props.put("bootstrap.servers", KAFKA_HOSTS);
//消費者分組
props.put("group.id", "yujie37");
//這裡設定 消費者自動提交已消費訊息的offset
props.put("enable.auto.commit", isAutoCommitBool);
// 設定自動提交的時間間隔為1000毫秒
props.put("auto.commit.interval.ms", "1000");
// 設定每次poll的最大資料個數
props.put("max.poll.records", 5);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱topic
consumer.subscribe(Arrays.asList(topic));
List<PartitionInfo> parList = consumer.partitionsFor(topic);
//打印出分割槽資訊
printPartition(parList);
//消費資料
while (true) {
ConsumerRecords<String, String> records = consumer.poll(5000);
System.out.println("topic: "+topic + " pool return records size: "+ records.count());
for (ConsumerRecord<String, String> record : records){
System.out.println(record.toString());
//手動提交已消費資料的offset
if("false".equalsIgnoreCase(isAutoCommitBool)){
consumer.commitSync();
}
}
}
}
/**
*
* @param isAutoCommitBool true 開啟自動提交offset;false 不開啟
* @param topic
* (1)如果該groupId消費者分組下 有消費者提交過offset,則從 當前提交的offset位置開始消費資料
* (2)如果該groupId消費者分組下 沒有有消費者提交過offset,則從 當前log新增的最後位置(也就是資料的末尾)開始消費資料
*
* 注意如果enable.auto.commit 設定為false,如果消費完資料沒有提交已消費資料的offset,
* 則會出現重複消費資料的情況
*
* 通過實現ConsumerRebalanceListener介面中的onPartitionsAssigned方法,並在其中呼叫消費者的seek或者seekToBeginning
* 方法定位分割槽的任意位置或者開頭位置
*/
public void consumerSubscribeImplListener(final String isAutoCommitBool, final String topic){
Properties props = new Properties();
//配置kafka叢集機器
props.put("bootstrap.servers", KAFKA_HOSTS);
//消費者分組
props.put("group.id", "yujie26");
//這裡設定 消費者自動提交已消費訊息的offset
props.put("enable.auto.commit", isAutoCommitBool);
// 設定自動提交的時間間隔為1000毫秒
props.put("auto.commit.interval.ms", "1000");
// 設定每次poll的最大資料個數
props.put("max.poll.records", 5);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱topic,並實現ConsumerRebalanceListener
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener(){
@Override
public void onPartitionsRevoked(//分割槽撤銷時,消費者可以向該分割槽提交自己當前的offset
Collection<TopicPartition> collection) {
// TODO Auto-generated method stub
if("false".equalsIgnoreCase(isAutoCommitBool)){
//consumer.commitSync();
}
}
@Override
public void onPartitionsAssigned(//當分割槽分配給消費者時,消費者可以通過該方法重新定位需要消費的資料位置
Collection<TopicPartition> collection) {
// TODO Auto-generated method stub
//將消費者定位到各個分割槽的開始位置進行消費
/* consumer.seekToBeginning(collection);
System.out.println("seek beg");*/
Iterator it = collection.iterator();
while(it.hasNext()){
//將消費者定位到指定分割槽的指定位置7進行消費
consumer.seek((TopicPartition)it.next(), 7);
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(5000);
System.out.println("topic: "+topic + "pool return records size: "+ records.count());
for (ConsumerRecord<String, String> record : records){
System.out.println(record.toString());
//手動提交已消費資料的offset
if("false".equalsIgnoreCase(isAutoCommitBool)){
consumer.commitSync();
}
}
}
}
/**
*
* @param isAutoCommitBool true 開啟自動提交offset;false 不開啟
* @param topic
* 如果groupId之前存在 , 則從之前提交的最後消費資料的offset處繼續開始消費資料
* 如果groupId之前不存在,則從當前分割槽的最後位置開始消費
*
* 注意如果enable.auto.commit 設定為false,如果消費完資料沒有提交已消費資料的offset,
* 則會出現重複消費資料的情況
*/
public void consumerAutoOffsetReset(final String isAutoCommitBool, final String topic){
Properties props = new Properties();
//配置kafka叢集機器
props.put("bootstrap.servers", KAFKA_HOSTS);
//消費者分組
props.put("group.id", "yujie32");
//這裡設定 消費者自動提交已消費訊息的offset
props.put("enable.auto.commit", isAutoCommitBool);
// 設定自動提交的時間間隔為1000毫秒
props.put("auto.commit.interval.ms", "1000");
// 設定每次poll的最大資料個數
props.put("max.poll.records", 5);
//設定使用最開始的offset偏移量為該group.id的最早。如果不設定,則會是latest即該topic最新一個訊息的offset
//如果採用latest,消費者只能得道其啟動後,生產者生產的訊息
//一般配置earliest 或者latest 值
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱topic,並實現ConsumerRebalanceListener
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(5000);
System.out.println("topic: "+topic + "pool return records size: "+ records.count());
for (ConsumerRecord<String, String> record : records){
System.out.println(record.toString());
//手動提交已消費資料的offset
if("false".equalsIgnoreCase(isAutoCommitBool)){
consumer.commitSync();
}
}
}
}
/**
* 通過assign分配的分割槽,消費者發生故障 Server端不會觸發分割槽重平衡(即使該消費者共享某個已有的groupId),每個消費者都是獨立工作的
* 為了避免offset提交衝突,需要確保每個消費者都有唯一的groupId
* 從指定的分割槽的開頭開始消費資料
* @param isAutoCommitBool true 開啟自動提交offset;false 不開啟
* @param topic
*/
public void consumerAssin(String isAutoCommitBool,String topic){
Properties props = new Properties();
//配置kafka叢集機器
props.put("bootstrap.servers", KAFKA_HOSTS);
//消費者分組
props.put("group.id", "yujie35");
//這裡設定 消費者自動提交已消費訊息的offset
props.put("enable.auto.commit", isAutoCommitBool);
// 設定自動提交的時間間隔為1000毫秒
props.put("auto.commit.interval.ms", "1000");
// 設定每次poll的最大資料個數
props.put("max.poll.records", 5);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//獲得topic的所有分割槽
List<PartitionInfo> parList = consumer.partitionsFor(topic);
//打印出分割槽資訊
printPartition(parList);
List<TopicPartition> list = new ArrayList<TopicPartition>();
for(PartitionInfo par : parList){
TopicPartition partition = new TopicPartition(topic, par.partition());
list.add(partition);
}
//消費者指定要消費的分割槽,指定分割槽之後消費者崩潰之後 不會引發分割槽reblance
consumer.assign(list);
//從list中所有分割槽的開頭開始消費資料,這個操作不改變已提交的消費資料的offset
// consumer.seekToBeginning(list);
/* for(TopicPartition tpar:list ){
//consumer.seek(tpar, position);
} */
while (true) {
ConsumerRecords<String, String> records = consumer.poll(5000);
System.out.println("topic: "+topic + " pool return records size: "+ records.count());
for (ConsumerRecord<String, String> record : records){
System.out.println(record.toString());
//手動提交已消費資料的offset
if("false".equalsIgnoreCase(isAutoCommitBool)){
consumer.commitSync();
}
}
}
}
public void printPartition(List<PartitionInfo> parList){
for(PartitionInfo p : parList){
System.out.println(p.toString());
}
}
/**
* 單獨處理每個分割槽中的資料,處理完了之後非同步提交offset,注意提交的offset是程式將要讀取的下一條訊息的offset
* @param consumer
*/
public void handlerData(KafkaConsumer<String, String> consumer){
boolean running = true;
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//注意提交的offset是程式將要讀取的下一條訊息的offset
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
}
/**
* 關閉消費者
* @param consumer
*/
public void closeConsumer(KafkaConsumer<String, String> consumer){
if(consumer != null){
consumer.close();
}
}
}