Springboot整合訊息佇列Kafka
技術標籤:微服務kafkaspring bootzookeeper
Springboot整合訊息佇列Kafka
一、Spring-Kafka
在 Spring
生態中,提供了 Spring-Kafka
專案,讓我們更簡便的使用 Kafka
。
Spring for Apache Kafka (spring-kafka) 專案將 Spring 核心概念應用於基於 Kafka 的訊息傳遞解決方案的開發。
它提供了一個“模板”作為傳送訊息的高階抽象。
它還通過 @KafkaListener 註解和“偵聽器容器(listener container)”為訊息驅動的 POJO 提供支援。
這些庫促進了依賴注入和宣告的使用。
在所有這些用例中,你將看到 Spring Framework 中的 JMS 支援,以及和 Spring AMQP 中的 RabbitMQ 支援的相似之處。
二、快速入門
先來對 Kafka-Spring
做一個快速入門,實現 Producer
三種傳送訊息的方式的功能,同時建立一個 Consumer
消費訊息。
2.1 引入依賴
<dependencies>
< !-- 引入 Spring-Kafka 依賴 -->
<!-- 已經內建 kafka-clients 依賴,所以無需重複引入 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
< !-- 實現對 JSON 的自動化配置 -->
<!-- 因為,Kafka 對複雜物件的 Message 序列化時,我們會使用到 JSON -->
<!--
同時,spring-boot-starter-json 引入了 spring-boot-starter ,而 spring-boot-starter 又引入了 spring-boot-autoconfigure 。
spring-boot-autoconfigure 實現了 Spring-Kafka 的自動化配置
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<!-- 方便等會寫單元測試 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
2.2 應用配置檔案
在 resources
目錄下,建立 application.yaml
配置檔案。
spring:
# Kafka 配置項,對應 KafkaProperties 配置類
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以設定多個,以逗號分隔
# Kafka Producer 配置項
producer:
acks: 1 # 0-不應答。1-leader 應答。all-所有 leader 和 follower 應答。
retries: 3 # 傳送失敗時,重試傳送的次數
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 訊息的 key 的序列化
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 訊息的 value 的序列化
# Kafka Consumer 配置項
consumer:
auto-offset-reset: earliest # 設定消費者分組最初的消費進度為 earliest 。可參考部落格 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# JsonDeserializer 在反序列化訊息時,考慮到安全性,只反序列化成信任的 Message 類
properties:
spring:
json:
trusted:
packages: cn.cy.springboot.kafkademo.message
# Kafka Consumer Listener 監聽器配置
listener:
missing-topics-fatal: false # 消費監聽介面監聽的主題不存在時,預設會報錯。所以通過設定為 false ,解決報錯
logging:
level:
org:
springframework:
kafka: ERROR # spring-kafka INFO 日誌太多了,所以我們限制只打印 ERROR 級別
apache:
kafka: ERROR # kafka INFO 日誌太多了,所以我們限制只打印 ERROR 級別
2.3 Application
建立 Application.java
類,配置 @SpringBootApplication
註解即可。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
2.4 Demo01Message
在 cn.cy.springboot.kafkademo.message
包下,建立 Demo01Message 訊息類,提供給當前示例使用。
@Data
public class Demo01Message {
public static final String TOPIC = "DEMO_01";
/**
* 編號
*/
private Integer id;
}
TOPIC
靜態屬性,我們設定該訊息類對應 Topic 為"DEMO_01"
。
2.5 Demo01Producer
在 cn.cy.springboot.kafkademo.producer
包下,建立 Demo01Producer
類,它會使用 Kafka-Spring
封裝提供的 KafkaTemplate
,實現三種傳送訊息的方式。
@Component
public class Demo01Producer {
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;
public SendResult syncSend(Integer id) throws ExecutionException, InterruptedException {
// 建立 Demo01Message 訊息
Demo01Message message = new Demo01Message();
message.setId(id);
// 同步傳送訊息
return kafkaTemplate.send(Demo01Message.TOPIC, message).get();
}
public ListenableFuture<SendResult<Object, Object>> asyncSend(Integer id) {
// 建立 Demo01Message 訊息
Demo01Message message = new Demo01Message();
message.setId(id);
// 非同步傳送訊息
return kafkaTemplate.send(Demo01Message.TOPIC, message);
}
}
-
TOPIC
靜態屬性,我們設定該訊息類對應Topic
為"DEMO_01"
。 -
#asyncSend(...)
方法,非同步傳送訊息。在方法內部,會呼叫KafkaTemplate#send(topic, data)
方法,非同步傳送訊息,返回Spring
ListenableFuture 物件,一個可以通過監聽執行結果的 Future 增強。 -
#syncSend(...)
方法,同步傳送訊息。在方法內部,也是呼叫KafkaTemplate#send(topic, data)
方法,非同步傳送訊息。不過,因為我們後面呼叫了 ListenableFuture 物件的#get()
方法,阻塞等待發送結果,從而實現同步的效果。 -
暫時未提供
oneway
傳送訊息的方式。因為需要配置Producer
的acks = 0
,才可以使用這種傳送方式。當然,實際場景下,基本不會使用oneway
的方式來發送訊息,所以直接先忽略吧。 -
在序列化時,我們使用了
JsonSerializer
序列化 Message 訊息物件,它會在 Kafka 訊息 Headers 的__TypeId__
上,值為Message
訊息對應的類全名。 -
在反序列化時,我們使用了
JsonDeserializer
序列化出Message
訊息物件,它會根據Kafka
訊息 Headers 的__TypeId__
的值,反序列化訊息內容成該Message
物件。
2.6 Demo01Consumer
在 cn.cy.springboot.kafkademo.consumer
包下,建立 Demo01Consumer
類,消費訊息。
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-consumer-group-" + Demo01Message.TOPIC)
public void onMessage(Demo01Message message) {
logger.info("[onMessage][執行緒編號:{} 訊息內容:{}]", Thread.currentThread().getId(), message);
}
}
- 在方法上,添加了
@KafkaListener
註解,宣告消費的Topic
是"DEMO_01"
,消費者分組是"demo01-consumer-group-DEMO_01"
。一般情況下,我們建議一個消費者分組,僅消費一個Topic
。這樣做會有個好處:每個消費者分組職責單一,只消費一個Topic
。 - 方法引數,使用消費
Topic
對應的訊息類即可。這裡,我們使用了Demo01Message
。 - 雖然說,
@KafkaListener
註解是方法級別的,還是建議一個類,對應一個方法,消費訊息。
2.7 Demo01AConsumer
在cn.cy.springboot.kafkademo.consumer
包下,建立 Demo01AConsumer
類,消費訊息。
@Component
public class Demo01AConsumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-A-consumer-group-" + Demo01Message.TOPIC)
public void onMessage(ConsumerRecord<Integer, String> record) {
logger.info("[onMessage][執行緒編號:{} 訊息內容:{}]", Thread.currentThread().getId(), record);
}
}
-
整體和
Demo01Consumer
是一致的,主要有兩個差異點,也是為什麼我們又額外建立了這個消費者的原因。 -
差異一: 在方法上,添加了
@KafkaListener
註解,宣告消費的Topic
還是"DEMO_01"
,消費者分組修改成了"demo01-A-consumer-group-DEMO_01"
。這樣,我們就可以測試 Kafka 叢集消費的特性。- 叢集消費(
Clustering
):叢集消費模式下,相同Consumer Group
的每個Consumer
例項平均分攤訊息。 - 也就是說,如果我們傳送一條 Topic 為
"DEMO_01"
的訊息,可以分別被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消費一次。 - 但是,如果我們啟動兩個該示例的例項,則消費者分組
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都會有多個 Consumer 示例。此時,我們再發送一條 Topic 為"DEMO_01"
的訊息,只會被"demo01-A-consumer-group-DEMO_01"
的一個 Consumer 消費一次,也同樣只會被"demo01-A-consumer-group-DEMO_01"
的一個 Consumer 消費一次。
- 叢集消費(
-
差異二,方法引數,設定消費的訊息對應的類不是
Demo01Message
類,而是Kafka
內建的 ConsumerRecord 類。通過ConsumerRecord
類,我們可以獲取到消費的訊息的更多資訊,例如說訊息的所屬佇列、建立時間等等屬性,不過訊息的內容(value
)就需要自己去反序列化。當然,一般情況下,我們不會使用ConsumerRecord
類。
2.8 簡單測試
建立 Demo01ProducerTest
測試類,編寫二個單元測試方法,呼叫 Demo01Producer
二個傳送訊息的方式。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo01ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo01Producer producer;
@Test
public void testSyncSend() throws ExecutionException, InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][傳送編號:[{}] 傳送結果:[{}]]", id, result);
// 阻塞等待,保證消費
new CountDownLatch(1).await();
}
@Test
public void testASyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.asyncSend(id).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable e) {
logger.info("[testASyncSend][傳送編號:[{}] 傳送異常]]", id, e);
}
@Override
public void onSuccess(SendResult<Object, Object> result) {
logger.info("[testASyncSend][傳送編號:[{}] 傳送成功,結果為:[{}]]", id, result);
}
});
// 阻塞等待,保證消費
new CountDownLatch(1).await();
}
}
執行 #testSyncSend()
方法,測試同步傳送訊息。
2020-12-24 16:35:46.040 INFO 25132 --- [ main] c.c.s.k.producer.Demo01ProducerTest : [testSyncSend][傳送編號:[1608798945] 傳送結果:[SendResult [producerRecord=ProducerRecord(topic=DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 110, 46, 99, 121, 46, 115, 112, 114, 105, 110, 103, 98, 111, 111, 116, 46, 107, 97, 102, 107, 97, 100, 101, 109, 111, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message(id=1608798945), timestamp=null), recordMetadata=DEMO_01-0@6]]]
2020-12-24 16:35:46.392 INFO 25132 --- [ntainer#0-0-C-1] c.c.s.k.consumer.Demo01AConsumer : [onMessage][執行緒編號:18 訊息內容:ConsumerRecord(topic = DEMO_01, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1608798945874, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message(id=1608798945))]
2020-12-24 16:35:46.395 INFO 25132 --- [ntainer#1-0-C-1] c.c.s.kafkademo.consumer.Demo01Consumer : [onMessage][執行緒編號:20 訊息內容:Demo01Message(id=1608798945)]
- 通過日誌我們可以看到,我們傳送的訊息,分別被
Demo01AConsumer
和Demo01Consumer
兩個消費者(消費者分組)都消費了一次。 - 同時,兩個消費者在不同的執行緒中,消費了這條訊息。
執行 #testASyncSend()
方法,測試非同步傳送訊息。
注意,不要關閉 #testSyncSend()
單元測試方法,因為我們要模擬每個消費者叢集,都有多個 Consumer
節點。
2020-12-24 16:41:17.765 INFO 19088 --- [ad | producer-1] c.c.s.k.producer.Demo01ProducerTest : [testASyncSend][傳送編號:[1608799277] 傳送成功,結果為:[SendResult [producerRecord=ProducerRecord(topic=DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 110, 46, 99, 121, 46, 115, 112, 114, 105, 110, 103, 98, 111, 111, 116, 46, 107, 97, 102, 107, 97, 100, 101, 109, 111, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message(id=1608799277), timestamp=null), recordMetadata=DEMO_01-0@9]]]
2020-12-24 16:41:20.753 INFO 19088 --- [ntainer#0-0-C-1] c.c.s.k.consumer.Demo01AConsumer : [onMessage][執行緒編號:18 訊息內容:ConsumerRecord(topic = DEMO_01, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1608799277614, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message(id=1608799277))]
- 和
#testSyncSend()
方法執行的結果,是一致的。此時,我們開啟#testSyncSend()
方法所在的控制檯,會看到有訊息消費的日誌。說明,符合叢集消費的機制:叢集消費模式下,相同Consumer Group
的每個Consumer
例項平均分攤訊息。 - 不過如上的日誌,也可能出現在
#testSyncSend()
方法所在的控制檯,而不在#testASyncSend()
方法所在的控制檯。
2.9 @KafkaListener
@KafkaListener
註解的常用屬性:
/**
* 監聽的 Topic 陣列
*/
String[] topics() default {};
/**
* 監聽的 Topic 表示式
*/
String topicPattern() default "";
/**
* @TopicPartition 註解的陣列。每個 @TopicPartition 註解,可配置監聽的 Topic、佇列、消費的開始位置
*/
TopicPartition[] topicPartitions() default {};
/**
* 消費者分組
*/
String groupId() default "";
/**
* 使用消費異常處理器 KafkaListenerErrorHandler 的 Bean 名字
*/
String errorHandler() default "";
/**
* 自定義消費者監聽器的併發數
*/
String concurrency() default "";
/**
* 是否自動啟動監聽器。預設情況下,為 true 自動啟動。
*/
String autoStartup() default "";
/**
* Kafka Consumer 拓展屬性。
*/
String[] properties() default {};
@KafkaListener
註解的不常用屬性:
/**
* 唯一標識
*/
String id() default "";
/**
* id 唯一標識的字首
*/
String clientIdPrefix() default "";
/**
* 當 groupId 未設定時,是否使用 id 作為 groupId
*/
boolean idIsGroup() default true;
/**
* 使用的 KafkaListenerContainerFactory Bean 的名字。
* 若未設定,則使用預設的 KafkaListenerContainerFactory Bean 。
*/
String containerFactory() default "";
/**
* 所屬 MessageListenerContainer Bean 的名字。
*/
String containerGroup() default "";
/**
* 真實監聽容器的 Bean 名字,需要在名字前加 "__" 。
*/
String beanRef() default "__listener";
@TopicPartition
註解@PartitionOffset
註解@KafkaListeners
註解,允許我們在其中,同時新增多個@KafkaListener
註解。