
Java client operations on Kafka, Spring Boot integration with Kafka, and Kafka partitions

1. Testing Kafka with the Java client

1. Configure Kafka to allow remote connections

Edit the config/kraft/server.properties file and change the address below to the server's public IP address:

advertised.listeners=PLAINTEXT://<server-public-ip>:9092

Then restart Kafka.

2. Testing AdminClient for topic and other metadata management

Test class and its results:

package cn.qz.cloud.kafka.client;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * CRUD operations on topics
 */
@Slf4j
public class KafkaAdminTest {

    public static Properties props = new Properties();

    static {
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER);
        props.put("request.timeout.ms", 60000);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        createTopic();
        describeTopic();
    }

    public static void createTopic() throws ExecutionException, InterruptedException {
        String topicName = KafkaConstants.TOPIC_NAME;
        try (AdminClient adminClient = AdminClient.create(props)) {
            /**
             * 2 = number of partitions
             * 1 = replication factor
             */
            NewTopic newTopic = new NewTopic(topicName, 2, (short) 1);
            CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(newTopic));
            log.info("{}", topics.all().get());
        }
    }

    public static void listTopic() throws ExecutionException, InterruptedException {
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(true);
        try (AdminClient adminClient = AdminClient.create(props)) {
            ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);
            Collection<TopicListing> topicListings = listTopicsResult.listings().get();
            log.info("{}", topicListings);
            /**
             * [(name=quickstart-events, topicId=rPIXse70QvK3Rri24a-bNg, internal=false), (name=myTopic1, topicId=E6i1TbWXTz-11yKI207ZLA, internal=false), (name=__consumer_offsets, topicId=38T6UsJSRn2BL6tnfj5Wfg, internal=true)]
             */
        }
    }

    public static void deleteTopic() throws ExecutionException, InterruptedException {
        String topicName = KafkaConstants.TOPIC_NAME;
        try (AdminClient adminClient = AdminClient.create(props)) {
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Sets.newHashSet(topicName));
            log.info("{}", deleteTopicsResult);
        }
    }

    public static void describeTopic() throws ExecutionException, InterruptedException {
        String topicName = KafkaConstants.TOPIC_NAME;
        try (AdminClient adminClient = AdminClient.create(props)) {
            DescribeTopicsResult topicsResult = adminClient.describeTopics(Arrays.asList(topicName));
            Map<String, TopicDescription> topicDescription = topicsResult.all().get();
            log.info("{}", topicDescription);
            /**
             * {myTopic1=(name=myTopic1, internal=false, partitions=(partition=0, leader=x.x.x.x:9092 (id: 1 rack: null), replicas=x.x.x.x:9092 (id: 1 rack: null), isr=x.x.x.x:9092 (id: 1 rack: null)),(partition=1, leader=x.x.x.x:9092 (id: 1 rack: null), replicas=x.x.x.x:9092 (id: 1 rack: null), isr=x.x.x.x:9092 (id: 1 rack: null)), authorizedOperations=null)}
             */
        }
    }
}
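
The examples above and below reference a KafkaConstants class that is not shown in the post. A minimal sketch of what it might contain (the broker address and values here are placeholders, not the original project's values):

package cn.qz.cloud.kafka.client;

/**
 * Hypothetical reconstruction of the constants class used by the examples;
 * the real values are not shown in the original post.
 */
public class KafkaConstants {

    /** Broker address, e.g. the server's public IP and port. */
    public static final String BOOTSTRAP_SERVER = "x.x.x.x:9092";

    /** Topic used by all examples. */
    public static final String TOPIC_NAME = "myTopic1";

    /** Spelled "Concumer" to match the identifier referenced by the consumer code below. */
    public static class Concumer {
        public static final String GROUP_ID = "test-consumer-group";
    }
}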

3. Message producer

Below, myTopic1 is recreated with 6 partitions and a replication factor of 1. Start a console consumer to listen for the test messages:

bin/kafka-console-consumer.sh --topic myTopic1 --from-beginning --bootstrap-server localhost:9092

1. Introduction to ProducerRecord

Sending a message to a topic means sending one such record. Its source looks like this:

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

    /**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     * 
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
     *                  the timestamp using System.currentTimeMillis().
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers the headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    // ... other constructors and accessor methods omitted
}

As you can see, partition, key, value, and headers can all be specified, but only topic and value are required. The routing logic is as follows:

  1. If a partition ID is specified, the record is sent to that partition.
  2. If no partition ID is specified but a key is, the record is sent to the partition determined by hash(key).
  3. If neither a partition ID nor a key is specified, records are sent to the partitions in a round-robin fashion.
  4. If both a partition ID and a key are specified, the record goes only to the specified partition (the key is ignored for routing; this follows from the code logic, see the sketch below).
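
A quick sketch of the four cases (the topic name, key, and value strings are illustrative):

import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingExamples {
    public static void main(String[] args) {
        // 1. Partition specified, no key: always goes to partition 1
        ProducerRecord<String, String> r1 = new ProducerRecord<>("myTopic1", 1, null, "v1");
        // 2. Key only: routed to hash(key) % numPartitions
        ProducerRecord<String, String> r2 = new ProducerRecord<>("myTopic1", "k1", "v1");
        // 3. Neither partition nor key: spread across partitions round-robin
        ProducerRecord<String, String> r3 = new ProducerRecord<>("myTopic1", "v1");
        // 4. Both partition and key: goes to partition 1; the key is stored in the record but not used for routing
        ProducerRecord<String, String> r4 = new ProducerRecord<>("myTopic1", 1, "k1", "v1");
    }
}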

For example, sending a message with a header looks like this:

Header header = new RecordHeader("testHeader", "testHeaderValue".getBytes());
ProducerRecord producerRecord = new ProducerRecord(topic, null, null, "TEST_KEY", msg, Sets.newHashSet(header));

The consumer receives the following (that is, the consumer can also read the header):

topic: myTopic1, partition: 2, offset: 0, key: TEST_KEY, value: testMsg
key: testHeader, value: testHeaderValue

The messages sent below, and the consumers, simply use plain string messages, specifying neither a key, nor a partition, nor headers.

2. Sending messages

The code below demonstrates synchronous sending, asynchronous sending, idempotent sending, and transactional sending.

package cn.qz.cloud.kafka.client;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class Producer {

    private Properties properties = new Properties();

    private KafkaProducer kafkaProducer;

    public Producer() {
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER);
        /**
         * client.id identifies this producer in broker-side logs and metrics
         */
//        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "client1");
        /**
         * Key and value serializers
         */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // DEFAULT 16384 = 16K
        /**
         * acks=0   the send is considered successful as soon as the message goes out, regardless of whether the partition leader managed to write it to disk
         * acks=1   the send is considered successful once the partition leader has received the message and written it to its local log, regardless of whether the followers have replicated it
         * acks=all the send is considered successful only after the partition leader has received the message and all followers in the ISR (the replicas in sync with the leader) have replicated it
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); // default 1
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000"); // 3 s (default is 30000 ms)
        // for more defaults, see CommonClientConfigs
    }

    /**
     * Simple message sending
     */
    public void produce(SendTypeEnum sendTypeEnum, String msg) {
        String topic = KafkaConstants.TOPIC_NAME;
        try {
            kafkaProducer = new KafkaProducer(properties);
            long startTime = System.currentTimeMillis();
            // Asynchronous send with a callback
            if (sendTypeEnum == SendTypeEnum.ASYNC) {
                kafkaProducer.send(new ProducerRecord(topic, msg), new ProducerCallBack(startTime, msg));
            }
            // Fire-and-forget
            // send() returns a Future; if get() is never called, nothing blocks
            if (SendTypeEnum.WITHOUT_RESULT == sendTypeEnum) {
                kafkaProducer.send(new ProducerRecord(topic, msg));
            }
            // Synchronous: org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord<K,V>)
            // send() returns a Future; calling get() blocks until the result is available
            if (SendTypeEnum.SYNC_WITH_RESULT == sendTypeEnum) {
                RecordMetadata rm = (RecordMetadata) kafkaProducer.send(new ProducerRecord(topic, msg)).get();
                log.info("rm: {}", ToStringBuilder.reflectionToString(rm, ToStringStyle.NO_CLASS_NAME_STYLE));
            }
        } catch (Exception e) {
            log.error("produce error", e);
        } finally {
            kafkaProducer.close();
        }
    }

    /**
     * Send with idempotence enabled
     *
     * @param msg
     */
    public void produceIdempotence(String msg) {
        // With idempotence enabled, retries becomes Integer.MAX_VALUE and acks is forced to all
        /**
         * Producer ID (PID) and sequence number
         * PID: every new producer is assigned a unique PID at initialization; it is not visible to the user.
         * Sequence number: for each PID, every <Topic, Partition> the producer writes to has a sequence number that starts at 0 and increases monotonically. The broker caches the latest sequence number and accepts an incoming message only if its sequence number is exactly one greater than the cached one; otherwise it discards the message. This prevents duplicate writes.
         * Idempotence is only guaranteed per partition: an idempotent producer guarantees no duplicates within a single partition of a topic, not across partitions. It also only covers a single producer session, not multiple sessions.
         */
        properties.put("enable.idempotence", "true");//開啟冪等性
        try {
            kafkaProducer = new KafkaProducer(properties);
            long startTime = System.currentTimeMillis();
            kafkaProducer.send(new ProducerRecord(KafkaConstants.TOPIC_NAME, msg, msg), new ProducerCallBack(startTime, msg));
        } catch (Exception e) {
            log.error("", e);
        } finally {
            kafkaProducer.close();
        }
    }

    /**
     * Send within a transaction.
     * Transactions are built on top of the PID.
     * transactional.id and producerId have a one-to-one mapping in the transaction coordinator: transactional.id is stored as the key and producerId as the value.
     * When a producer restarts, it looks up its producerId from the transaction coordinator using the user-specified transactional.id, which preserves idempotence across sessions.
     */
    public void produceInTransaction() {
        properties.put("transactional.id", "myTx");
        kafkaProducer = new KafkaProducer(properties);
        kafkaProducer.initTransactions();
        try {
            long startTime = System.currentTimeMillis();
            try {
                kafkaProducer.beginTransaction();
                for (int i = 0; i < 100; i++) {
                    String messageStr = "message_" + i;
                    if (i == 99) {
                        throw new RuntimeException("XXX");
                    }
                    kafkaProducer.send(new ProducerRecord(KafkaConstants.TOPIC_NAME, messageStr, messageStr),
                            new ProducerCallBack(startTime, messageStr));
                }
                kafkaProducer.commitTransaction();
            } catch (ProducerFencedException e) {
                kafkaProducer.close();
                log.error("", e);
            } catch (OutOfOrderSequenceException e) {
                kafkaProducer.close();
                log.error("", e);
            } catch (Exception e) {
                kafkaProducer.abortTransaction();
                log.warn("", e);
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            kafkaProducer.close();
        }
    }

    @Slf4j
    private static class ProducerCallBack implements Callback {

        private final long startTime;

        private final String message;

        public ProducerCallBack(long startTime, String message) {
            this.startTime = startTime;
            this.message = message;
        }

        /**
         * Called once the ack from the Kafka broker has been received
         *
         * @param metadata metadata of the record that was sent; null if an exception occurred during sending
         * @param e        the exception thrown during sending; null if the send succeeded
         */
        public void onCompletion(RecordMetadata metadata, Exception e) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                log.info("send success! partition:{}, offset:{}, messgage:{}, elapsedTimeMs:{}", metadata.partition(), metadata.offset(), message, elapsedTime);
            } else {
                log.error("", e);
            }
        }
    }

    public enum SendTypeEnum {

        /**
         * Async
         */
        ASYNC,

        /**
         * Fire-and-forget: do not care about the result
         */
        WITHOUT_RESULT,

        /**
         * Synchronous send
         */
        SYNC_WITH_RESULT;
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        for (int i = 0; i < 10; i++) {
            producer.produce(SendTypeEnum.ASYNC, "testMsg" + i);
        }
    }
}

4. Message consumer

Offsets can be committed automatically or manually. With manual commits you call commit yourself to record the offset; with automatic commits you do not need to commit the offset yourself.

1. Automatic commit:

package cn.qz.cloud.kafka.client;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

@Slf4j
public class Consumer {

    private static Properties properties = new Properties();

    static {
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER); //required
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.Concumer.GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//default 300000
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");//default 500
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // enable auto commit: offsets are committed automatically, no manual ack needed
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "4194304"); // fetch at most 4 MB per partition per request
    }

    private KafkaConsumer kafkaConsumer;

    public void consume() {
        kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(KafkaConstants.TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println(1);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println(2);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(20));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    /**
                     * If the producer attached message headers, the consumer can read them
                     */
                    Headers headers = record.headers();
                    if (CollectionUtil.isNotEmpty(headers)) {
                        headers.forEach(h -> {
                            log.info("key: {}, value: {}", h.key(), new String(h.value()));
                        });
                    }
                }
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            kafkaConsumer.close();
        }

    }

    public static void main(String[] args) throws Exception {
        Consumer consumerDemo = new Consumer();
        consumerDemo.consume();
    }

}

2. Manual commit

package cn.qz.cloud.kafka.client;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

@Slf4j
public class Consumer {

    private static Properties properties = new Properties();

    static {
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER); //required
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.Concumer.GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//default 300000
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");//default 500
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit: offsets must be committed manually
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "4194304"); // fetch at most 4 MB per partition per request
    }

    private KafkaConsumer kafkaConsumer;

    public void consume() {
        kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(KafkaConstants.TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println(1);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println(2);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(20));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    /**
                     * If the producer attached message headers, the consumer can read them
                     */
                    Headers headers = record.headers();
                    if (CollectionUtil.isNotEmpty(headers)) {
                        headers.forEach(h -> {
                            log.info("key: {}, value: {}", h.key(), new String(h.value()));
                        });
                    }
                }
                // commit the offsets asynchronously
                kafkaConsumer.commitAsync();
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            kafkaConsumer.close();
        }

    }

    public static void main(String[] args) throws Exception {
        Consumer consumerDemo = new Consumer();
        consumerDemo.consume();
    }

}
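
commitAsync() returns immediately and does not retry a failed commit; for a stronger guarantee the offsets can also be committed synchronously. A minimal variant of the poll loop above, assuming the same kafkaConsumer and properties (a sketch, not part of the original code):

        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(20));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record here
                    log.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
                }
                // commitSync() blocks until the offsets returned by the last poll are committed,
                // and it retries on retriable errors
                kafkaConsumer.commitSync();
            }
        } finally {
            kafkaConsumer.close();
        }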

2. Testing Kafka in a Spring Boot project

  1. Add the spring-kafka dependency to the pom
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
  2. Add the Kafka configuration
server:
  port: 8080

spring:
  #Kafka configuration
  kafka:
    #change this to your Kafka server's ip and port
    bootstrap-servers: xxx:9092
    #=============== producer  =======================
    producer:
      #if greater than zero, failed sends are retried this many times
      retries: 0
      #when multiple records are sent to the same partition the producer batches them into fewer requests; default 16384 (bytes)
      batch-size: 16384
      #total bytes of memory the producer may use to buffer records waiting to be sent; default 33554432
      buffer-memory: 33554432
      #Serializer class for keys, implementing org.apache.kafka.common.serialization.Serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #Serializer class for values, implementing org.apache.kafka.common.serialization.Serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer  =======================
    consumer:
      #unique string identifying the consumer group this consumer belongs to
      group-id: test-consumer-group
      #what to do when there is no initial offset in Kafka or the current offset no longer exists on the server; default latest, i.e. reset to the latest offset
      #possible values: latest, earliest, none
      auto-offset-reset: earliest
      #whether consumer offsets are committed periodically in the background; default true
      enable-auto-commit: true
      #if 'enable-auto-commit' is true, the frequency in milliseconds at which offsets are auto-committed; default 5000
      auto-commit-interval: 100
      #Deserializer class for keys, implementing org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #Deserializer class for values, implementing org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  3. Add the classes: Kafka config, producer, and consumer
package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

@Configuration
public class kafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @PostConstruct
    public void init() {
        /**
         * init topic
         */
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfig())) {
            adminClient.deleteTopics(Lists.newArrayList(KafkaConstants.TOPIC_NAME));
            List<NewTopic> topics = new ArrayList<>();
            topics.add(new NewTopic(KafkaConstants.TOPIC_NAME, 3, (short) 1));
            adminClient.createTopics(topics);
            System.out.println("Topic created");
        }
    }
}
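
As an alternative to calling AdminClient by hand, newer spring-kafka versions (2.3+) let you declare the topic as a NewTopic bean, which KafkaAdmin then creates on startup if it does not exist. A minimal sketch (the class name TopicConfig is made up; it assumes the same KafkaConstants as above):

package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    /**
     * KafkaAdmin picks up NewTopic beans and creates the corresponding topics automatically.
     */
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name(KafkaConstants.TOPIC_NAME)
                .partitions(3)
                .replicas(1)
                .build();
    }
}
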
===
  
package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping
public class Producer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/index")
    public String index() {
        return "index";
    }

    @GetMapping("/send-msg")
    public String send(@RequestParam String msg) {
        //生產訊息
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(KafkaConstants.TOPIC_NAME, msg, msg);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                System.out.println(stringObjectSendResult);
            }
        });
        return msg;
    }

}

===
package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    /**
     * org.springframework.kafka.annotation.KafkaListener can also specify partitions, a groupId, and other parameters (see the sketch after this class)
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConstants.TOPIC_NAME})
    public void handMessage(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String msg = record.value();
        System.out.println("消費者接受訊息:topic-->" + topic + ",msg->>" + msg);
    }
}  
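
As noted in the comment above, @KafkaListener can pin a listener to specific partitions and a specific group. A hedged sketch (the groupId and partition numbers are illustrative, not from the original project):

package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionConsumer {

    /**
     * Listens only to partitions 0 and 1 of the topic, under its own group id.
     */
    @KafkaListener(
            groupId = "partition-demo-group",
            topicPartitions = @TopicPartition(topic = KafkaConstants.TOPIC_NAME, partitions = {"0", "1"}))
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println("partition " + record.partition() + " -> " + record.value());
    }
}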

For the full set of configuration properties, see:

org.springframework.boot.autoconfigure.kafka.KafkaProperties

3. About Kafka partitions

1. The partition count of a Kafka topic can be modified:

[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --describe --topic myTopic1 --bootstrap-server localhost:9092
Topic: myTopic1	TopicId: 9LsqbI1dRVelPxx-3FJ9lw	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: myTopic1	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --alter --topic myTopic1 --bootstrap-server localhost:9092 --partitions 12
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --describe --topic myTopic1 --bootstrap-server localhost:9092
Topic: myTopic1	TopicId: 9LsqbI1dRVelPxx-3FJ9lw	PartitionCount: 12	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: myTopic1	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 4	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 5	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 6	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 7	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 8	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 9	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 10	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic1	Partition: 11	Leader: 1	Replicas: 1	Isr: 1
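
The same increase can be done from the Java client with AdminClient.createPartitions. A sketch of a method that could be added to the KafkaAdminTest class above (it reuses the props field and the wildcard imports already shown there):

    public static void increasePartitions() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = AdminClient.create(props)) {
            // NewPartitions.increaseTo(n) can only grow the partition count; shrinking is not supported
            Map<String, NewPartitions> newPartitions =
                    Collections.singletonMap(KafkaConstants.TOPIC_NAME, NewPartitions.increaseTo(12));
            adminClient.createPartitions(newPartitions).all().get();
        }
    }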

2. Kafka's partitioning strategy

For the Kafka client, the default partitioner implementation is org.apache.kafka.clients.producer.internals.DefaultPartitioner:

package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public DefaultPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    public void close() {
    }
}

As the code shows, when a key is present it is hashed (murmur2), forced to a non-negative integer, and taken modulo the number of partitions; when no key is passed, messages are distributed across the partitions in a round-robin-like fashion.
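
Because the same murmur2 hash runs on the client side, you can predict which partition a keyed record will land on. A small sketch (the key and partition count are illustrative):

import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {
    public static void main(String[] args) {
        String key = "TEST_KEY";
        int numPartitions = 6;
        // Same computation DefaultPartitioner performs for a non-null key:
        // murmur2 hash of the serialized key, forced non-negative, modulo the partition count
        int partition = Utils.toPositive(Utils.murmur2(key.getBytes())) % numPartitions;
        System.out.println("key " + key + " -> partition " + partition);
    }
}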

The partitioner is invoked along this call chain:

org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>)
->
org.apache.kafka.clients.producer.KafkaProducer#doSend
->
org.apache.kafka.clients.producer.KafkaProducer#partition, whose source is:
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

3. Defining a custom partitioning strategy

  1. Create an implementation class that always sends to partition 0
package cn.qz.cloud.kafka.client;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
  2. Configure the producer to use the custom partitioning strategy
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.qz.cloud.kafka.client.CustomPartitioner");

4. Difference from ES shards

Why ES cannot change its number of shards: https://blog.csdn.net/w1014074794/article/details/119802550

1. Kafka can easily add new partitions through its admin tooling. This only affects messages that specify a key, and even that impact is small, since consumers can still consume all of the messages.
2. By contrast, ES does not support adding shards. The reason lies in ES's query flow (query phase then fetch phase): in the fetch phase documents are fetched by id, so if the number of shards changed, many ids would no longer resolve to their documents even though those documents actually live on other shards. That is why ES does not support adding shards online.

Explanation:

1. Think of ES as a database first. Now imagine a scenario: your application implements its own sharding rule, sharding by uid, with uids ending in 0 going to database 0, uids ending in 1 to database 1, and so on, for a total of 10 databases.

OK, now you want to add an 11th database. The moment the rule changes, data has to be migrated, and making that migration smooth is very painful, even when done by hand.

  2. Kafka, on the other hand, works by subscribing to a topic; a group coordinator then assigns, say, machine A to consume partition 1 and machine B to consume partition 2.

Since consumption is inherently per partition, scaling the partition count up or down causes no such problem.