1. 程式人生 > WINDOWS開發 >kafka API

kafka API

(1)Producer的API

  1、傳送流程:Kafka 的 Producer 傳送訊息採用的是非同步傳送的方式。在訊息傳送的過程中,涉及到了兩個執行緒——main 執行緒和 Sender 執行緒,以及一個執行緒共享變數——RecordAccumulator。main 執行緒將訊息傳送給 RecordAccumulator,Sender 執行緒不斷從 RecordAccumulator 中拉取訊息傳送到 Kafka broker。 如下圖所示;

  batch.size:只有資料積累到 batch.size 之後,sender 才會傳送資料。

  linger.ms:如果資料遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送資料。

  ProducerRecord:每條資料都要封裝成一個ProducerRecord物件

技術分享圖片

  2、Producer的非同步傳送:分帶回調函式呼叫API和不帶回調函式API

    回撥函式會在producer收到ack時呼叫,為非同步呼叫,該方法有兩個引數,分別是RecordMetadata和Exception,如果Exception為null,說明訊息傳送成功,如果Exception不為null,說明訊息傳送失敗。

    注意:訊息傳送失敗會自動重試,不需要我們在回撥函式中手動重試。

  3、producer的同步傳送:同步傳送的意思就是,一條訊息傳送之後,會阻塞當前執行緒,直至返回 ack。

(2)consumer的API:Consumer 消費資料時的可靠性是很容易保證的,因為資料在 Kafka 中是持久化的,故不用擔心資料丟失問題。由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。 所以 offset 的維護是 Consumer 消費資料是必須考慮的問題。

  1、提交offset的方式

   1)自動提交offset:在建立一個消費者時,預設是自動提交偏移量,這種方式也被稱為【at most once】,fetch到訊息後就可以更新offset,無論是否消費成功。

   2)手動提交:對偏移量實行更加精確的管理,以保證訊息不被重複消費以及訊息不被丟失。這種方式稱為【at least once】。fetch到訊息後,等消費完成再呼叫方法consumer.commitSync(),手動更新offset;如果消費失敗,則offset也不會更新,此條訊息會被重複消費一次。手動提交又可以分為兩種

    手動提交 offset 的方法有兩種:分別是 commitSync(同步提交)和 commitAsync(非同步提交)。兩者的相同點是,都會將本次 poll 的一批資料最高的偏移量提交;不同點是,commitSync 阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而 commitAsync 則沒有失敗重試機制,故有可能提交失敗。

    a、同步提交:同步模式下提交失敗時一直嘗試提交,直到遇到無法重試的情況下才會結束,同時,同步方式下消費者執行緒在拉取訊息時會被阻塞,直到偏移量提交操作成功或者在提交過程中發生錯誤。

    b、雖然同步提交 offset 更可靠一些,但是非同步方式下消費者執行緒不會被阻塞,可能在提交偏移量操作的結果還未返回時就開始進行下一次的拉取操作,在提交失敗時也不會嘗試提交。

   3)資料漏消費和重複消費分析:無論是同步提交還是非同步提交 offset,都有可能會造成資料的漏消費或者重複消費。先提交 offset 後消費,有可能造成資料的漏消費;而先消費後提交 offset,有可能會造成資料的重複消費。

(3)kafka的攔截器:一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。主要的是Kafka Producer端的攔截器,它主要用來對訊息進行攔截或者修改,也可以用於Producer的Callback回撥之前進行相應的預處理。