1. 程式人生 > 其它 >kafka-08-SpringBoot Kafka實戰

kafka-08-SpringBoot Kafka實戰

技術標籤:004-kafkakafka

注意:
當前的 Kafka 版本無法保證每個訊息“只被儲存一次”。現實中的很多應用程式在訊息里加入唯一識別符號,用於檢測重複訊息,消費者在讀取訊息時可以對它們進行清理。應用程式需要可以做到訊息的“冪等”,也就是說,即使出現了重複訊息,也不會對處理結果的正確性造成負面影響。

整合SpringBoot kafka,加入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

1. 新建或者更新主題

KafkaConfig.java

/**
 * Description: kafka相關配置
 *
 * @author Xander
 * datetime: 2021-01-08 17:40
 */
@Configuration
public class KafkaConfig {
    /**
     * 主題
     */
    public static final String TOPIC_SPRING_KAFKA = "SpringKafka";

    /**
     * 新建或者更新Topic並設定分割槽數為3,分割槽副本數為1,
     * 這裡設定僅僅時測試使用,主題的分割槽數和每個分割槽的副本數,需要根據業務場景和硬體條件去考慮
     * <p>
     * 我們也可以不手動建立topic,因為kafka server.properties 配置檔案中 auto.create.topics.enable 預設為 true,
     * 表示如果主題不存在,則自動建立主題,
     * 分割槽數量由kafka server.properties 配置檔案中 num.partitions 指定,預設是 1
     * 所以如果是自動建立主題,則預設的分割槽數為1,分割槽副本數為1
     *
     * @return
     */
    @Bean
    public NewTopic newOrUpdateTopic() {
        // 通過TopicBuilder新建或者update Topic,
        // 注意:主題的分割槽只能新增,不能減少分割槽
        return TopicBuilder.name(TOPIC_SPRING_KAFKA).replicas(1).partitions(3).build();
    }

}

我們也可以不手動建立主題,因為kafka server.properties 配置檔案中auto.create.topics.enable預設為 true,表示如果主題不存在,則自動建立主題,分割槽數量由kafka server.properties 配置檔案中num.partitions指定,預設是 1,所以如果這裡不手動建立主題的話,kafka如果檢查到主題不存在,會自動新建分割槽數和副本數都為1的主題。

注意:如果主題已存在,NewTopic如果要update已存在的主題,分割槽數只能大於等於已有的分割槽數,不能減少分割槽。

2. SpringBoot kafka 配置

常用的生產者和消費者相關的配置都列出來,並表明了註釋。

application.yml

spring:
  kafka:
#    kafka叢集broker列表 host1:por1,host2:port2,host3:port3
    bootstrap-servers: docker01:9092

########生產者配置########
    producer:
#     compression-type 訊息的壓縮演算法
#     預設情況下是 none,訊息傳送時不會被壓縮。 該引數可以設定為 none, gzip, snappy, lz4, zstd
      compression-type: none
#     acks 有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的,只能選(0、1、all)
      acks: all
#     bufferMemory 生產者記憶體緩衝區的大小,下面是32MB
      bufferMemory: 33554432
#     retries 發生臨時性的錯誤(比如分割槽找不到首領)重試次數,
#      預設情況下,生產者會在每次重試之間等待 100 ms,可以通過 retry.backoff.ms 引數來改變這個時間間隔
      retries: 3
#      key和value 的序列化器,這兩個預設是 StringSerializer.class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#      batch-size 批次大小,按照位元組數計算
      batch-size: 1024
      properties:
#        自定義分割槽器
#        partitioner:
#          class: com.xander.kafka.partitioner.XdPartitioner
#        request.timeout.ms 在傳送資料時等待伺服器返回響應的時間
        request:
          timeout:
            ms: 1000
#        傳送批次之前等待更多訊息加入批次的時間
#        linger.ms為0,表示生產者每條訊息都直接提交給kafka,不等待批次,這時候batch-size其實就沒用了
        linger:
          ms: 100
#      retry.backoff.ms  每次重試之間的時間間隔,預設是100ms,這裡配置50ms
        retry:
          backoff:
            ms: 50
#   max.in.flight.requests.per.connection 在收到伺服器響應之前可以傳送多少個訊息,如果不需要保證訊息順序性的場景,建議不用配置該屬性
#  把它設為 1 可以保證訊息在同一個生產者的某一個分割槽上,是按照發送的順序寫入伺服器的,即使發生了重試。但是會降低Kafka的吞吐量
        max:
          in:
            flight:
              requests:
                per:
                  connection: 1
#  max.block.ms 緩衝區滿時的最大阻塞時間,在阻塞時間達到 max.block.ms 時,生產者會丟擲超時異常。
          block:
            ms: 200

########### 消費者配置 ###############
    consumer:
#  auto-offset-reset: 沒有偏移量的分割槽或者偏移量無效時如何處理
# earliest: 消費者將從起始位置讀取分割槽的記錄
# latest: 消費者將從最新的記錄開始讀取資料
# none:只要有一個分割槽不存在已提交的offset,就丟擲異常;
      auto-offset-reset: earliest
# group-id 預設的消費者群組
      group-id: defaultGroup
# enable.auto.commit 是否自動提交偏移量,
      enableAutoCommit: true
#      自動提交偏移量的間隔時間,100ms
      autoCommitInterval: 100ms
#  單次請求能夠返回的記錄數量
      max-poll-records: 3
#  fetch.max.wait.ms 指定獲取記錄的最大等待時間,這裡是100ms
      fetchMaxWait: 100ms
#      key和value 的反序列化器,這兩個預設是 StringSerializer.class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
#        fetch.min.bytes 從伺服器獲取記錄的最小位元組數
        fetch:
          min:
            bytes: 102400
#        request.timeout.ms 消費者請求超時時間
        request:
          timeout:
            ms: 1000
#        會話過期時間
        session:
          timeout:
            ms: 120000
#       向協調器傳送心跳的頻率
        heartbeat:
          interval:
            ms: 40000
# 如果需要批量消費,則需要修改 spring.kafka.listener.type = batch,預設是 single,單次消費單條訊息
#    listener:
#      type: batch
#   手動提交偏移量時:消費者訊息確認模式改為手動確認
#    listener:
#      ack-mode: manual

3. 生產者向kafka寫資料

3.1 傳送並忽略結果

我們把訊息傳送給伺服器,但並不關心它是否正常到達。大多數情況下,訊息會正常到達,因為 Kafka 是高可用的,而且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些訊息。因為我們會忽略返回值,所以無法知道訊息是否傳送成功。
如果允許丟失一小部分訊息,並且不關心傳送結果,那麼可以使用這種傳送方式。這種方式可以達到最大的響應速度和吞吐效能。

3.2 同步傳送

返回一個 Future 物件,然後呼叫 Future 物件的get()方法等待 Kafka 響應。如果伺服器返回錯誤, get() 方法會丟擲異常。如果沒有發生錯誤,我們會得到一個 RecordMetadata 物件,可以用它獲取訊息的主題、分割槽和偏移量等資訊。

3.3 非同步傳送

在非同步傳送訊息方式中生產者提供了回撥支援,可以在回撥中處理異常和獲取訊息的主題、分割槽和偏移量等資訊

Kafka生產者

/**
 * Description: Kafka生產者
 *
 * @author Xander
 * datetime: 2021-01-10 10:29
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // 傳送訊息
    @GetMapping("/{msg}")
    public void send(@PathVariable String msg) throws ExecutionException, InterruptedException {
        long start = Instant.now().toEpochMilli();
        this.logger.info("------start");
        // 傳送並忽略結果
        // this.sendAndForget(msg);
        // 同步傳送
        this.sendSync(msg);
        // 非同步傳送
        // this.sendAsync(msg);
        this.logger.info("------end: " + (Instant.now().toEpochMilli() - start));
    }

    /**
     * 傳送並忽略結果
     *
     * @param msg
     */
    private void sendAndForget(String msg) {
        kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg);
    }

    /**
     * 同步傳送
     *
     * @param msg
     */
    private void sendSync(String msg) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg);
        SendResult<String, String> sendResult = future.get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        this.logger.info("傳送成功:" + recordMetadata.topic() + "--" + recordMetadata.partition() + "---" + recordMetadata.offset());
    }

    /**
     * 非同步傳送
     *
     * @param msg
     */
    private void sendAsync(String msg) {
        kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {

            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata recordMetadata = sendResult.getRecordMetadata();
                logger.info("傳送成功:" + recordMetadata.topic() + "--" + recordMetadata.partition() + "---" + recordMetadata.offset());
            }
        });
    }
}

3.1.2 傳送成功

傳送下面的請求:
http://localhost:8080/kafka/123

kafka-console-consumer.sh工具訂閱 SpringKafka 主題,可以看到訊息 '123' 傳送成功

[[email protected] ~]# /usr/local/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.31:9092 --topic SpringKafka
123

4. 消費者從kafka讀資料

**說明:**這裡只演示每次消費單條記錄的案例,如果要批量消費記錄,需要修改 spring.kafka.listener.type = batch,預設是 single (單次消費單條訊息)。
批量消費,請參考 Springboot kafka參考文件:https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation

Springboot kafka參考文件的批量消費舉例

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

4.1 自動提交偏移量

如果消費者屬性enable.auto.commit被設為 true ,那麼每過auto.commit.interval.ms(提交時間間隔,預設值是 5s ),消費者會自動把上一次輪詢接收到的最大偏移量提交上去。
自動提交是在輪詢裡進行的,消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,那麼就會提交從上一次輪詢返回的偏移量。

自動提交會可能會導致訊息重複消費
假設我們仍然使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取訊息。這個時候偏移量已經落後了 3s ,所以在這 3s 內到達的訊息會被重複處理。

每次消費單個記錄,並在輪詢中自動提交偏移量

/**
 * Description: Kafka消費者
 *
 * @author Xander
 * datetime: 2021-01-10 10:32
 */
@Component
public class KafkaConsumer {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 每次消費單個記錄
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA})
    public void onListen(ConsumerRecord<?, ?> record) {
        this.logger.info("消費單個記錄----- 主題:" + record.topic() + "-分割槽:" + record.partition() + "-key:" + record.key()
                + "-value:" + record.value() + "-偏移量:" + record.offset());
    }


}

4.2 手動提交偏移量

消費者也可以手動提交偏移量,在每處理成功一條訊息後就手動提交一次偏移,這能夠保證已經處理的訊息都被準確的提交。但是在前面說過,當前的 Kafka 版本無法保證每個訊息“只被儲存一次”,例如:當生產者傳送訊息到broker,broker傳送響應的時候,因為網路關係,生產者沒有接收到正確的響應,這時候,會發生重試,再次傳送訊息,這時,就可能產生重複的訊息。

防止訊息重複消費:
建議在生產中對訊息新增唯一標識,在消費者消費訊息的時候,對唯一標識進行判斷,是否已經消費了該訊息,如果已經消費過,則不做任何處理,從而達到防止訊息重複消費的目的。

手動提交偏移量,需要配置 enable.auto.commit = false 取消自動提交,並且 spring.kafka.listener.ack-mode = manual 消費者訊息確認模式改為手動確認

/**
 * Description: Kafka消費者,手動提交偏移量,
 * 需要配置 enable.auto.commit = false 取消自動提交,並且 spring.kafka.listener.ack-mode = manual 消費者訊息確認模式改為手動確認
 *
 * 提示:手動提交偏移量,能夠最大程度減少重複消費訊息,但是在訊息未處理完成,提前提交偏移量,也可能導致訊息丟失
 * 關於提交偏移量,請參考下面文章的第6節
 * [CSDN同步:kafka-05-消費者] https://blog.csdn.net/qq_20633779/article/details/112335534
 *
 * @author Xander
 * datetime: 2021-01-10 10:32
 */
@Component
public class KafkaConsumerWithAck {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 每次消費單個記錄,並且手動提交偏移量
     *
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA})
    public void onListenWithAck(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException {
        this.logger.info("消費單個記錄----- 主題:" + record.topic() + "-分割槽:" + record.partition() + "-key:" + record.key()
                + "-value:" + record.value() + "-偏移量:" + record.offset());
        //模擬業務邏輯處理。。。
        this.logger.info("業務處理中...");
        TimeUnit.SECONDS.sleep(10);
        // 手動提交偏移量,表示這個偏移量之前的所有記錄已經被處理
        ack.acknowledge();
    }
}

4.3 模擬MQ的死信佇列

在遇到可重試錯誤時,把錯誤寫入一個獨立的主題, 一個獨立的消費者群組負責從該主題上讀取錯誤訊息,並進行重試,這種模式有點像其他訊息系統裡的dead-letter-queue

/**
 * Description: Kafka消費者: 訊息轉發
 * 在遇到可重試錯誤時,把錯誤寫入一個獨立的主題, 一個獨立的消費者群組負責從該主題上讀取錯誤訊息,並進行重試,這種模式有點像其他訊息系統裡的 `dead-letter-queue`
 *
 * @author Xander
 * datetime: 2021-01-10 10:32
 */
@Component
public class KafkaConsumerSendTo {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 訊息轉發
     *
     * @param record
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA}, groupId = "sendToGroupId")
    @SendTo("test")
    public String onListen(ConsumerRecord<String, String> record) {
        this.logger.info("轉發訊息到test主題 ----- 主題:" + record.topic() + "-分割槽:" + record.partition() + "-key:" + record.key()
                + "-value:" + record.value() + "-偏移量:" + record.offset());
        // return的資料就是轉發到 test 主題的訊息
        return record.value();
    }


}

5. 自定義分割槽器

傳送訊息時候,kafkaTemplate會通過傳入的 主題topic、分割槽partition、鍵key、值value,其中分割槽partition和鍵key是可選的,建立一個ProducerRecord物件。

  • 如果在ProducerRecord物件裡指定了分割槽,那麼分割槽器就不會再做任何事情,直接把指定的分割槽返回。

  • 如果沒有指定分割槽 ,那麼分割槽器會根據 key 來選擇一個分割槽 。
    選好分割槽以後 ,生產者就知道該往哪個主題和分割槽傳送這條記錄了。

  • 如果 key 為 null , 並且使用了預設的分割槽器,那麼記錄將被隨機地傳送到主題內各個可用的分割槽上。分割槽器使用輪詢(Round Robin )演算法將訊息均衡地分佈到各個分割槽上。

  • 如果鍵不為空,並且使用了預設的分割槽器,那麼 Kafka 會對鍵進行雜湊,然後根據雜湊值把訊息對映到特定的分割槽上。這裡的關鍵之處在於 ,同一個鍵總是被對映到同一個分割槽上 ,所以在進行對映時,我們會使用主題所有的分割槽,而不僅僅是可用的分割槽 。這也意味著,如果寫入資料的分割槽是不可用的,那麼就會發生錯誤。但這種情況很少發生。

上面說的是預設的分割槽器,我們也可以根據業務場景自定義分割槽器。
新建一個 org.apache.kafka.clients.producer.Partitioner 介面的實現類 com.xander.kafka.partitioner.XdPartitioner,然後配置
spring.kafka.producer.properties.partitioner.class= com.xander.kafka.partitioner.XdPartitioner

自定義Kafka分割槽器,每條訊息都發送到分割槽0

/**
 * Description: 自定義Kafka分割槽器,每條訊息都發送到分割槽0
 *
 * @author Xander
 * datetime: 2021-01-13 19:41
 */
public class XdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //這裡可以根據業務場景將訊息路由到不同的分割槽
        // return 0 表示每條訊息都發送到分割槽0
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

依次傳送請求:
http://localhost:8080/kafka/111
http://localhost:8080/kafka/222
http://localhost:8080/kafka/333

通過日誌列印可以看到,傳送的訊息都被路由到分割槽0上了,列印的日誌格式是 “傳送成功:主題--分割槽---偏移量”。

2021-01-13 19:55:27.957  INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController  : ------start
2021-01-13 19:55:28.192  INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController  : 傳送成功:SpringKafka--0---0
2021-01-13 19:55:28.193  INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController  : ------end: 236
2021-01-13 19:55:34.935  INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController  : ------start
2021-01-13 19:55:35.037  INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController  : 傳送成功:SpringKafka--0---1
2021-01-13 19:55:35.038  INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController  : ------end: 103
2021-01-13 19:55:38.163  INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController  : ------start
2021-01-13 19:55:38.268  INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController  : 傳送成功:SpringKafka--0---2
2021-01-13 19:55:38.268  INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController  : ------end: 105

6. 業務場景舉例

一個應用程式在很多情況下需要往 Kafka 寫入訊息 :

  • 記錄使用者的活動(用於審計和分析)、
  • 記錄度量指標、
  • 儲存日誌訊息、
  • 記錄智慧家電的資訊、
  • 與其他應用程式進行非同步通訊、
  • 緩衝即將寫入到資料庫的資料,等等。

多樣的使用場景意味著多樣的需求:

  • 是否每個訊息都很重要?
  • 是否允許丟失一小部分訊息?
  • 偶爾出現重複訊息是否可以接受?
  • 是否有嚴格的延遲和吞吐量要求?

6.1 不允許的訊息丟失或訊息重複,允許一點點的延遲

在信用卡事務處理系統裡,訊息丟失或訊息重複是不允許的,可以接受的延遲最大為 500ms ,對吞吐量要求較高 我們希望每秒鐘可以處理一百萬個訊息。

這種情況下,實現方案:建議生產者端可以使用同步傳送解決訊息丟失問題,同時給訊息新增唯一標識,來解決訊息的重複消費問題。

6.2 允許丟失少量的訊息或出現少量的訊息重複,追求高響應和高吞吐

儲存網站的點選資訊是另一種使用場景。在這個場景裡,允許丟失少量的訊息或出現少量的訊息重複,只要不影響使用者體驗就行,在數以千萬計的點選量中,丟失少量的訊息並不會有什麼影響。

這種情況下,實現方案:建議生產者使用傳送並忘記的方式來發送訊息,如果系統要對傳送失敗的訊息進行處理,則可以使用非同步傳送的方式,在回撥中處理異常,以追求最大的吞吐量。

程式碼:
https://github.com/wengxingxia/kafka-springboot.git

[慕課手記同步:kafka-08-SpringBoot Kafka實戰]https://www.imooc.com/article/314288


歡迎關注文章同步公眾號"黑桃"