1. 程式人生 > >Kafka訊息保證不丟失

Kafka訊息保證不丟失

Kafka訊息保證生產的資訊不丟失和重複消費問題
1)使用同步模式的時候,有3種狀態保證訊息被安全生產,在配置為1(只保證寫入leader成功)的話,如果剛好leader partition掛了,資料就會丟失。
2)還有一種情況可能會丟失訊息,就是使用非同步模式的時候,當緩衝區滿了,如果配置為0(還沒有收到確認的情況下,緩衝池一滿,就清空緩衝池裡的訊息),
資料就會被立即丟棄掉。


在資料生產時避免資料丟失的方法:
只要能避免上述兩種情況,那麼就可以保證訊息不會被丟失。
1)就是說在同步模式的時候,確認機制設定為-1,也就是讓訊息寫入leader和所有的副本。
2)還有,在非同步模式下,如果訊息發出去了,但還沒有收到確認的時候,緩衝池滿了,在配置檔案中設定成不限制阻塞超時的時間,也就說讓生產端一直阻塞,這樣也能保證資料不會丟失。
在資料消費時,避免資料丟失的方法:如果使用了storm,要開啟storm的ackfail機制;如果沒有使用storm,確認資料被完成處理之後,再更新offset值。低階API中需要手動控制offset值。

資料重複消費的情況,如果處理
(1)去重:將訊息的唯一標識儲存到外部介質中,每次消費處理時判斷是否處理過;
(2)不管:大資料場景中,報表系統或者日誌資訊丟失幾條都無所謂,不會影響最終的統計分析結

Kafka到底會不會丟資料(data loss)? 通常不會,但有些情況下的確有可能會發生。下面的引數配置及Best practice列表可以較好地保證資料的永續性(當然是trade-off,犧牲了吞吐量)。筆者會在該列表之後對列表中的每一項進行討論,有興趣的同學可以看下後面的分析。


block.on.buffer.full = true
acks = all
retries = MAX_VALUE
max.in.flight.requests.per.connection = 1
使用KafkaProducer.send(record, callback)
callback邏輯中顯式關閉producer:close(0) 
unclean.leader.election.enable=false
replication.factor = 3 
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit=false
訊息處理完成之後再提交位移
給出列表之後,我們從兩個方面來探討一下資料為什麼會丟失:


1. Producer端


  目前比較新版本的Kafka正式替換了Scala版本的old producer,使用了由Java重寫的producer。新版本的producer採用非同步傳送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條訊息放入一個快取中(即RecordAccumulator,本質上使用了佇列來快取記錄),同時後臺的IO執行緒會不斷掃描該快取區,將滿足條件的訊息封裝到某個batch中然後傳送出去。顯然,這個過程中就有一個數據丟失的視窗:若IO執行緒傳送之前client端掛掉了,累積在accumulator中的資料的確有可能會丟失。


  Producer的另一個問題是訊息的亂序問題。假設客戶端程式碼依次執行下面的語句將兩條訊息發到相同的分割槽


producer.send(record1);
producer.send(record2);
如果此時由於某些原因(比如瞬時的網路抖動)導致record1沒有成功傳送,同時Kafka又配置了重試機制和max.in.flight.requests.per.connection大於1(預設值是5,本來就是大於1的),那麼重試record1成功後,record1在分割槽中就在record2之後,從而造成訊息的亂序。很多某些要求強順序保證的場景是不允許出現這種情況的。


  鑑於producer的這兩個問題,我們應該如何規避呢??對於訊息丟失的問題,很容易想到的一個方案就是:既然非同步傳送有可能丟失資料, 我改成同步傳送總可以吧?比如這樣:


producer.send(record).get();
這樣當然是可以的,但是效能會很差,不建議這樣使用。因此特意總結了一份配置列表。個人認為該配置清單應該能夠比較好地規避producer端資料丟失情況的發生:(特此說明一下,軟體配置的很多決策都是trade-off,下面的配置也不例外:應用了這些配置,你可能會發現你的producer/consumer 吞吐量會下降,這是正常的,因為你換取了更高的資料安全性)


block.on.buffer.full = true  儘管該引數在0.9.0.0已經被標記為“deprecated”,但鑑於它的含義非常直觀,所以這裡還是顯式設定它為true,使得producer將一直等待緩衝區直至其變為可用。否則如果producer生產速度過快耗盡了緩衝區,producer將丟擲異常
acks=all  很好理解,所有follower都響應了才認為訊息提交成功,即"committed"
retries = MAX 無限重試,直到你意識到出現了問題:)
max.in.flight.requests.per.connection = 1 限制客戶端在單個連線上能夠傳送的未響應請求的個數。設定此值是1表示kafka broker在響應請求之前client不能再向同一個broker傳送請求。注意:設定此引數是為了避免訊息亂序
使用KafkaProducer.send(record, callback)而不是send(record)方法   自定義回撥邏輯處理訊息傳送失敗
callback邏輯中最好顯式關閉producer:close(0) 注意:設定此引數是為了避免訊息亂序
unclean.leader.election.enable=false   關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免資料丟失
replication.factor >= 3   這個完全是個人建議了,參考了Hadoop及業界通用的三備份原則
min.insync.replicas > 1 訊息至少要被寫入到這麼多副本才算成功,也是提升資料永續性的一個引數。與acks配合使用
保證replication.factor > min.insync.replicas  如果兩者相等,當一個副本掛掉了分割槽也就沒法正常工作了。通常設定replication.factor = min.insync.replicas + 1即可
2. Consumer端


  consumer端丟失訊息的情形比較簡單:如果在訊息處理完成前就提交了offset,那麼就有可能造成資料的丟失。由於Kafka consumer預設是自動提交位移的,所以在後臺提交位移前一定要保證訊息被正常處理了,因此不建議採用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個執行緒中去做。為了避免資料丟失,現給出兩點建議:


enable.auto.commit=false  關閉自動提交位移
在訊息被完整處理之後再手動提交位移