kafka-08-SpringBoot Kafka實戰
注意:
當前的 Kafka 版本無法保證每個訊息“只被儲存一次”。現實中的很多應用程式在訊息里加入唯一識別符號,用於檢測重複訊息,消費者在讀取訊息時可以對它們進行清理。應用程式需要可以做到訊息的“冪等”,也就是說,即使出現了重複訊息,也不會對處理結果的正確性造成負面影響。
整合SpringBoot kafka,加入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
1. 新建或者更新主題
KafkaConfig.java
/** * Description: kafka相關配置 * * @author Xander * datetime: 2021-01-08 17:40 */ @Configuration public class KafkaConfig { /** * 主題 */ public static final String TOPIC_SPRING_KAFKA = "SpringKafka"; /** * 新建或者更新Topic並設定分割槽數為3,分割槽副本數為1, * 這裡設定僅僅時測試使用,主題的分割槽數和每個分割槽的副本數,需要根據業務場景和硬體條件去考慮 * <p> * 我們也可以不手動建立topic,因為kafka server.properties 配置檔案中 auto.create.topics.enable 預設為 true, * 表示如果主題不存在,則自動建立主題, * 分割槽數量由kafka server.properties 配置檔案中 num.partitions 指定,預設是 1 * 所以如果是自動建立主題,則預設的分割槽數為1,分割槽副本數為1 * * @return */ @Bean public NewTopic newOrUpdateTopic() { // 通過TopicBuilder新建或者update Topic, // 注意:主題的分割槽只能新增,不能減少分割槽 return TopicBuilder.name(TOPIC_SPRING_KAFKA).replicas(1).partitions(3).build(); } }
我們也可以不手動建立主題,因為kafka server.properties 配置檔案中auto.create.topics.enable
預設為 true,表示如果主題不存在,則自動建立主題,分割槽數量由kafka server.properties 配置檔案中num.partitions
指定,預設是 1,所以如果這裡不手動建立主題的話,kafka如果檢查到主題不存在,會自動新建分割槽數和副本數都為1的主題。
注意:如果主題已存在,NewTopic如果要update已存在的主題,分割槽數只能大於等於已有的分割槽數,不能減少分割槽。
2. SpringBoot kafka 配置
常用的生產者和消費者相關的配置都列出來,並表明了註釋。
spring:
kafka:
# kafka叢集broker列表 host1:por1,host2:port2,host3:port3
bootstrap-servers: docker01:9092
########生產者配置########
producer:
# compression-type 訊息的壓縮演算法
# 預設情況下是 none,訊息傳送時不會被壓縮。 該引數可以設定為 none, gzip, snappy, lz4, zstd
compression-type: none
# acks 有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的,只能選(0、1、all)
acks: all
# bufferMemory 生產者記憶體緩衝區的大小,下面是32MB
bufferMemory: 33554432
# retries 發生臨時性的錯誤(比如分割槽找不到首領)重試次數,
# 預設情況下,生產者會在每次重試之間等待 100 ms,可以通過 retry.backoff.ms 引數來改變這個時間間隔
retries: 3
# key和value 的序列化器,這兩個預設是 StringSerializer.class
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# batch-size 批次大小,按照位元組數計算
batch-size: 1024
properties:
# 自定義分割槽器
# partitioner:
# class: com.xander.kafka.partitioner.XdPartitioner
# request.timeout.ms 在傳送資料時等待伺服器返回響應的時間
request:
timeout:
ms: 1000
# 傳送批次之前等待更多訊息加入批次的時間
# linger.ms為0,表示生產者每條訊息都直接提交給kafka,不等待批次,這時候batch-size其實就沒用了
linger:
ms: 100
# retry.backoff.ms 每次重試之間的時間間隔,預設是100ms,這裡配置50ms
retry:
backoff:
ms: 50
# max.in.flight.requests.per.connection 在收到伺服器響應之前可以傳送多少個訊息,如果不需要保證訊息順序性的場景,建議不用配置該屬性
# 把它設為 1 可以保證訊息在同一個生產者的某一個分割槽上,是按照發送的順序寫入伺服器的,即使發生了重試。但是會降低Kafka的吞吐量
max:
in:
flight:
requests:
per:
connection: 1
# max.block.ms 緩衝區滿時的最大阻塞時間,在阻塞時間達到 max.block.ms 時,生產者會丟擲超時異常。
block:
ms: 200
########### 消費者配置 ###############
consumer:
# auto-offset-reset: 沒有偏移量的分割槽或者偏移量無效時如何處理
# earliest: 消費者將從起始位置讀取分割槽的記錄
# latest: 消費者將從最新的記錄開始讀取資料
# none:只要有一個分割槽不存在已提交的offset,就丟擲異常;
auto-offset-reset: earliest
# group-id 預設的消費者群組
group-id: defaultGroup
# enable.auto.commit 是否自動提交偏移量,
enableAutoCommit: true
# 自動提交偏移量的間隔時間,100ms
autoCommitInterval: 100ms
# 單次請求能夠返回的記錄數量
max-poll-records: 3
# fetch.max.wait.ms 指定獲取記錄的最大等待時間,這裡是100ms
fetchMaxWait: 100ms
# key和value 的反序列化器,這兩個預設是 StringSerializer.class
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# fetch.min.bytes 從伺服器獲取記錄的最小位元組數
fetch:
min:
bytes: 102400
# request.timeout.ms 消費者請求超時時間
request:
timeout:
ms: 1000
# 會話過期時間
session:
timeout:
ms: 120000
# 向協調器傳送心跳的頻率
heartbeat:
interval:
ms: 40000
# 如果需要批量消費,則需要修改 spring.kafka.listener.type = batch,預設是 single,單次消費單條訊息
# listener:
# type: batch
# 手動提交偏移量時:消費者訊息確認模式改為手動確認
# listener:
# ack-mode: manual
3. 生產者向kafka寫資料
3.1 傳送並忽略結果
我們把訊息傳送給伺服器,但並不關心它是否正常到達。大多數情況下,訊息會正常到達,因為 Kafka 是高可用的,而且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些訊息。因為我們會忽略返回值,所以無法知道訊息是否傳送成功。
如果允許丟失一小部分訊息,並且不關心傳送結果,那麼可以使用這種傳送方式。這種方式可以達到最大的響應速度和吞吐效能。
3.2 同步傳送
返回一個 Future 物件,然後呼叫 Future 物件的get()
方法等待 Kafka 響應。如果伺服器返回錯誤, get() 方法會丟擲異常。如果沒有發生錯誤,我們會得到一個 RecordMetadata 物件,可以用它獲取訊息的主題、分割槽和偏移量等資訊。
3.3 非同步傳送
在非同步傳送訊息方式中生產者提供了回撥支援,可以在回撥中處理異常和獲取訊息的主題、分割槽和偏移量等資訊
Kafka生產者
/**
* Description: Kafka生產者
*
* @author Xander
* datetime: 2021-01-10 10:29
*/
@RestController
@RequestMapping("/kafka")
public class KafkaController {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 傳送訊息
@GetMapping("/{msg}")
public void send(@PathVariable String msg) throws ExecutionException, InterruptedException {
long start = Instant.now().toEpochMilli();
this.logger.info("------start");
// 傳送並忽略結果
// this.sendAndForget(msg);
// 同步傳送
this.sendSync(msg);
// 非同步傳送
// this.sendAsync(msg);
this.logger.info("------end: " + (Instant.now().toEpochMilli() - start));
}
/**
* 傳送並忽略結果
*
* @param msg
*/
private void sendAndForget(String msg) {
kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg);
}
/**
* 同步傳送
*
* @param msg
*/
private void sendSync(String msg) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg);
SendResult<String, String> sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
this.logger.info("傳送成功:" + recordMetadata.topic() + "--" + recordMetadata.partition() + "---" + recordMetadata.offset());
}
/**
* 非同步傳送
*
* @param msg
*/
private void sendAsync(String msg) {
kafkaTemplate.send(KafkaConfig.TOPIC_SPRING_KAFKA, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
logger.info("傳送成功:" + recordMetadata.topic() + "--" + recordMetadata.partition() + "---" + recordMetadata.offset());
}
});
}
}
3.1.2 傳送成功
傳送下面的請求:
http://localhost:8080/kafka/123
用kafka-console-consumer.sh
工具訂閱 SpringKafka 主題,可以看到訊息 '123' 傳送成功
[[email protected] ~]# /usr/local/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.31:9092 --topic SpringKafka
123
4. 消費者從kafka讀資料
**說明:**這裡只演示每次消費單條記錄的案例,如果要批量消費記錄,需要修改 spring.kafka.listener.type = batch,預設是 single (單次消費單條訊息)。
批量消費,請參考 Springboot kafka參考文件:https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation
Springboot kafka參考文件的批量消費舉例
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
4.1 自動提交偏移量
如果消費者屬性enable.auto.commit
被設為 true ,那麼每過auto.commit.interval.ms
(提交時間間隔,預設值是 5s ),消費者會自動把上一次輪詢接收到的最大偏移量提交上去。
自動提交是在輪詢裡進行的,消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,那麼就會提交從上一次輪詢返回的偏移量。
自動提交會可能會導致訊息重複消費
假設我們仍然使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取訊息。這個時候偏移量已經落後了 3s ,所以在這 3s 內到達的訊息會被重複處理。
每次消費單個記錄,並在輪詢中自動提交偏移量
/**
* Description: Kafka消費者
*
* @author Xander
* datetime: 2021-01-10 10:32
*/
@Component
public class KafkaConsumer {
Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 每次消費單個記錄
*
* @param record
*/
@KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA})
public void onListen(ConsumerRecord<?, ?> record) {
this.logger.info("消費單個記錄----- 主題:" + record.topic() + "-分割槽:" + record.partition() + "-key:" + record.key()
+ "-value:" + record.value() + "-偏移量:" + record.offset());
}
}
4.2 手動提交偏移量
消費者也可以手動提交偏移量,在每處理成功一條訊息後就手動提交一次偏移,這能夠保證已經處理的訊息都被準確的提交。但是在前面說過,當前的 Kafka 版本無法保證每個訊息“只被儲存一次”,例如:當生產者傳送訊息到broker,broker傳送響應的時候,因為網路關係,生產者沒有接收到正確的響應,這時候,會發生重試,再次傳送訊息,這時,就可能產生重複的訊息。
防止訊息重複消費:
建議在生產中對訊息新增唯一標識,在消費者消費訊息的時候,對唯一標識進行判斷,是否已經消費了該訊息,如果已經消費過,則不做任何處理,從而達到防止訊息重複消費的目的。
手動提交偏移量,需要配置 enable.auto.commit = false 取消自動提交,並且 spring.kafka.listener.ack-mode = manual 消費者訊息確認模式改為手動確認
/**
* Description: Kafka消費者,手動提交偏移量,
* 需要配置 enable.auto.commit = false 取消自動提交,並且 spring.kafka.listener.ack-mode = manual 消費者訊息確認模式改為手動確認
*
* 提示:手動提交偏移量,能夠最大程度減少重複消費訊息,但是在訊息未處理完成,提前提交偏移量,也可能導致訊息丟失
* 關於提交偏移量,請參考下面文章的第6節
* [CSDN同步:kafka-05-消費者] https://blog.csdn.net/qq_20633779/article/details/112335534
*
* @author Xander
* datetime: 2021-01-10 10:32
*/
@Component
public class KafkaConsumerWithAck {
Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 每次消費單個記錄,並且手動提交偏移量
*
*
* @param record
*/
@KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA})
public void onListenWithAck(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException {
this.logger.info("消費單個記錄----- 主題:" + record.topic() + "-分割槽:" + record.partition() + "-key:" + record.key()
+ "-value:" + record.value() + "-偏移量:" + record.offset());
//模擬業務邏輯處理。。。
this.logger.info("業務處理中...");
TimeUnit.SECONDS.sleep(10);
// 手動提交偏移量,表示這個偏移量之前的所有記錄已經被處理
ack.acknowledge();
}
}
4.3 模擬MQ的死信佇列
在遇到可重試錯誤時,把錯誤寫入一個獨立的主題, 一個獨立的消費者群組負責從該主題上讀取錯誤訊息,並進行重試,這種模式有點像其他訊息系統裡的dead-letter-queue
/**
* Description: Kafka消費者: 訊息轉發
* 在遇到可重試錯誤時,把錯誤寫入一個獨立的主題, 一個獨立的消費者群組負責從該主題上讀取錯誤訊息,並進行重試,這種模式有點像其他訊息系統裡的 `dead-letter-queue`
*
* @author Xander
* datetime: 2021-01-10 10:32
*/
@Component
public class KafkaConsumerSendTo {
Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 訊息轉發
*
* @param record
*/
@KafkaListener(topics = {KafkaConfig.TOPIC_SPRING_KAFKA}, groupId = "sendToGroupId")
@SendTo("test")
public String onListen(ConsumerRecord<String, String> record) {
this.logger.info("轉發訊息到test主題 ----- 主題:" + record.topic() + "-分割槽:" + record.partition() + "-key:" + record.key()
+ "-value:" + record.value() + "-偏移量:" + record.offset());
// return的資料就是轉發到 test 主題的訊息
return record.value();
}
}
5. 自定義分割槽器
傳送訊息時候,kafkaTemplate會通過傳入的 主題topic、分割槽partition、鍵key、值value,其中分割槽partition和鍵key是可選的,建立一個ProducerRecord
物件。
-
如果在
ProducerRecord
物件裡指定了分割槽,那麼分割槽器就不會再做任何事情,直接把指定的分割槽返回。 -
如果沒有指定分割槽 ,那麼分割槽器會根據 key 來選擇一個分割槽 。
選好分割槽以後 ,生產者就知道該往哪個主題和分割槽傳送這條記錄了。 -
如果 key 為 null , 並且使用了預設的分割槽器,那麼記錄將被隨機地傳送到主題內各個可用的分割槽上。分割槽器使用輪詢(Round Robin )演算法將訊息均衡地分佈到各個分割槽上。
-
如果鍵不為空,並且使用了預設的分割槽器,那麼 Kafka 會對鍵進行雜湊,然後根據雜湊值把訊息對映到特定的分割槽上。這裡的關鍵之處在於 ,同一個鍵總是被對映到同一個分割槽上 ,所以在進行對映時,我們會使用主題所有的分割槽,而不僅僅是可用的分割槽 。這也意味著,如果寫入資料的分割槽是不可用的,那麼就會發生錯誤。但這種情況很少發生。
上面說的是預設的分割槽器,我們也可以根據業務場景自定義分割槽器。
新建一個 org.apache.kafka.clients.producer.Partitioner 介面的實現類 com.xander.kafka.partitioner.XdPartitioner,然後配置
spring.kafka.producer.properties.partitioner.class= com.xander.kafka.partitioner.XdPartitioner
自定義Kafka分割槽器,每條訊息都發送到分割槽0
/**
* Description: 自定義Kafka分割槽器,每條訊息都發送到分割槽0
*
* @author Xander
* datetime: 2021-01-13 19:41
*/
public class XdPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//這裡可以根據業務場景將訊息路由到不同的分割槽
// return 0 表示每條訊息都發送到分割槽0
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
依次傳送請求:
http://localhost:8080/kafka/111
http://localhost:8080/kafka/222
http://localhost:8080/kafka/333
通過日誌列印可以看到,傳送的訊息都被路由到分割槽0上了,列印的日誌格式是 “傳送成功:主題--分割槽---偏移量”。
2021-01-13 19:55:27.957 INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController : ------start
2021-01-13 19:55:28.192 INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController : 傳送成功:SpringKafka--0---0
2021-01-13 19:55:28.193 INFO 17384 --- [nio-8080-exec-1] c.xander.kafka.producer.KafkaController : ------end: 236
2021-01-13 19:55:34.935 INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController : ------start
2021-01-13 19:55:35.037 INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController : 傳送成功:SpringKafka--0---1
2021-01-13 19:55:35.038 INFO 17384 --- [nio-8080-exec-2] c.xander.kafka.producer.KafkaController : ------end: 103
2021-01-13 19:55:38.163 INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController : ------start
2021-01-13 19:55:38.268 INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController : 傳送成功:SpringKafka--0---2
2021-01-13 19:55:38.268 INFO 17384 --- [nio-8080-exec-4] c.xander.kafka.producer.KafkaController : ------end: 105
6. 業務場景舉例
一個應用程式在很多情況下需要往 Kafka 寫入訊息 :
- 記錄使用者的活動(用於審計和分析)、
- 記錄度量指標、
- 儲存日誌訊息、
- 記錄智慧家電的資訊、
- 與其他應用程式進行非同步通訊、
- 緩衝即將寫入到資料庫的資料,等等。
多樣的使用場景意味著多樣的需求:
- 是否每個訊息都很重要?
- 是否允許丟失一小部分訊息?
- 偶爾出現重複訊息是否可以接受?
- 是否有嚴格的延遲和吞吐量要求?
6.1 不允許的訊息丟失或訊息重複,允許一點點的延遲
在信用卡事務處理系統裡,訊息丟失或訊息重複是不允許的,可以接受的延遲最大為 500ms ,對吞吐量要求較高 我們希望每秒鐘可以處理一百萬個訊息。
這種情況下,實現方案:建議生產者端可以使用同步傳送解決訊息丟失問題,同時給訊息新增唯一標識,來解決訊息的重複消費問題。
6.2 允許丟失少量的訊息或出現少量的訊息重複,追求高響應和高吞吐
儲存網站的點選資訊是另一種使用場景。在這個場景裡,允許丟失少量的訊息或出現少量的訊息重複,只要不影響使用者體驗就行,在數以千萬計的點選量中,丟失少量的訊息並不會有什麼影響。
這種情況下,實現方案:建議生產者使用傳送並忘記的方式來發送訊息,如果系統要對傳送失敗的訊息進行處理,則可以使用非同步傳送的方式,在回撥中處理異常,以追求最大的吞吐量。
[慕課手記同步:kafka-08-SpringBoot Kafka實戰]https://www.imooc.com/article/314288
歡迎關注文章同步公眾號"黑桃"