1. 程式人生 > >kafka之重複消費資料

kafka之重複消費資料

在進入主題之前,我們先思考一個問題。
問題

kafka消費者使用自動提交的模式,提交間隔為2s,消費者在獲取資料的時候處理0.5s,從kafka拉取過來的資料只夠處理1秒。那麼消費者下次拉取過來的資料是否是已經消費完的資料?或者說由於資料已經消費,但是偏移量沒有被提交,是否會造成下次獲取的資料是從舊的偏移量開始拉取?

答案

不會是舊資料,kafka的消費者也有自己偏移量,這個偏移量是從kafka中讀取的量,和kafka提交的偏移量不一樣。假設變成自動提交偏移量,而且沒有寫提交的邏輯,同一個消費者,除了第一次或者rebalance會根據已提交的offset來獲取資料,剩下的時候都是根據自己本地的偏移量來獲取的。這個模式有點類似於用桶取水,用瓢來喝水。消費者就是桶的角色,poll就是瓢的角色。

重複消費的情況

我們把重複消費的情況分為2種,一種是想避免的,一種是故意如此的。

想避免的場景

  1. 消費者使用了自動提交模式,當還沒有提交的時候,有新的消費者加入或者移除,發生了rebalance。再次消費的時候,消費者會根據提交的偏移量來,於是重複消費了資料。
  2. 使用非同步提交,並且在callback裡寫了失敗重試,但是沒有注意順序。例如提交5的時候,傳送網路故障,由於是非同步,程式繼續執行,再次提交10的時候,提交成功,此時正好執行到5的重試,並且成功。當發生了rebalance,又會重複消費了資料。

注:這裡不討論那個消費者提交的offset的作用。

故意的場景

  1. 使用不同的組消費同一個topic。改個 group.id屬性即可。
  2. 自己手動提交偏移量。 這裡的麻煩的地方就是需要理解開頭的問題,並不是說你提交完就可以了。你得想個辦法去讀取那個偏移量再次消費。下面提供一個暴力的手段,關閉消費者,然後再次開啟新的。
 public static void consumer(Properties properties,String info) {
        System.out.println(info);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(new String[]{"hello"}));
        boolean flag = false;
        while (true) {
            ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);
            if (!poll.isEmpty()) {
                for (ConsumerRecord<String, String> o : poll) {
                    System.out.println(o.value() + o.offset());
                    //假設場景為重複消費3,這裡需要根據業務來提交便宜量
                    if (o.offset() == 3) {
                        //手動提交偏移量
                        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
                        //提交的偏移量,這個偏移量就是下次消費的第一條資料
                        currentOffset.put(new TopicPartition(o.topic(), o.partition()), new OffsetAndMetadata(o.offset()+1, ""));
                        kafkaConsumer.commitSync(currentOffset);
                        flag = true;
                        break;
                    }
                }
            }
            if(flag){
                kafkaConsumer.close();
                break;
            }
        }
    }

這裡也必須注意,kafka並不是資料庫,他儲存的資料有持久化的時間和大小的限制,可能你提交的偏移量的資料已經被kafka清理掉了。