1. 程式人生 > >Kafka新版消費者API示例(二)

Kafka新版消費者API示例(二)

kafka手動提交策略提供了更加靈活的管理方式,在某些場景我們需要對消費偏移量有更精準的管理。以保證訊息不被重複消費以及訊息不丟失。

Kafka提供兩種手動提交方式:

1.非同步提交(commitAsync):

   非同步模式下,提交失敗也不會嘗試提交消費者執行緒不會被阻塞,因為非同步操作,可能在提交偏移量操作結果未返回時就開始下一次拉取操作。

2.同步提交(CommitSync):

    同步模式下,提交失敗時一直嘗試提交,直到遇到無法重試才結束。同步方式下,消費者執行緒在拉取訊息時會被阻塞,直到偏移量提交操作成功或者在提交過程中發生錯誤。

實現手動提交前需要在建立消費者時關閉自動提交,設定enable.auto.commit=false。

由於非同步提交不會等消費偏移量提交成功後再拉取下一次訊息,因此非同步提交提供了一個偏移量提交回調方法commitAsync(OffsetCommitCallback callback)。提交偏移量完成之後會回撥OffsetCommitCallback介面的onComplete()方法

示例程式碼:

package com.simon.kafka.consumer.newconsumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/**
 * Created by Simon on 2018/11/5.
 */
public class KafkaConsumerAsync {

    public static void main(String[] args) throws InterruptedException {

        // 1、準備配置檔案
        String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
        Properties props = new Properties();
        //kafka連線資訊
        props.put("bootstrap.servers",kafkas);
        //消費者組id
        props.put("group.id", "test_group");
        //是否自動提交offset
        props.put("enable.auto.commit", "false");
        //在沒有offset的情況下采取的拉取策略
        props.put("auto.offset.reset", "none");
        //自動提交時間間隔
        props.put("auto.commit.interval.ms", "1000");
        //設定一次fetch請求取得的資料最大為1k
        props.put("fetch.max.bytes", "1024");
        //key反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value反序列化
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "test";
        // 2、建立KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3、訂閱資料,不給定監聽器
        consumer.subscribe(Collections.singleton(topic));

        try{
            //最少處理100條
            int minCommitSize = 100;
            //定義計數器
            int icount = 0;
            // 4、獲取資料
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
                    icount++;
                }
                Thread.sleep(5000);
            
                //在業務邏輯處理成功後提交offset
                if(icount >= minCommitSize){
                    //滿足最少消費100條,再進行非同步提交
                    consumer.commitAsync(new OffsetCommitCallback() {
                        @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if(exception == null){
                                System.out.println("commit success");
                            }else {
                                //提交失敗,對應處理
                                System.out.println("commit failed");
                            }
                        }
                    });
                    
                    //計數器歸零
                    icount = 0 ;
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //關閉連線
            consumer.close();
        }
    }
}

以時間戳查詢訊息:

    Kafka在0.10.1.1版本上增加了時間戳索引檔案。Kafka消費者API提供了一個offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,引數為一個map物件,key為待查詢的分割槽,value為待查詢的時間戳。會返回一個大於等於該事件戳的第一條訊息對應的偏移量和時間戳。若待查詢分割槽不存在,會一直阻塞。

 示例:

   將kafka-client的maven依賴改為1.0.0 。在0.10.0.1中無法引入OffsetAndTimestamp類

 <!--引入kafka-clients-->
    <!--<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.1</version>
    </dependency>-->

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.0</version>
    </dependency>

程式碼:

package com.simon.kafka.consumer.newconsumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Simon on 2018/11/5.
 */
public class KafkaConsumerTimestamps {

    public static void main(String[] args) throws InterruptedException {

        // 1、準備配置檔案
        String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
        Properties props = new Properties();
        //kafka連線資訊
        props.put("bootstrap.servers",kafkas);
        //消費者組id
        props.put("group.id", "test_group");
        //客戶端id
        props.put("client.id", "test_group");
        //是否自動提交offset
        props.put("enable.auto.commit", "true");
        //在沒有offset的情況下采取的拉取策略
        props.put("auto.offset.reset", "none");
        //自動提交時間間隔
        props.put("auto.commit.interval.ms", "1000");
        //設定一次fetch請求取得的資料最大為1k
        props.put("fetch.max.bytes", "1024");
        //key反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value反序列化
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "test";
        // 2、建立KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3、訂閱主題
        TopicPartition topicPartition = new TopicPartition(topic,0);
        consumer.assign(Collections.singleton(topicPartition));
        try{
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();

            // 設定查詢12 小時之前訊息的偏移量
            timestampsToSearch.put(topicPartition, (System.currentTimeMillis() - 12 * 3600 * 1000));

            // 會返回時間大於等於查詢時間的第一個偏移量
            Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampsToSearch);
            OffsetAndTimestamp offsetTimestamp = null;
            // 用for 輪詢,當然由於本例是查詢的一個分割槽,因此也可以用if 處理
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
                // 若查詢時間大於時間戳索引檔案中最大記錄索引時間,
                // 此時value 為空,即待查詢時間點之後沒有新訊息生成
                offsetTimestamp = entry.getValue();
                if (null != offsetTimestamp) {
                    // 重置消費起始偏移量
                    consumer.seek(topicPartition, entry.getValue().offset());
                }
            }
            while (true) {
                //4.輪詢拉取訊息
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());

                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //關閉連線
            consumer.close();
        }
    }
}

由於叢集環境已選型為kafka0.10.0.1,本次無法按指定時間戳拉取,報錯資訊為不支援當前broker版本。

速度控制:

   應用場景中我們可能需要暫停某些分割槽消費,先消費其他分割槽,當達到某個條件再恢復該分割槽消費。

Kafka提供兩種方法用於速度控制的方法:

     1.pause(Collection<TopicPartition> partitions):暫停某些分割槽在拉取操作時返回資料給客戶端

//無返回值
consumer.pause(Collections.singleton(topicPartition));

     2.resume(Collection<TopicPartition> partitions):恢復某些分割槽向客戶端返回資料

//無返回值
consumer.resume(Collections.singleton(topicPartition));