1. 程式人生 > >kafka:如何保證訊息不丟失不重複

kafka:如何保證訊息不丟失不重複

首先要考慮這麼幾個問題:

訊息丟失是什麼造成的?(從生產端和消費端兩個角度來考慮)

訊息重複是什麼造成的?(從生產端和消費端兩個角度來考慮)

如何保證訊息有序?

如果保證訊息不重不漏,損失的是什麼?

下面是文章詳情,這裡先簡單總結一下:

消費端重複消費:很容易解決,建立去重表

消費端丟失資料:也容易解決,關閉自動提交offset,處理完之後受到移位。

生產端重複傳送:這個不重要,消費端消費之前從去重表中判重就可以。

生產端丟失資料:這個是最麻煩的情況。

kafka動態維護了一個同步狀態的副本的集合(a set of In-Sync Replicas),簡稱ISR。

在這個集合中的節點都是和leader保持高度一致的,任何一條訊息只有被這個集合中的每個節點讀取並追加到日誌中,才會向外部通知說“這個訊息已經被提交”。

只有當訊息被所有的副本加入到日誌中時,才算是“committed”,只有committed的訊息才會傳送給consumer,這樣就不用擔心一旦leader down掉了訊息會丟失。

訊息從leader複製到follower, 我們可以通過決定producer是否等待訊息被提交的通知(ack)來區分同步複製和非同步複製。

同步傳送:

發出訊息後,必須阻塞等待收到通知後,才傳送下一條訊息。

特點:同步傳送模式雖然吞吐量小,但是發一條收到確認後再發下一條,既能保證不丟失訊息,又能保證順序。

非同步傳送:

一直往緩衝區寫,然後一把寫到佇列中去。

特點:非同步的可能會丟失資料。

如何保證有序:

如果有一個傳送失敗了,後面的就不能繼續發了,不然重發的那個肯定亂序了。

生產者在收到傳送成功的反饋之前,不能發下一條資料,但我感覺生產者是一個流,阻塞生產者感覺業務上不可行,怎麼會因為一條訊息發出去沒收到反饋,就阻塞生產者。

解決策略:

1.非同步方式緩衝區滿了,就阻塞在那,等著緩衝區可用,不能清空緩衝區。

2.傳送訊息之後回撥函式,傳送成功就傳送下一條,傳送失敗就記在日誌中,等著定時指令碼來掃描。

注意:傳送失敗可能並不真的傳送失敗,只是沒收到反饋,定時指令碼可能會重發。

ack:

ack確認機制設定為0,表示不等待響應,不等待borker的確認資訊,最小延遲,producer無法知道訊息是否發生成功,訊息可能丟失,但具有最大吞吐量。

ack確認機制設定為-1,也就是讓訊息寫入leader和所有的副本,ISR列表中的所有replica都返回確認訊息。

ack確認機制設定為1,leader已經接收了資料的確認資訊,replica非同步拉取訊息,比較折中。

ack確認機制設定為2,表示producer寫partition leader和其他一個follower成功的時候,broker就返回成功,無論其他的partition follower是否寫成功。

ack確認機制設定為 "all" 即所有副本都同步到資料時send方法才返回, 以此來完全判斷資料是否傳送成功, 理論上來講資料不會丟失。

min.insync.replicas=1  意思是至少有1個replica返回成功,否則product異常

===================Kafka訊息保證生產的資訊不丟失和重複消費問題===================

Kafka到底會不會丟資料(data loss)?

通常不會,但有些情況下的確有可能會發生。
下面的引數配置及Best practice列表可以較好地保證資料的永續性(當然是trade-off 即權衡利弊,犧牲了吞吐量)。
筆者會在該列表之後對列表中的每一項進行討論,有興趣的同學可以看下後面的分析。

沒有銀彈(徹底消除軟體危機,帶來軟體研發的變革的神奇武器)。

如果想要高吞吐量就要能容忍偶爾的失敗(比如重發漏發無順序保證)。

block.on.buffer.full = true
acks = all
retries = MAX_VALUE
max.in.flight.requests.per.connection = 1
使用KafkaProducer.send(record, callback)
callback邏輯中顯式關閉producer:close(0) 
unclean.leader.election.enable=false
replication.factor = 3 
min.insync.replicas = 2

replication.factor > min.insync.replicas
enable.auto.commit=false

訊息處理完成之後再提交位移

產生資料丟失的兩種情況:

1)同步模式:有3種狀態保證訊息被安全生產,但是在配置為1(只保證寫入leader成功)的話,如果剛好leader partition掛了,資料就會丟失。
2)非同步模式:當緩衝區滿了,如果配置為0(還沒有收到確認的情況下,緩衝池一滿,就清空緩衝池裡的訊息),資料就會被立即丟棄掉。

在資料生產時避免資料丟失的方法:

1)同步模式:確認機制設定為-1,也就是讓訊息寫入leader和所有的副本。
2)非同步模式:如果訊息發出去了,但還沒有收到確認的時候,緩衝池滿了,在配置檔案中設定成不限制阻塞超時的時間,也就說讓生產端一直阻塞,這樣也能保證資料不會丟失。
在資料消費時,避免資料丟失的方法:如果使用了storm,要開啟storm的ackfail機制。

如果沒有使用storm,確認資料被完成處理之後,再更新offset值。低階API中需要手動控制offset值。

訊息佇列的問題都要從源頭找問題,就是生產者是否有問題。

討論一種情況:

如果資料傳送成功,但是接受response的時候丟失了,機器重啟之後就會重發。

重發 解決:

消費端增加去重表就能解決,但是如果生產者丟失了資料,問題就很麻煩了。

重複消費 解決:
(1)去重:將訊息的唯一標識儲存到外部介質中,每次消費處理時判斷是否處理過。
(2)不管:大資料場景中,報表系統或者日誌資訊丟失幾條都無所謂,不會影響最終的統計分析結果。


給出列表之後,我們從兩個方面來探討一下資料為什麼會丟失:


1. Producer端
 

問題1:丟失訊息

目前比較新版本的Kafka正式替換了Scala版本的old producer,使用了由Java重寫的producer。新版本的producer採用非同步傳送機制。

      KafkaProducer.send(ProducerRecord)方法僅僅是把這條訊息放入一個快取中(即RecordAccumulator,本質上使用了佇列來快取記錄),同時後臺的IO執行緒會不斷掃描該快取區,將滿足條件的訊息封裝到某個batch中然後傳送出去。顯然,這個過程中就有一個數據丟失的視窗:若IO執行緒傳送之前client端掛掉了,累積在accumulator中的資料的確有可能會丟失。
 

問題2:訊息亂序

假設客戶端程式碼依次執行下面的語句將兩條訊息發到相同的分割槽
producer.send(record1);
producer.send(record2);


如果此時由於某些原因(比如瞬時的網路抖動)導致record1沒有成功傳送。

同時Kafka又配置了重試機制和max.in.flight.requests.per.connection大於1(預設值是5,本來就是大於1的)。

那麼重試record1成功後,record1在分割槽中就在record2之後,從而造成訊息的亂序。

很多某些要求強順序保證的場景是不允許出現這種情況的。

傳送之後重發就會丟失順序。


解決:

鑑於producer的這兩個問題,我們應該如何規避呢??

對於訊息丟失的問題,很容易想到的一個方案就是:

既然非同步傳送有可能丟失資料, 我改成同步傳送總可以。比如這樣:producer.send(record).get();
這樣當然是可以的,但是效能會很差,不建議這樣使用。

因此特意總結了一份配置列表:

個人認為該配置清單應該能夠比較好地規避producer端資料丟失情況的發生。

特此說明一下:軟體配置的很多決策都是trade-off(平衡,權衡,按實際情況權衡一下,得到最終決策)。

下面的配置也不例外:
應用了這些配置,你可能會發現你的producer/consumer 吞吐量會下降,這是正常的,因為你換取了更高的資料安全性。
 

block.on.buffer.full = true

儘管該引數在0.9.0.0已經被標記為“deprecated”(不贊成,反對的意思),但鑑於它的含義非常直觀。

所以這裡還是顯式設定它為true,使得producer將一直等待緩衝區直至其變為可用。否則如果producer生產速度過快耗盡了緩衝區,producer將丟擲異常。緩衝區滿了就阻塞在那,不要拋異常,也不要丟失資料。


acks=all

很好理解,所有follower都響應了才認為訊息提交成功,即"committed"。


retries = MAX

無限重試,直到你意識到出現了問題


max.in.flight.requests.per.connection = 1

限制客戶端在單個連線上能夠傳送的未響應請求的個數。設定此值是1表示kafka broker在響應請求之前client不能再向同一個broker傳送請求。注意:設定此引數是為了避免訊息亂序
使用KafkaProducer.send(record, callback)而不是send(record)方法   自定義回撥邏輯處理訊息傳送失敗,比如記錄在日誌中,用定時指令碼掃描重處理。


callback

邏輯中最好顯式關閉producer:close(0)

注意:設定此引數是為了避免訊息亂序(僅僅因為一條訊息傳送沒收到反饋就關閉生產者,感覺代價很大)。


unclean.leader.election.enable=false

關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免資料丟失。


replication.factor >= 3

這個完全是個人建議了,參考了Hadoop及業界通用的三備份原則


min.insync.replicas > 1

訊息至少要被寫入到這麼多副本才算成功,也是提升資料永續性的一個引數。與acks配合使用。


保證

replication.factor > min.insync.replicas  

如果兩者相等,當一個副本掛掉了分割槽也就沒法正常工作了。

通常設定

replication.factor = min.insync.replicas + 1

即可。

2. Consumer端
  

consumer端丟失訊息的情形比較簡單:

如果在訊息處理完成前就提交了offset,那麼就有可能造成資料的丟失。

由於Kafka consumer預設是自動提交位移的,所以在後臺提交位移前一定要保證訊息被正常處理了,因此不建議採用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個執行緒中去做。

為了避免資料丟失,現給出兩點建議
enable.auto.commit=false

關閉自動提交位移,在訊息被完整處理之後再手動提交位移。

-------------------------------------------------------------------------------------------------------------------------------------------------------

總結1:

訊息的完整性和系統的吞吐量是互斥的,為了確保訊息不丟失就必然會損失系統的吞吐量

producer:

1、ack設定-1

2、設定副本同步成功的最小同步個數為副本數-1

3、加大重試次數

4、同步傳送

5、對於單條資料過大,要設定可接收的單條資料的大小

6、對於非同步傳送,通過回撥函式來感知丟訊息

7、配置不允許非ISR集合中的副本當leader

8、客戶端緩衝區滿了也可能會丟訊息;或者非同步情況下訊息在客戶端緩衝區還未傳送,客戶端就宕機

9、block.on.buffer.full = true

consumer:

1、enable.auto.commit=false  關閉自動提交位移

同一分割槽訊息亂序:

假設a,b兩條訊息,a先發送後由於傳送失敗重試,這時順序就會在b的訊息後面,可以設定max.in.flight.requests.per.connection=1來避免

max.in.flight.requests.per.connection:限制客戶端在單個連線上能夠傳送的未響應請求的個數。設定此值是1表示kafka broker在響應請求之前client不能再向同一個broker傳送請求,但吞吐量會下降

總結2:

生產者如何保證資料的不丟失

kafka的ack機制:

在kafka傳送資料的時候,每次傳送訊息都會有一個確認反饋機制,確保訊息正常的能夠被收到。

同步模式:

ack機制能夠保證資料的不丟失,如果ack設定為0,風險很大,一般不建議設定為0

producer.type=sync

request.required.acks=1  //只leader

# 當所有的follower都同步訊息成功後傳送ack

request.required.acks=-1

非同步模式:

通過buffer來進行控制資料的傳送,有兩個值來進行控制,時間閾值與訊息的數量閾值,如果buffer滿了資料還沒有傳送出去,如果設定的是立即清理模式,風險很大,一定要設定為阻塞模式。

結論:

生產者如何保證資料的不丟失

producer有丟資料的可能,但是可以通過配置保證訊息的不丟失。

producer.type=async  非同步模式

request.required.acks=1  需要ack校驗機制

queue.buffering.max.ms=5000  佇列最大快取時間大一些5秒

queue.buffering.max.messages=10000  佇列最大快取訊息數量大些10000

queue.enqueue.timeout.ms = -1  進入佇列超時時間設定為永不超時

batch.num.messages=200 每一批處理訊息的數量設定小些200

消費者如何保證資料的不丟失

設定不自動提交,通過offset  commit  來保證資料的不丟失,kafka自己記錄了每次消費的offset數值,下次繼續消費的時候,接著上次的offset進行消費即可。

叢集監控工具

ganglia