1. 程式人生 > 其它 >kafka相關使用以及線上遇到的問題和解決方法

kafka相關使用以及線上遇到的問題和解決方法

技術標籤:kafka

kafka相關使用以及線上遇到的問題和解決方法


專案選用kafka原因

kafka保證有序:在一個分組下,分割槽只能被一個消費者消費,一個消費者可以消費多個分割槽;傳送訊息時一批指定相同的訊息key,kafka根據key計算所屬分割槽,key相同所屬分割槽也相同

一、正式環境生產者及消費者初始配置

applaction.yml配置:

producer:
  retries: 0
  batch-size: 16384
  buffer-memory: 33554432
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
  group-id: event_upload
  auto-offset-
reset: earliest enable-auto-commit: true auto-commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

消費者監聽

/**
     * 監聽上報
     * @param record
     */
    @KafkaListener(id="eventUploadID"
,topics={"eventUpload"}) public void listen0(ConsumerRecord<String, String> record) { if (record != null) { String content = record.value(); // TODO 業務程式碼 // ... } }

以上可知,專案剛上線採用自動提交,每次重啟有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

二、生產環境遇到的問題

1.消費者接收到的日誌,生產者中卻不存在該條訊息記錄

問題具體描述如下:

因專案中生產者和消費者的訊息都會被記錄在資料庫中,消費者中的訊息在生產者資料庫中並不存在

原因

因在實際使用中kafka傳送訊息是在業務處理中間傳送的,當傳送訊息後業務拋異常回滾就造成了訊息錯發的現象

解決方法

剛開始想使用kafka事務,但發現在傳送完訊息後業務邏輯發生異常回滾後kafka事務不起效果,後來定位是因為在傳送訊息後的業務中有新建執行緒的操作,只要有新建執行緒的操作就會存在kafka事務不起效果的情況,放棄使用kafka事務。
因考慮到業務程式碼較複雜,想要直接修改程式碼在完成業務之後統一提交訊息工作量太大,採用以下方法解決來實現這種效果:
自定義方法註解,使用AOP攔截有此註解的方法,在執行方法前註冊事務監聽器,在事務內傳送的訊息都存放在池中,事務正常提交發送事務內所有訊息,回滾則清空池

kafka事務應用場景:在完成業務流程後傳送多個訊息,在傳送多個訊息時一個訊息失敗其他訊息也失敗

2.上線幾天後發現kafka消費緩慢,並且出現了重複消費的問題

原因

因消費端的消費效率或網路寬頻、資料量引起了kafka消費遲緩,當時懷疑是poll訊息間隔超過了預設的時間間隔,並且因採用的是自動提交,消費端未能順利提交偏移量,kafka伺服器認為沒有消費又重新發送一遍訊息導致了重複消費

解決方法

1、將提交方式改為手動提交,並且使用MANUAL_IMMEDIATE模式(手動呼叫Acknowledgment.acknowledge()後立即提交),在消費者接收到訊息時立刻呼叫Acknowledgment.acknowledge()提交偏移量
2、調大max.poll.interval.ms(每次poll間隔超時時間)值,將其改為1200000(20分鐘),可根據具體的業務來定
3、記錄kafka的分割槽及偏移量以便維護資料

優化後applaction.yml consumer的配置:

consumer:
  group-id: event_upload
  auto-offset-reset: earliest
  enable-auto-commit: false
  auto-commit-interval: 100
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
  ack-mode: MANUAL_IMMEDIATE # 手動呼叫Acknowledgment.acknowledge()後立即提交
properties:
  max-poll-interval-ms: 1200000

消費者監聽

/**
     * 監聽上報
     * @param record
     */
    @KafkaListener(id="eventUploadID",topics={"eventUpload"})
    public void listen0(ConsumerRecord<String, String> record, Acknowledgment ack) {
        ack.acknowledge();
        if (record != null) {
            String content = record.value();
            // TODO 業務程式碼
            // ...
        }
    }

3.在放假後第一天上班發現線上kafka又出現消費緩慢的情況,並且堆積了較多訊息

經排查日誌存在報錯:Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

原因

由於取出的一批訊息數量太大,consumer在session.timeout.ms時間之內沒有消費完成引起kafka rebalance

解決方法

session.timeout.ms改為10分鐘,max.poll.records改為10(預設500)

優化後applaction.yml的配置:

producer:
  retries: 0
  batch-size: 16384
  buffer-memory: 33554432
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
  group-id: event_upload
  auto-offset-reset: earliest
  enable-auto-commit: false
  auto-commit-interval: 100
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  max-poll-records: 10
listener:
  ack-mode: MANUAL_IMMEDIATE # 手動呼叫Acknowledgment.acknowledge()後立即提交
properties:
  max-poll-interval-ms: 1200000
  session-timeout-ms: 600000