kafka相關使用以及線上遇到的問題和解決方法
技術標籤:kafka
kafka相關使用以及線上遇到的問題和解決方法
專案選用kafka原因
kafka保證有序:在一個分組下,分割槽只能被一個消費者消費,一個消費者可以消費多個分割槽;傳送訊息時一批指定相同的訊息key,kafka根據key計算所屬分割槽,key相同所屬分割槽也相同
一、正式環境生產者及消費者初始配置
applaction.yml配置:
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: event_upload
auto-offset- reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消費者監聽
/**
* 監聽上報
* @param record
*/
@KafkaListener(id="eventUploadID" ,topics={"eventUpload"})
public void listen0(ConsumerRecord<String, String> record) {
if (record != null) {
String content = record.value();
// TODO 業務程式碼
// ...
}
}
以上可知,專案剛上線採用自動提交,每次重啟有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
二、生產環境遇到的問題
1.消費者接收到的日誌,生產者中卻不存在該條訊息記錄
問題具體描述如下:
因專案中生產者和消費者的訊息都會被記錄在資料庫中,消費者中的訊息在生產者資料庫中並不存在
原因
因在實際使用中kafka傳送訊息是在業務處理中間傳送的,當傳送訊息後業務拋異常回滾就造成了訊息錯發的現象
解決方法
剛開始想使用kafka事務,但發現在傳送完訊息後業務邏輯發生異常回滾後kafka事務不起效果,後來定位是因為在傳送訊息後的業務中有新建執行緒的操作,只要有新建執行緒的操作就會存在kafka事務不起效果的情況,放棄使用kafka事務。
因考慮到業務程式碼較複雜,想要直接修改程式碼在完成業務之後統一提交訊息工作量太大,採用以下方法解決來實現這種效果:
自定義方法註解,使用AOP攔截有此註解的方法,在執行方法前註冊事務監聽器,在事務內傳送的訊息都存放在池中,事務正常提交發送事務內所有訊息,回滾則清空池
kafka事務應用場景:在完成業務流程後傳送多個訊息,在傳送多個訊息時一個訊息失敗其他訊息也失敗
2.上線幾天後發現kafka消費緩慢,並且出現了重複消費的問題
原因
因消費端的消費效率或網路寬頻、資料量引起了kafka消費遲緩,當時懷疑是poll訊息間隔超過了預設的時間間隔,並且因採用的是自動提交,消費端未能順利提交偏移量,kafka伺服器認為沒有消費又重新發送一遍訊息導致了重複消費
解決方法
1、將提交方式改為手動提交,並且使用MANUAL_IMMEDIATE模式(手動呼叫Acknowledgment.acknowledge()後立即提交),在消費者接收到訊息時立刻呼叫Acknowledgment.acknowledge()提交偏移量
2、調大max.poll.interval.ms(每次poll間隔超時時間)值,將其改為1200000(20分鐘),可根據具體的業務來定
3、記錄kafka的分割槽及偏移量以便維護資料
優化後applaction.yml consumer的配置:
consumer:
group-id: event_upload
auto-offset-reset: earliest
enable-auto-commit: false
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: MANUAL_IMMEDIATE # 手動呼叫Acknowledgment.acknowledge()後立即提交
properties:
max-poll-interval-ms: 1200000
消費者監聽
/**
* 監聽上報
* @param record
*/
@KafkaListener(id="eventUploadID",topics={"eventUpload"})
public void listen0(ConsumerRecord<String, String> record, Acknowledgment ack) {
ack.acknowledge();
if (record != null) {
String content = record.value();
// TODO 業務程式碼
// ...
}
}
3.在放假後第一天上班發現線上kafka又出現消費緩慢的情況,並且堆積了較多訊息
經排查日誌存在報錯:Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
原因
由於取出的一批訊息數量太大,consumer在session.timeout.ms時間之內沒有消費完成引起kafka rebalance
解決方法
session.timeout.ms改為10分鐘,max.poll.records改為10(預設500)
優化後applaction.yml的配置:
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: event_upload
auto-offset-reset: earliest
enable-auto-commit: false
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 10
listener:
ack-mode: MANUAL_IMMEDIATE # 手動呼叫Acknowledgment.acknowledge()後立即提交
properties:
max-poll-interval-ms: 1200000
session-timeout-ms: 600000