kafka實戰 - 資料可靠性
概述
常見的儲存高可用方案的根本原理就是把資料複製到多個儲存裝置,通過資料冗餘的方式來實現資料的可靠性。比如同一份資料,一份在城市A,一份在城市B。如果城市A發生自然災害導致機房癱瘓,那麼業務就可以直接切到城市B進行服務,從而保障業務的高可用。但是這是理想情況,一旦資料被複制並且分開儲存,就涉及到了網路傳輸,即使在同機房,而且網路狀況良好的情況下,也會有10ms以上的延遲,而這種延遲導致各種各樣的問題。假設A和B是主備模式,B的資料是A的備份,A對外服務,如果A掛了,B直接替換掉A對外服務。假如A掛了,但是有一小部分資料沒來得及同步到B。這時候CAP的場景就出現了,是選擇一致性還是高可用?如果選擇高可用,直接讓B替換掉A成為master對外服務,當A恢復成為slave後,勢必需要截斷自己多餘的資料然後從B同步資料,那麼A掛掉之前寫入A但是沒有同步B的這些資料就算是丟了,導致資料不一致。如果選擇一致性,那麼B就不能對外服務,雖然資料一致了,但是高可用無法保證。
kafka的資料可靠性是通過副本機制實現的,同樣也會存在上述的問題。但是它提供了很多可配置的引數來幫使用者定製網路分割槽發生後的策略。我們的業務有個原則是不管發生什麼,資料一定不能丟。所以下面從資料一致性的角度來討論這些引數的配置,保證資料的可靠性。
資料的可靠性需要producer和broker二者的配置一起來保證。
producer端
producer負責給kafka生產資料,一共需要注意以下2個問題:
<1> 現在的大部分kafka的producer client都提供了非同步生產資料的介面。一方面使用非同步介面可以同時給多個broker生產資料,另一方面非同步介面在把資料寫入本地快取,還沒有開始傳送就會直接返回。所以非同步介面比同步介面要快的多,大部分場景下(比如php的client)produce方法預設使用的也是非同步介面。producer的資料一定是從上游通過某種方式傳過來的,但是呼叫produce方法後,資料其實並沒有發出去,而是快取在記憶體中。如果在資料發出去之前,producer這個程序掛掉,那麼沒有發出去的資料就會丟失,但是上游的資料已經被消費掉了。所以上游和producer之間,一定需要一種確認機制,確認producer這邊成功後,上游才把資料刪掉。不能呼叫完produce發現沒有返回錯誤就繼續去上游消費或者向上遊報告produce成功。因為是非同步介面,produce方法返回的只是是否成功寫入記憶體,而不是傳送結果。這種傳送結果一般是通過回撥函式的方式進行返回的,例如php就是在初始化producer例項時,呼叫setDrMsgCb()方法設定回撥的。在回撥函式中,你可以獲得傳送失敗的錯誤碼,錯誤字串,和partition,topic等有用的資訊。通過這些資訊即可判斷是否傳送成功。
<2> request.required.acks引數。這個引數會和要生產的資料一塊傳送給broker。告訴broker寫入多少個ISR副本後返回。一共可以設定3種類型的值:
a. 0,broker不會給producer返回response,producer也不會去等,這種情況下生產資料的速度是最快的。但是隻適用於對資料是否丟失根本不在意的場景下。網路丟包,leader partition 掛掉,都會導致資料丟失。
b. 1, broker會保證寫入leader副本之後再返回response(producer端的回撥函式會響應這個response)。這種方式在寫入leader副本,但是還沒有來得及同步到其他副本之前,leader掛了,就會丟失資料。
c. all或者-1。broker會保證把資料寫入所有的ISR副本後返回。這種方式是資料最高可靠性的設定。只要不是這個partition的所有副本所在的broker一塊死掉,資料就不會丟失。但是同時生產效率也是最慢的。比如有3個副本,ABC,A為leader,A和B的延時是50ms, A和C的延時是100ms, 那麼producer的耗時最起碼是 50 + 100 + 3臺機器資料寫入時間 + A和client的通訊時間。這個具體的效能下降程度和場景有關。筆者做的測試是,一臺機器,起3個broker,每個topic有2個副本,localhost的producer傳送的資料大小隨機從1M到150M。在acks=1時,每傳送1MB位25ms, acks=-1時,沒傳送1MB需要的時間為30ms,效能下降17%左右。
一般情況下,如果想要資料有比較好的保證,最好是設定成-1。另外注意一點,這個引數是topic-level,應該設定在topic-level-conf中,在php的client中親測設定到global-level-conf中沒有生效,也沒有報錯。
broker端
<1> unclean.leader.election.enable引數
kafka的副本機制的原理就是一個partition有多個副本,leader副本負責接受client的讀寫,其他follower副本負責從leader副本同步資料。和leader副本相差的資料在一定閾值內的副本叫做ISR副本(包括leader)。ISR是動態的,如果某個follower掛了,長時間不從leader拉取資料,就會被踢出ISR。而如果某個follower掛掉之後恢復,很快又追上leader的資料,那麼又會被放入ISR中。如果leader掛了,選一個follower作為新的leader。unclean.leader.election.enable引數控制的是非ISR副本是否可以被選為leader。
想把這種非ISR的副本被選為leader造成的後果講清楚,會涉及到很多其他的概念,比如HW和LEO。網上已經有不少好文解釋了這些概念:https://www.cnblogs.com/huxi2b/p/7453543.html,大家可以參考一下。
kafka會保證只要有一個ISR還活著,commited的資料就不會丟失。什麼是commited的資料?簡單來說就是已經儲存在所有ISR中的資料。這些資料都是已經給producer明確返回生產成功response的資料。consumer也只能消費到commited的資料。所以如果leader掛了,選舉的新leader是ISR中的另一個副本,新的leader會把自己的LEO做位HW,LEO比新leader小的follower會從新的leader的LEO處開始同步資料,LEO比新leader高的,會把自己的日誌截斷到新leader的LEO處,然後開始同步。所以這種情況下,對producer來說,已經成功生產的資料沒有丟失。對於consumer來說,仍然不會消費到對producer來說沒有produce成功的資料。partition的failover看起來是透明的。
但是如果unclean.leader.election.enable為True,允許選舉非ISR中的副本為新leader,這時候就會發生commited的資料也會丟失的情況。對於producer來說,已經生產成功的資料就會丟失。consumer也可能因為傳送的需要消費的offset比新leader的LEO還要大,所以從頭開始消費(這在消費資料不能重複或者已經儲存了很多資料的情況下是非常嚴重的事情,對於實時性的業務,不但會影響資料的實時性,而且可能把下游寫爆)。這篇部落格講的很好,可以參考一下:https://blog.csdn.net/u013256816/article/details/80790185。
<2> min.insync.replicas
這個引數是和producer端的request.required.acks配合使用的,只有把request.required.acks設定為-1或者all時,這個引數才會生效。這個引數設定的是broker在給producer返回response前,最少寫入ISR的個數。想向一種場景,某個partition,leader還在,但是其他副本全掛了。設定acks為-1,那麼broker把資料寫入所有ISR(僅僅是leader)之後,返回。副本掛的就剩leader一個了,我們不知道。資料只寫入leader,並沒有複製到其他副本,我們也不知道。leader一掛,資料就完蛋。所以把這個引數配置成和partition個數一樣或者比partition個數稍小一點。這樣當broker無法寫入這麼多ISR之後,會給producer返回特殊的response,producer也會通過回撥的錯誤碼,或者丟擲NotEnoughReplicas讓開發人員指導情況,該修叢集修叢集,該重新發送重新發送。這個引數可以設定為topic-level的,因為不同的topic副本個數可能不一樣,很難使用全域性的min.insync.replicas滿足所有topic。
其他參考資料:
kafka controller相關協議: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
kafka replica相關協議: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication
kafka各種failover場景:https://www.cnblogs.com/fxjwind/p/4972244.html