kafka資料丟包原因及解決方案
資料丟失是一件非常嚴重的事情事,針對資料丟失的問題我們需要有明確的思路來確定問題所在,針對這段時間的總結,我個人面對kafka 資料丟失問題的解決思路如下:
- 是否真正的存在資料丟失問題,比如有很多時候可能是其他同事操作了測試環境,所以首先確保資料沒有第三方干擾。
- 理清你的業務流程,資料流向,資料到底是在什麼地方丟失的資料,在kafka 之前的環節或者kafka之後的流程丟失?比如kafka的資料是由flume提供的,也許是flume丟失了資料,kafka 自然就沒有這一部分資料。
- 如何發現有資料丟失,又是如何驗證的。從業務角度考慮,例如:教育行業,每年高考後資料量巨大,但是卻反常的比高考前還少,或者源端資料量和目的端資料量不符
- 定位資料是否在kafka之前就已經丟失還事消費端丟失資料的
- kafka支援資料的重新回放功能(換個消費group),清空目的端所有資料,重新消費。
- 如果是在消費端丟失資料,那麼多次消費結果完全一模一樣的機率很低。
- 如果是在寫入端丟失資料,那麼每次結果應該完全一樣(在寫入端沒有問題的前提下)。
- kafka環節丟失資料,常見的kafka環節丟失資料的原因有:
- 如果auto.commit.enable=true,當consumer fetch了一些資料但還沒有完全處理掉的時候,剛好到commit interval出發了提交offset操作,接著
- 網路負載很高或者磁碟很忙寫入失敗的情況下,沒有自動重試重發訊息。沒有做限速處理,超出了網路頻寬限速。kafka一定要配置上訊息重試的機制,並且重試的時間間隔一定要長一些,預設1秒鐘並不符合生產環境(網路中斷時間有可能超過1秒)。
- 如果磁碟壞了,會丟失已經落盤的資料
- 單批資料的長度超過限制會丟失資料,報kafka.common.MessageSizeTooLargeException異常
解決: - Consumer side:fetch.message
- Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
- Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
- Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
- partition leader在未完成副本數follows的備份時就宕機的情況,即使選舉出了新的leader但是已經push的資料因為未備份就丟失了!
kafka是多副本的,當你配置了同步複製之後。多個副本的資料都在PageCache裡面,出現多個副本同時掛掉的概率比1個副本掛掉的概率就很小了。(官方推薦是通過副本來保證資料的完整性的) - kafka的資料一開始就是儲存在PageCache上的,定期flush到磁碟上的,也就是說,不是每個訊息都被儲存在磁碟了,如果出現斷電或者機器故障等,PageCache上的資料就丟失了。
可以通過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔,interval大丟的資料多些,小會影響效能但在0.8版本,可以通過replica機制保證資料不丟,代價就是需要更多資源,尤其是磁碟資源,kafka當前支援GZip和Snappy壓縮,來緩解這個問題 是否使用replica取決於在可靠性和資源代價之間的balance
同時kafka也提供了相關的配置引數,來讓你在效能與可靠性之間權衡(一般預設):
當達到下面的訊息數量時,會將資料flush到日誌檔案中。預設10000
log.flush.interval.messages=10000
當達到下面的時間(ms)時,執行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。預設3000ms
log.flush.interval.ms=1000
檢查是否需要將日誌flush的時間間隔
log.flush.scheduler.interval.ms = 3000
Kafka的優化建議
producer端:
- 設計上保證資料的可靠安全性,依據分割槽數做好資料備份,設立副本數等。
push資料的方式:同步非同步推送資料:權衡安全性和速度性的要求,選擇相應的同步推送還是非同步推送方式,當發現數據有問題時,可以改為同步來查詢問題。 - flush是kafka的內部機制,kafka優先在記憶體中完成資料的交換,然後將資料持久化到磁碟.kafka首先會把資料快取(快取到記憶體中)起來再批量flush.
可以通過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔 - 可以通過replica機制保證資料不丟.
代價就是需要更多資源,尤其是磁碟資源,kafka當前支援GZip和Snappy壓縮,來緩解這個問題
是否使用replica(副本)取決於在可靠性和資源代價之間的balance(平衡) - broker到 Consumer kafka的consumer提供兩種介面.
- high-level版本已經封裝了對partition和offset的管理,預設是會定期自動commit offset,這樣可能會丟資料的
- low-level版本自己管理spout執行緒和partition之間的對應關係和每個partition上的已消費的offset(定期寫到zk)
並且只有當這個offset被ack後,即成功處理後,才會被更新到zk,所以基本是可以保證資料不丟的即使spout執行緒crash(崩潰),重啟後還是可以從zk中讀到對應的offset
- 非同步要考慮到partition leader在未完成副本數follows的備份時就宕機的情況,即使選舉出了新的leader但是已經push的資料因為未備份就丟失了!
- 不能讓記憶體的緩衝池太滿,如果滿了記憶體溢位,也就是說資料寫入過快,kafka的緩衝池資料落盤速度太慢,這時肯定會造成資料丟失。
- 儘量保證生產者端資料一直處於執行緒阻塞狀態,這樣一邊寫記憶體一邊落盤。
- 非同步寫入的話還可以設定類似flume回滾型別的batch數,即按照累計的訊息數量,累計的時間間隔,累計的資料大小設定batch大小。
- 設定合適的方式,增大batch 大小來減小網路IO和磁碟IO的請求,這是對於kafka效率的思考。
- 不過非同步寫入丟失資料的情況還是難以控制
- 還是得穩定整體叢集架構的執行,特別是zookeeper,當然正對非同步資料丟失的情況儘量保證broker端的穩定運作吧
kafka不像hadoop更致力於處理大量級資料,kafka的訊息佇列更擅長於處理小資料。針對具體業務而言,若是源源不斷的push大量的資料(eg:網路爬蟲),可以考慮訊息壓縮。但是這也一定程度上對CPU造成了壓力,還是得結合業務資料進行測試選擇
- 結合上游的producer架構,
broker端:
topic設定多分割槽,分割槽自適應所在機器,為了讓各分割槽均勻分佈在所在的broker中,分割槽數要大於broker數。分割槽是kafka進行並行讀寫的單位,是提升kafka速度的關鍵。
- broker能接收訊息的最大位元組數的設定一定要比消費端能消費的最大位元組數要小,否則broker就會因為消費端無法使用這個訊息而掛起。
- broker可賦值的訊息的最大位元組數設定一定要比能接受的最大位元組數大,否則broker就會因為資料量的問題無法複製副本,導致資料丟失
comsumer端:
關閉自動更新offset,等到資料被處理後再手動跟新offset。
在消費前做驗證前拿取的資料是否是接著上回消費的資料,不正確則return先行處理排錯。
一般來說zookeeper只要穩定的情況下記錄的offset是沒有問題,除非是多個consumer group 同時消費一個分割槽的資料,其中一個先提交了,另一個就丟失了。
問題:
kafka的資料一開始就是儲存在PageCache上的,定期flush到磁碟上的,也就是說,不是每個訊息都被儲存在磁碟了,如果出現斷電或者機器故障等,PageCache上的資料就丟失了。
這個是總結出的到目前為止沒有發生丟失資料的情況
//producer用於壓縮資料的壓縮型別。預設是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理訊息越多,壓縮效能越好
props.put("compression.type", "gzip");
//增加延遲
props.put("linger.ms", "50");
//這意味著leader需要等待所有備份都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失資料。這是最強的保證。,
props.put("acks", "all");
//無限重試,直到你意識到出現了問題,設定大於0的值將使客戶端重新發送任何資料,一旦這些資料傳送失敗。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同一個partition,則第一個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。
props.put("retries ", MAX_VALUE);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000);
//關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免資料丟失
props.put("unclean.leader.election.enable", false);
//關閉自動提交offset
props.put("enable.auto.commit", false);
限制客戶端在單個連線上能夠傳送的未響應請求的個數。設定此值是1表示kafka broker在響應請求之前client不能再向同一個broker傳送請求。注意:設定此引數是為了避免訊息亂序
props.put("max.in.flight.requests.per.connection", 1);
Kafka重複消費原因
強行kill執行緒,導致消費後的資料,offset沒有提交,partition就斷開連線。比如,通常會遇到消費的資料,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本預設是30秒),那麼就會re-blance重平衡,此時有一定機率offset沒提交,會導致重平衡後重復消費。
如果在close之前呼叫了consumer.unsubscribe()則有可能部分offset沒提交,下次重啟會重複消費
kafka資料重複 kafka設計的時候是設計了(at-least once)至少一次的邏輯,這樣就決定了資料可能是重複的,kafka採用基於時間的SLA(服務水平保證),訊息儲存一定時間(通常為7天)後會被刪除
kafka的資料重複一般情況下應該在消費者端,這時log.cleanup.policy = delete使用定期刪除機制