
Kafka Client Operations

I: Consumer API

1. Auto-commit example

  This approach is not recommended in practice: with auto-commit, offsets are committed on a timer regardless of whether processing actually succeeded.
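
  All of the snippets in this article are static methods of a single class. For them to compile, something like the following imports and a class-level TOPIC_NAME constant are assumed (the topic name is a placeholder, not from the original):

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // assumed topic name -- replace with your own topic
    private static final String TOPIC_NAME = "test_topic";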

    /**
     * Simple Kafka consumer with auto-commit.
     * Once offsets have been auto-committed, the consumed records are not delivered again.
     */
    public static void helloConsumer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
        }
    }

  

2. Manual offset commit

  If processing fails and the offset is not committed, the same records can be consumed again (they are redelivered after a restart or a rebalance).

    /**
     * Manual commit
     */
    public static void commitedOffset() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        // auto.commit.interval.ms has no effect once enable.auto.commit is false
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
            // Commit manually, only after the whole batch was processed successfully;
            // if the commit is skipped, the records are delivered again after a restart or rebalance
            consumer.commitAsync();
        }
    }

  

3. ConsumerGroup

  The messages of a single partition are consumed by exactly one Consumer within a ConsumerGroup.

  A Consumer reads a partition in order; where a brand-new group starts is governed by auto.offset.reset (latest by default, earliest to read from the beginning).

  A single ConsumerGroup as a whole consumes the messages of all partitions of the topics it subscribes to; the sketch below shows how to observe the assignment.
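
  One way to watch the assignment happen is to pass a ConsumerRebalanceListener when subscribing. A minimal sketch (not from the original article; it reuses the properties from the examples above and additionally needs java.util.Collection and org.apache.kafka.clients.consumer.ConsumerRebalanceListener):

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList(TOPIC_NAME), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // called before a rebalance takes partitions away from this consumer
            System.out.println("revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // with two consumers in group "test" on a two-partition topic,
            // each consumer ends up with exactly one partition here
            System.out.println("assigned: " + partitions);
        }
    });

  Starting a second consumer with the same group.id triggers a rebalance, and the listeners print the partitions being redistributed.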

4. Characteristics

  

5. Processing per partition

    /**
     * Process records per partition
     */
    public static void commitedOffsetWithPartition() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Work at the partition level: handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually, per partition; the committed value is the NEXT offset to read, hence lastOffset + 1
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }

  

6. Consuming only specific partitions

    /**
     * Consume only specific partitions of a topic
     */
    public static void commitedOffsetWithTopicPartition() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

        // assign() the partition directly instead of subscribe(); this bypasses group
        // management, so no rebalancing happens. Only p1 is consumed here; p0 is unused.
        consumer.assign(Arrays.asList(p1));
        while (true) {
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Work at the partition level: handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually, per partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }

  

II: Multi-threaded processing with the Consumer API

1. First approach
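
  A typical first approach is one KafkaConsumer per thread: each thread owns a private consumer instance and commits its own offsets. KafkaConsumer is not thread-safe, so an instance must never be shared between threads; see the sketch in the next section.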

  

2. Program
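
  A minimal sketch of the thread-per-consumer pattern, assuming the shared imports, properties and TOPIC_NAME from part I:

    public class ConsumerThread implements Runnable {
        private final KafkaConsumer<String, String> consumer;

        public ConsumerThread(Properties properties) {
            // each thread builds its OWN consumer; instances are never shared
            this.consumer = new KafkaConsumer<>(properties);
            this.consumer.subscribe(Arrays.asList(TOPIC_NAME));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("thread = %s, partition = %d, offset = %d, value = %s%n",
                                Thread.currentThread().getName(), record.partition(), record.offset(), record.value());
                    }
                    // each thread commits the offsets of its own consumer
                    consumer.commitAsync();
                }
            } finally {
                consumer.close();
            }
        }
    }

  One thread is started per partition; consumers beyond the partition count would sit idle:

    for (int i = 0; i < 2; i++) {
        new Thread(new ConsumerThread(properties)).start();
    }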

3. Second approach

  With this approach there is no way to commit offsets; it is only meant for consuming data as fast as possible.

  

4. Program
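
  A minimal sketch of the hand-off pattern the note above describes: a single consumer polls, and records are processed by a thread pool. Because processing is asynchronous, there is no safe point at which to commit, so no offsets are committed (this needs java.util.concurrent.ExecutorService and Executors on top of the shared imports):

    ExecutorService workers = Executors.newFixedThreadPool(4);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // enable.auto.commit=false
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            // hand the record to the pool; poll() moves on before it is processed,
            // which is why committing its offset here would be unsafe
            workers.submit(() -> System.out.printf("worker = %s, partition = %d, offset = %d, value = %s%n",
                    Thread.currentThread().getName(), record.partition(), record.offset(), record.value()));
        }
    }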

III: Some other features

1. Manually controlling the offset

    /**
     * Manually specify the starting offset, and commit offsets manually.
     *
     * Manually specifying the starting offset means:
     *  1. You control where consumption starts
     *  2. If the program fails, the batch can simply be consumed again (replay)
     *
     * Steps:
     *  1. The first time, start from offset 0 (the usual case)
     *  2. After consuming, say, 100 records, set the offset to 101 and store it in Redis
     *  3. Before every poll, fetch the latest offset from Redis
     *  4. Start consuming from that position
     *
     * Recommendation:
     *  1. Use Redis as the offset store
     */
    public static void controllerOffset() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);

        // assign() the partition directly
        consumer.assign(Arrays.asList(p0));
        while (true) {
            // Set the starting offset. Hardcoded to 5 for the demo (in practice, read it
            // from Redis here); note that seeking inside the loop re-reads the same
            // records from offset 5 on every iteration.
            consumer.seek(p0, 5);
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Work at the partition level: handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually, per partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }
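
  The Redis lookup described in the Javadoc is not implemented in the demo (the offset is hardcoded to 5). A sketch of that loop, assuming Jedis as the Redis client and an "offset:<topic>:<partition>" key naming of my own choosing, reusing consumer, p0 and TOPIC_NAME from above:

    Jedis jedis = new Jedis("192.168.19.129", 6379); // assumed Redis host
    consumer.assign(Arrays.asList(p0));
    while (true) {
        // step 3: before every poll, read the latest offset from Redis
        String stored = jedis.get("offset:" + TOPIC_NAME + ":" + p0.partition());
        // step 1: on the very first run there is no stored value, so start from 0
        consumer.seek(p0, stored == null ? 0L : Long.parseLong(stored));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records.records(p0)) {
            // ... business processing ...
            // step 2: store lastOffset + 1 as the next starting position
            jedis.set("offset:" + TOPIC_NAME + ":" + p0.partition(), String.valueOf(record.offset() + 1));
        }
    }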

  

2. Rate limiting

    /**
     * Flow control (rate limiting)
     */
    public static void controllerLimit() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

        long totalNum = 100;

        // assign() both partitions directly
        consumer.assign(Arrays.asList(p0, p1));
        while (true) {
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Work at the partition level: handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                long num = 0;
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                     /*
                        The idea, with a token bucket:
                        1. After receiving a record, take a token from the bucket
                        2. If a token is obtained, continue with the business logic
                        3. If no token is available, pause() and wait for tokens
                        4. Once the bucket holds enough tokens again, resume() the consumer
                        The demo below approximates the bucket with plain record counters.
                     */
                    num++;
                    if (record.partition() == 0) {
                        if (num >= totalNum) {
                            // p0 has delivered enough in this batch: stop fetching from it
                            consumer.pause(Arrays.asList(p0));
                        }
                    }

                    if (record.partition() == 1) {
                        if (num == 40) {
                            // after 40 records from p1, let p0 flow again
                            consumer.resume(Arrays.asList(p0));
                        }
                    }
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually, per partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }
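
  The counter trick above only simulates the token bucket from the comments. A sketch of the real thing using Guava's RateLimiter as the bucket (an assumption, not the article's code; needs import com.google.common.util.concurrent.RateLimiter):

    RateLimiter limiter = RateLimiter.create(100); // bucket refills at ~100 tokens per second
    consumer.assign(Arrays.asList(p0, p1));
    while (true) {
        // step 4: once tokens are available again, resume whatever was paused
        if (!consumer.paused().isEmpty() && limiter.tryAcquire()) {
            consumer.resume(consumer.paused());
        }
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                // steps 1-3: take a token per record; if the bucket is empty, pause the
                // partition so the following polls stop fetching from it
                if (!limiter.tryAcquire()) {
                    consumer.pause(Arrays.asList(partition));
                }
                // ... business processing ...
            }
        }
    }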