1. 程式人生 > 其它 >Springboot整合訊息佇列Kafka

Springboot整合訊息佇列Kafka

技術標籤:微服務kafkaspring bootzookeeper

Springboot整合訊息佇列Kafka

一、Spring-Kafka

Spring 生態中,提供了 Spring-Kafka 專案,讓我們更簡便的使用 Kafka

Spring for Apache Kafka (spring-kafka) 專案將 Spring 核心概念應用於基於 Kafka 的訊息傳遞解決方案的開發。

它提供了一個“模板”作為傳送訊息的高階抽象。
它還通過 @KafkaListener 註解和“偵聽器容器(listener container)”為訊息驅動的 POJO 提供支援。
這些庫促進了依賴注入和宣告的使用。
在所有這些用例中,你將看到 Spring Framework 中的 JMS 支援,以及和 Spring AMQP 中的 RabbitMQ 支援的相似之處。

二、快速入門

先來對 Kafka-Spring 做一個快速入門,實現 Producer 三種傳送訊息的方式的功能,同時建立一個 Consumer 消費訊息。

2.1 引入依賴

<dependencies>
    <
!-- 引入 Spring-Kafka 依賴 --> <!-- 已經內建 kafka-clients 依賴,所以無需重複引入 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.3.RELEASE</version> </dependency> <
!-- 實現對 JSON 的自動化配置 --> <!-- 因為,Kafka 對複雜物件的 Message 序列化時,我們會使用到 JSON --> <!-- 同時,spring-boot-starter-json 引入了 spring-boot-starter ,而 spring-boot-starter 又引入了 spring-boot-autoconfigure 。 spring-boot-autoconfigure 實現了 Spring-Kafka 的自動化配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-json</artifactId> </dependency> <!-- 方便等會寫單元測試 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>

2.2 應用配置檔案

resources 目錄下,建立 application.yaml 配置檔案。

spring:
  # Kafka 配置項,對應 KafkaProperties 配置類
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以設定多個,以逗號分隔
    # Kafka Producer 配置項
    producer:
      acks: 1 # 0-不應答。1-leader 應答。all-所有 leader 和 follower 應答。
      retries: 3 # 傳送失敗時,重試傳送的次數
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 訊息的 key 的序列化
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 訊息的 value 的序列化
    # Kafka Consumer 配置項
    consumer:
      auto-offset-reset: earliest # 設定消費者分組最初的消費進度為 earliest 。可參考部落格 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # JsonDeserializer 在反序列化訊息時,考慮到安全性,只反序列化成信任的 Message 類
      properties:
        spring:
          json:
            trusted:
              packages: cn.cy.springboot.kafkademo.message
    # Kafka Consumer Listener 監聽器配置
    listener:
      missing-topics-fatal: false # 消費監聽介面監聽的主題不存在時,預設會報錯。所以通過設定為 false ,解決報錯

logging:
  level:
    org:
      springframework:
        kafka: ERROR # spring-kafka INFO 日誌太多了,所以我們限制只打印 ERROR 級別
      apache:
        kafka: ERROR # kafka INFO 日誌太多了,所以我們限制只打印 ERROR 級別

2.3 Application

建立 Application.java 類,配置 @SpringBootApplication 註解即可。

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.4 Demo01Message

cn.cy.springboot.kafkademo.message 包下,建立 Demo01Message 訊息類,提供給當前示例使用。

@Data
public class Demo01Message {
    public static final String TOPIC = "DEMO_01";
    /**
     * 編號
     */
    private Integer id;
}
  • TOPIC 靜態屬性,我們設定該訊息類對應 Topic 為 "DEMO_01"

2.5 Demo01Producer

cn.cy.springboot.kafkademo.producer 包下,建立 Demo01Producer 類,它會使用 Kafka-Spring 封裝提供的 KafkaTemplate ,實現三種傳送訊息的方式。

@Component
public class Demo01Producer {
    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;
    public SendResult syncSend(Integer id) throws ExecutionException, InterruptedException {
        // 建立 Demo01Message 訊息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 同步傳送訊息
        return kafkaTemplate.send(Demo01Message.TOPIC, message).get();
    }
    public ListenableFuture<SendResult<Object, Object>> asyncSend(Integer id) {
        // 建立 Demo01Message 訊息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 非同步傳送訊息
        return kafkaTemplate.send(Demo01Message.TOPIC, message);
    }
}
  • TOPIC 靜態屬性,我們設定該訊息類對應 Topic"DEMO_01"

  • #asyncSend(...) 方法,非同步傳送訊息。在方法內部,會呼叫 KafkaTemplate#send(topic, data) 方法,非同步傳送訊息,返回 Spring ListenableFuture 物件,一個可以通過監聽執行結果的 Future 增強。

  • #syncSend(...) 方法,同步傳送訊息。在方法內部,也是呼叫 KafkaTemplate#send(topic, data) 方法,非同步傳送訊息。不過,因為我們後面呼叫了 ListenableFuture 物件的 #get() 方法,阻塞等待發送結果,從而實現同步的效果。

  • 暫時未提供 oneway 傳送訊息的方式。因為需要配置 Produceracks = 0 ,才可以使用這種傳送方式。當然,實際場景下,基本不會使用 oneway 的方式來發送訊息,所以直接先忽略吧。

  • 在序列化時,我們使用了 JsonSerializer 序列化 Message 訊息物件,它會在 Kafka 訊息 Headers__TypeId__ 上,值為 Message 訊息對應的類全名

  • 在反序列化時,我們使用了 JsonDeserializer 序列化出 Message 訊息物件,它會根據 Kafka 訊息 Headers__TypeId__的值,反序列化訊息內容成該 Message 物件。

2.6 Demo01Consumer

cn.cy.springboot.kafkademo.consumer 包下,建立 Demo01Consumer 類,消費訊息。

@Component
public class Demo01Consumer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @KafkaListener(topics = Demo01Message.TOPIC,
            groupId = "demo01-consumer-group-" + Demo01Message.TOPIC)
    public void onMessage(Demo01Message message) {
        logger.info("[onMessage][執行緒編號:{} 訊息內容:{}]", Thread.currentThread().getId(), message);
    }
}
  • 在方法上,添加了 @KafkaListener 註解,宣告消費的 Topic"DEMO_01" ,消費者分組是 "demo01-consumer-group-DEMO_01" 。一般情況下,我們建議一個消費者分組,僅消費一個 Topic 。這樣做會有個好處:每個消費者分組職責單一,只消費一個 Topic
  • 方法引數,使用消費 Topic 對應的訊息類即可。這裡,我們使用了 Demo01Message
  • 雖然說,@KafkaListener 註解是方法級別的,還是建議一個類,對應一個方法,消費訊息。

2.7 Demo01AConsumer

cn.cy.springboot.kafkademo.consumer 包下,建立 Demo01AConsumer 類,消費訊息。

@Component
public class Demo01AConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @KafkaListener(topics = Demo01Message.TOPIC,
            groupId = "demo01-A-consumer-group-" + Demo01Message.TOPIC)
    public void onMessage(ConsumerRecord<Integer, String> record) {
        logger.info("[onMessage][執行緒編號:{} 訊息內容:{}]", Thread.currentThread().getId(), record);
    }
}
  • 整體和 Demo01Consumer 是一致的,主要有兩個差異點,也是為什麼我們又額外建立了這個消費者的原因。

  • 差異一: 在方法上,添加了 @KafkaListener 註解,宣告消費的 Topic 還是 "DEMO_01" ,消費者分組修改成"demo01-A-consumer-group-DEMO_01" 。這樣,我們就可以測試 Kafka 叢集消費的特性。

    • 叢集消費(Clustering):叢集消費模式下,相同 Consumer Group 的每個 Consumer 例項平均分攤訊息。
    • 也就是說,如果我們傳送一條 Topic 為 "DEMO_01" 的訊息,可以分別被 "demo01-A-consumer-group-DEMO_01""demo01-consumer-group-DEMO_01" 都消費一次。
    • 但是,如果我們啟動兩個該示例的例項,則消費者分組 "demo01-A-consumer-group-DEMO_01""demo01-consumer-group-DEMO_01" 都會有多個 Consumer 示例。此時,我們再發送一條 Topic 為 "DEMO_01" 的訊息,只會被 "demo01-A-consumer-group-DEMO_01" 的一個 Consumer 消費一次,也同樣只會被 "demo01-A-consumer-group-DEMO_01" 的一個 Consumer 消費一次。
  • 差異二,方法引數,設定消費的訊息對應的類不是 Demo01Message 類,而是 Kafka 內建的 ConsumerRecord 類。通過 ConsumerRecord 類,我們可以獲取到消費的訊息的更多資訊,例如說訊息的所屬佇列、建立時間等等屬性,不過訊息的內容(value)就需要自己去反序列化。當然,一般情況下,我們不會使用 ConsumerRecord 類。

2.8 簡單測試

建立 Demo01ProducerTest 測試類,編寫二個單元測試方法,呼叫 Demo01Producer 二個傳送訊息的方式。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo01ProducerTest {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private Demo01Producer producer;
    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.syncSend(id);
        logger.info("[testSyncSend][傳送編號:[{}] 傳送結果:[{}]]", id, result);
        // 阻塞等待,保證消費
        new CountDownLatch(1).await();
    }
    @Test
    public void testASyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        producer.asyncSend(id).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable e) {
                logger.info("[testASyncSend][傳送編號:[{}] 傳送異常]]", id, e);
            }
            @Override
            public void onSuccess(SendResult<Object, Object> result) {
                logger.info("[testASyncSend][傳送編號:[{}] 傳送成功,結果為:[{}]]", id, result);
            }
        });
        // 阻塞等待,保證消費
        new CountDownLatch(1).await();
    }
}

執行 #testSyncSend()方法,測試同步傳送訊息。

2020-12-24 16:35:46.040  INFO 25132 --- [           main] c.c.s.k.producer.Demo01ProducerTest      : [testSyncSend][傳送編號:[1608798945] 傳送結果:[SendResult [producerRecord=ProducerRecord(topic=DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 110, 46, 99, 121, 46, 115, 112, 114, 105, 110, 103, 98, 111, 111, 116, 46, 107, 97, 102, 107, 97, 100, 101, 109, 111, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message(id=1608798945), timestamp=null), recordMetadata=DEMO_01-0@6]]]
2020-12-24 16:35:46.392  INFO 25132 --- [ntainer#0-0-C-1] c.c.s.k.consumer.Demo01AConsumer         : [onMessage][執行緒編號:18 訊息內容:ConsumerRecord(topic = DEMO_01, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1608798945874, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message(id=1608798945))]
2020-12-24 16:35:46.395  INFO 25132 --- [ntainer#1-0-C-1] c.c.s.kafkademo.consumer.Demo01Consumer  : [onMessage][執行緒編號:20 訊息內容:Demo01Message(id=1608798945)]
  • 通過日誌我們可以看到,我們傳送的訊息,分別被 Demo01AConsumerDemo01Consumer 兩個消費者(消費者分組)都消費了一次。
  • 同時,兩個消費者在不同的執行緒中,消費了這條訊息。

執行 #testASyncSend() 方法,測試非同步傳送訊息。
注意,不要關閉 #testSyncSend()單元測試方法,因為我們要模擬每個消費者叢集,都有多個 Consumer 節點。

2020-12-24 16:41:17.765  INFO 19088 --- [ad | producer-1] c.c.s.k.producer.Demo01ProducerTest      : [testASyncSend][傳送編號:[1608799277] 傳送成功,結果為:[SendResult [producerRecord=ProducerRecord(topic=DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 110, 46, 99, 121, 46, 115, 112, 114, 105, 110, 103, 98, 111, 111, 116, 46, 107, 97, 102, 107, 97, 100, 101, 109, 111, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message(id=1608799277), timestamp=null), recordMetadata=DEMO_01-0@9]]]
2020-12-24 16:41:20.753  INFO 19088 --- [ntainer#0-0-C-1] c.c.s.k.consumer.Demo01AConsumer         : [onMessage][執行緒編號:18 訊息內容:ConsumerRecord(topic = DEMO_01, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1608799277614, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message(id=1608799277))]
  • #testSyncSend()方法執行的結果,是一致的。此時,我們開啟 #testSyncSend()方法所在的控制檯,會看到有訊息消費的日誌。說明,符合叢集消費的機制:叢集消費模式下,相同 Consumer Group 的每個 Consumer 例項平均分攤訊息。
  • 不過如上的日誌,也可能出現在 #testSyncSend() 方法所在的控制檯,而不在 #testASyncSend() 方法所在的控制檯。

2.9 @KafkaListener

@KafkaListener 註解的常用屬性:

/**
 * 監聽的 Topic 陣列
 */
String[] topics() default {};

/**
 * 監聽的 Topic 表示式
 */
String topicPattern() default "";

/**
 * @TopicPartition 註解的陣列。每個 @TopicPartition 註解,可配置監聽的 Topic、佇列、消費的開始位置
 */
TopicPartition[] topicPartitions() default {};

/**
 * 消費者分組
 */
String groupId() default "";

/**
 * 使用消費異常處理器 KafkaListenerErrorHandler 的 Bean 名字
 */
String errorHandler() default "";

/**
 * 自定義消費者監聽器的併發數
 */
String concurrency() default "";

/**
 * 是否自動啟動監聽器。預設情況下,為 true 自動啟動。
 */
String autoStartup() default "";

/**
 * Kafka Consumer 拓展屬性。
 */
String[] properties() default {};

@KafkaListener 註解的不常用屬性:

/**
 * 唯一標識
 */
String id() default "";
/**
 * id 唯一標識的字首
 */
String clientIdPrefix() default "";
/**
 * 當 groupId 未設定時,是否使用 id 作為 groupId
 */
boolean idIsGroup() default true;

/**
 * 使用的 KafkaListenerContainerFactory Bean 的名字。
 * 若未設定,則使用預設的 KafkaListenerContainerFactory Bean 。
 */
String containerFactory() default "";

/**
 * 所屬 MessageListenerContainer Bean 的名字。
 */
String containerGroup() default "";

/**
 * 真實監聽容器的 Bean 名字,需要在名字前加 "__" 。
 */
String beanRef() default "__listener";
  • @TopicPartition 註解
  • @PartitionOffset 註解
  • @KafkaListeners 註解,允許我們在其中,同時新增多個 @KafkaListener 註解。