1. 程式人生 > 實用技巧 >Flink 反壓 淺入淺出

Flink 反壓 淺入淺出

前言

微信搜【Java3y】關注這個樸實無華的男人,點贊關注是對我最大的支援!

文字已收錄至我的GitHubhttps://github.com/ZhongFuCheng3y/3y,有300多篇原創文章,最近在連載面試和專案系列!

最近一直在遷移Flink相關的工程,期間也踩了些坑,checkpoint反壓是其中的一個。

敖丙太菜了,Flink都不會,只能我自己來了。看敖丙只能圖一樂,學技術還是得看三歪

平時敖丙黑我都沒啥水平,拿點簡單的東西來就說我不會。我是敖丙的頭號黑粉

今天來分享一下 Flinkcheckpoint機制和背壓原理,我相信通過這篇文章,大家在玩Flink的時候可以更加深刻地瞭解Checkpoint

是怎麼實現的,並且在設定相關引數以及使用的時候可以更加地得心應手。

上一篇已經寫過Flink的入門教程了,如果還不瞭解Flink的同學可以先去看看:《Flink入門教程

前排提醒,本文基於Flink 1.7

淺入淺出學習Flink的背壓知識》

開胃菜

在講解FlinkcheckPoint背壓機制之前,我們先來看下checkpoint背壓的相關基礎,有助於後面的理解。

作為使用者,我們寫好Flink的程式,上管理平臺提交,Flink就跑起來了(只要程式程式碼沒有問題),細節對使用者都是遮蔽的。

實際上大致的流程是這樣的:

  1. Flink會根據我們所寫程式碼,會生成一個StreamGraph
    的圖出來,來代表我們所寫程式的拓撲結構。
  2. 然後在提交的之前會將StreamGraph這個圖優化一把(可以合併的任務進行合併),變成JobGraph
  3. JobGraph提交給JobManager
  4. JobManager收到之後JobGraph之後會根據JobGraph生成ExecutionGraphExecutionGraphJobGraph 的並行化版本)
  5. TaskManager接收到任務之後會將ExecutionGraph生成為真正的物理執行圖

可以看到物理執行圖真正執行在TaskManagerTransformSink之間都會有ResultPartitionInputGate這倆個元件,ResultPartition

用來發送資料,而InputGate用來接收資料。

遮蔽掉這些Graph,可以發現Flink的架構是:Client->JobManager->TaskManager

從名字就可以看出,JobManager是幹「管理」,而TaskManager是真正幹活的。回到我們今天的主題,checkpoint就是由JobManager發出。

Flink本身就是有狀態的,Flink可以讓你選擇執行過程中的資料儲存在哪裡,目前有三個地方,在Flink的角度稱作State Backends

  • MemoryStateBackend(記憶體)
  • FsStateBackend(檔案系統,一般是HSFS)
  • RocksDBStateBackend(RocksDB資料庫)

同樣的,checkpoint資訊也是儲存在State Backends

耗子屎

最近在Storm遷移Flink的時候遇到個問題,我來簡單描述一下背景。

我們從各個資料來源從清洗出資料,藉助Flink清洗,組裝成一個寬模型,最後交由kylin做近實時資料統計和展示,供運營實時檢視。

遷移的過程中,發現訂單的topic消費延遲了好久,初步懷疑是因為訂單上游的併發度不夠所影響的,所以調整了兩端的並行度重新發布一把。

釋出的過程中,系統起來以後,再去看topic 消費延遲的監控,就懵逼了。什麼?怎麼這麼久了啊?絲毫沒有降下去的意思。

這時候只能找組內的大神去尋求幫忙了,他排查一番後表示:這checkpoint一直沒做上,都堵住了,重新發布的時候只會在上一次checkpoint開始,由於checkpoint長時間沒完成掉,所以重新發布資料量會很大。這沒啥好辦法了,只能在這個堵住的環節下扔掉吧,估計是業務邏輯出了問題。

畫外音:接收到訂單的資料,會去溯源點選,判斷該訂單從哪個業務來,經過了哪些的業務,最終是哪塊業務致使該訂單成交。

畫外音:外部真正使用時,依賴「訂單結果HBase」資料

我們認為點選的資料有可能會比訂單的資料處理要慢一會,所以找不到的資料會間隔一段時間輪詢,又因為Flink提供State「狀態」 和checkpoint機制,我們把找不到的資料放入ListState按一定的時間輪詢就好了(即便系統由於重啟或其他原因掛了,也不會把資料丟了)。

理論上只要沒問題,這套方案是可行的。但現在結果告訴我們:訂單資料報來了以後,一小批量資料一直在「訂單結果HBase」沒找到資料,就放置到ListState上,然後來一條資料就去遍歷ListState。導致的後果就是:

  • 資料消費不過來,形成反壓
  • checkpoint一直沒成功

當時處理的方式就是把ListState清空掉,暫時丟掉這一部分的資料,讓資料追上進度。

後來排查後發現是上游在訊息報欄位上做了「手腳」,解析失敗導致點選丟失,造成這一連鎖的後果。

排查問題的關鍵是理解Flink反壓checkpoint的原理是什麼樣的,下面我來講述一下。

反壓

反壓backpressure是流式計算中很常見的問題。它意味著資料管道中某個節點成為瓶頸,處理速率跟不上「上游」傳送資料的速率,上游需要進行限速

上面的圖代表了是反壓極簡的狀態,說白了就是:下游處理不過來了,上游得慢點,要堵了!

最令人好奇的是:“下游是怎麼通知上游要發慢點的呢?

在前面Flink的基礎知識講解,我們可以看到ResultPartition用來發送資料,InputGate用來接收資料。

Flink在一個TaskManager內部讀寫資料的時候,會有一個BufferPool(緩衝池)供該TaskManager讀寫使用(一個TaskManager共用一個BufferPool),每個讀寫ResultPartition/InputGate都會去申請自己的LocalBuffer

以上圖為例,假設下游處理不過來,那InputGateLocalBuffer是不是被填滿了?填滿了以後,ResultPartition是不是沒辦法往InputGate發了?而ResultPartition沒法發的話,它自己本身的LocalBuffer 也遲早被填滿,那是不是依照這個邏輯,一直到Source就不會拉資料了...

這個過程就猶如InputGate/ResultPartition都開了自己的有界阻塞佇列,反正“我”就只能處理這麼多,往我這裡發,我滿了就堵住唄,形成連鎖反應一直堵到源頭上...

上面是隻有一個TaskManager的情況下的反壓,那多個TaskManager呢?(畢竟我們很多時候都是有多個TaskManager在為我們工作的)

我們再看回Flink通訊的總體資料流向架構圖:

從圖上可以清洗地發現:遠端通訊用的Netty,底層是TCP Socket來實現的。

所以,從巨集觀的角度看,多個TaskManager只不過多了兩個Buffer(緩衝區)。

按照上面的思路,只要InputGateLocalBuffer被打滿,Netty Buffer也遲早被打滿,而Socket Buffer同樣遲早也會被打滿(TCP 本身就帶有流量控制),再反饋到ResultPartition上,資料又又又發不出去了...導致整條資料鏈路都存在反壓的現象。

現在問題又來了,一個TaskManagertask可是有很多的,它們都共用一個TCP Buffer/Buffer Pool,那隻要其中一個task的鏈路存在問題,那不導致整個TaskManager跟著遭殃?

Flink 1.5版本之前,確實會有這個問題。而在Flink 1.5版本之後則引入了credit機制。

從上面我們看到的Flink所實現的反壓,巨集觀上就是直接依賴各個Buffer是否滿了,如果滿了則無法寫入/讀取導致連鎖反應,直至Source端。

credit機制,實際上可以簡單理解為以「更細粒度」去做流量控制:每次InputGate會告訴ResultPartition自己還有多少的空閒量可以接收,讓ResultPartition看著發。如果InputGate告訴ResultPartition已經沒有空閒量了,那ResultPartition就不發了。

那實際上是怎麼實現的呢?擼原始碼!

在擼原始碼之前,我們再來看看下面物理執行圖:實際上InPutGate下是InputChannelResultPartition下是ResultSubpartition(這些在原始碼中都有體現)。

InputGate(接收端處理反壓)

我們先從接收端看起吧。Flink接收資料的方法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

隨後定位到處理反壓的邏輯:

finalBufferOrEventbufferOrEvent=barrierHandler.getNextNonBlocked();

進去getNextNonBlocked()方法看(選擇的是BarrierBuffer實現):

我們就直接看null的情況,看下從初始化階段開始是怎麼搞的,進去getNextBufferOrEvent()

進去方法裡面看到兩個比較重要的呼叫:

requestPartitions();

result=currentChannel.getNextBuffer();

先從requestPartitions()看起吧,發現裡邊套了一層(從InputChannel下獲取到subPartition):

於是再進requestSubpartition()(看RemoteInputChannel的實現吧)

在這裡看起來就是建立Client端,然後接收上游傳送過來的資料:

先看看client端的建立姿勢吧,進createPartitionRequestClient()方法看看(我們看Netty的實現)。

點了兩層,我們會進到createPartitionRequestClient()方法,看原始碼註釋就可以清晰發現,這會建立TCP連線並且創建出Client供我們使用

我們還是看null的情況,於是定位到這裡:

進去connect()方法看看:

我們就看看具體生成邏輯的實現吧,所以進到getClientChannelHandlers

意外發現原始碼還有個通訊簡要流程圖給我們看(哈哈哈):

好了,來看看getClientChannelHandlers方法吧,這個方法不長,主要判斷了下要生成的client是否開啟creditBased機制:

publicChannelHandler[]getClientChannelHandlers(){
NetworkClientHandlernetworkClientHandler=
creditBasedEnabled?newCreditBasedPartitionRequestClientHandler():
newPartitionRequestClientHandler();
returnnewChannelHandler[]{
messageEncoder,
newNettyMessage.NettyMessageDecoder(!creditBasedEnabled),
networkClientHandler};
}

於是我們的networkClientHandler例項是CreditBasedPartitionRequestClientHandler

到這裡,我們暫且就認為Client端已經生成完了,再退回去getNextBufferOrEvent()這個方法,requestPartitions()方法是生成接收資料的Client端,具體的例項是CreditBasedPartitionRequestClientHandler

下面我們進getNextBuffer()看看接收資料具體是怎麼處理的:

拿到資料後,就會開始執行我們使用者的程式碼了呼叫process方法了(這裡我們先不看了)。還是回到反壓的邏輯上,我們好像還沒看到反壓的邏輯在哪裡。重點就是receivedBuffers這裡,是誰塞進去的呢?

於是我們回看到Client具體的例項CreditBasedPartitionRequestClientHandler,開啟方法列表一看,感覺就是ChannelRead()沒錯了:

@Override
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
try{
decodeMsg(msg);
}catch(Throwablet){
notifyAllChannelsOfErrorAndClose(t);
}
}

跟著decodeMsg繼續往下走吧:

繼續下到decodeBufferOrEvent()

繼續下到onBuffer

所以我們往onSenderBacklog上看看:

最後呼叫notifyCreditAvailableCredit往上游傳送:

publicvoidnotifyCreditAvailable(finalRemoteInputChannelinputChannel){
ctx.executor().execute(()->ctx.pipeline().fireUserEventTriggered(inputChannel));
}

最後再畫張圖來理解一把(關鍵鏈路):

ResultPartition(傳送端處理反壓)

傳送端我們從org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager開始看起

於是我們進去看fromConfiguration()

進去start()去看,隨後進入connectionManager.start()(還是看Netty的例項):

image-20201206141859451

進去看service.init()方法做了什麼(又看到熟悉的身影):

好了,我們再進去getServerChannelHandlers()看看吧:

有了上面經驗的我們,直接進去看看它的方法,沒錯,又是channnelRead,只是這次是channelRead0

ok,我們進去addCredit()看看:

reader.addCredit(credit)只是更新了下數量

publicvoidaddCredit(intcreditDeltas){
numCreditsAvailable+=creditDeltas;
}

重點我們看下enqueueAvailableReader() 方法,而enqueueAvailableReader()的重點就是判斷Credit是否足夠傳送

isAvailable的實現也很簡單,就是判斷Credit是否大於0且有真實資料可發

writeAndFlushNextMessageIfPossible實際上就是往下游傳送資料:

拿資料的時候會判斷Credit是否足夠,不足夠拋異常:

再畫張圖來簡單理解一下:

背壓總結

「下游」的處理速度跟不上「上游」的傳送速度,從而降低了處理速度,看似是很美好的(畢竟看起來就是幫助我們限流了)。

但在Flink裡,背壓再加上Checkponit機制,很有可能導致State狀態一直變大,拖慢完成checkpoint速度甚至超時失敗。

checkpoint處理速度延遲時,會加劇背壓的情況(很可能大多數時間都在處理checkpoint了)。

checkpoint做不上時,意味著重啟Flink應用就會從上一次完成checkpoint重新執行(...

舉個我真實遇到的例子:

我有一個Flink任務,我只給了它一臺TaskManager去執行任務,在更新DB的時候發現會有併發的問題。

只有一臺TaskManager定位問題很簡單,稍微定位了下判斷:我更新DB的Sink 並行度調高了。

如果Sink的並行度設定為1,那肯定沒有併發的問題,但這樣處理起來太慢了。

於是我就在Sink之前根據userId進行keyBy(相同的userId都由同一個Thread處理,那這樣就沒併發的問題了)

看似很美好,但userId存在熱點資料的問題,導致下游資料處理形成反壓。原本一次checkpoint執行只需要30~40ms反壓後一次checkpoint需要2min+

checkpoint執行間隔相對頻繁(6s/次),執行時間2min+,最終導致資料一直處理不過來,整條鏈路的消費速度從原來的3000qps到背壓後的300qps,一直堵住(程式沒問題,就是處理速度大大下降,影響到資料的最終產出)。

最後

本來想著這篇文章把反壓和Checkpoint都一起寫了,但寫著寫著發現有點長了,那checkpoint開下一篇吧。

相信我,只要你用到Flink,遲早會遇到這種問題的,現在可能有的同學還沒看懂,沒關係,先點個贊