Flink中接收端反壓以及Credit機制 (原始碼分析)
先上一張圖整體瞭解Flink中的反壓
可以看到每個task都會有自己對應的IG(inputgate)對接上游傳送過來的資料和RS(resultPatation)對接往下游傳送資料, 整個反壓機制通過inputgate,resultPatation公用一個一定大小的memorySegmentPool來實現(Flink中memorySegment作為記憶體使用的抽象,類比bytebuffer), 公用一個pool當接收上游資料時Decoder,往下游傳送資料時Encoder,都會向pool中請求記憶體memorySegment 。因為是公共pool,也就是說執行時,當接受的資料佔用的記憶體多了,往下游傳送的資料就少了,這樣是個什麼樣的情況呢?
比如說你sink端堵塞了,背壓了寫不進去,那這個task的resultPatation無法傳送資料了,也就無法釋放memorySegment了,相應的用於接收資料的memorySegment就會越來越少,直到接收資料端拿不到memorySegment了,也就無法接收上游資料了,既然這個task無法接收資料了,自然引起這個task的上一個task資料傳送端無法傳送,那上一個task又反壓了,所以這個反壓從發生反壓的地方,依次的往上游擴散直到source,這個就是flink的天然反壓。
從原始碼來看一下flink是如何實現的
來到資料接收的地方StreamInputProcessor.java中processInput()方法中
這裡通過通過handler的getNextNonBlocked()方法獲取到了bufferOrEvent後面就會將這個bufferOrEvent解析成record資料然後使用使用者的程式碼處理了
其實這裡的handler分為兩種
- BarrierBuffer
- BarrierTracker
區別主要是barrierbuffer實現了barrier對齊的資料快取,用於實現一次語義,這裡以後隨緣更新到容錯機制的時候講
來看一下getNextNonBlocked()方法
這個看到了通過會通過上游inputGate獲取資料,具體看一下getNextBufferOrEvent()其中有兩個比較重要的呼叫
先看requestPartitions()
先遍歷了所有的inputchannel然後呼叫了requestSubpartition()在其中
先看一下1處,這裡返回了一個Netty的Client來看一下createPartitionRequestClient是怎麼建立的
可以看到原始碼的描述,這裡其實就是建立與上游傳送資料端的tcp連線的client端,用來接收上游資料的
接著
這裡如果已經建立TCP連線就直接拿,與上游還沒有建立tcp連線的話就會先初始化Client端,通過這個connect()方法
來看一下第一次是如何初始化連線的
看到這個應該熟悉Netty的同學一眼就瞭解了,在1處就是Client的具體邏輯了,然後與上游埠建立連線
來看一下具體的Client端具體的邏輯,這裡最好對netty有一定的認識
- 1處是一個用於Encoder 的ChannelOutboundHandler常規的編碼器沒有什麼好說的
-
2處是用於Decoder的ChannelinboundHandler常規的解碼器沒有什麼好說的
- 3處 這裡分為兩種Handler,區別主要是在notifyCreditAvailable()方法
PartitionRequestClientHandler: 不帶信任機制的
CreditBasedPartitionRequestClientHandler:帶credit信任機制的
這裡取出了所有的帶有信任的上游inputChannel並且向其響應傳送了一個Credit物件
那帶Credit機制的handler何時觸發userEventTriggered()來觸發向上遊傳送Credit呢?
先不慌,先來看下client接收到資料後做了什麼,看下Nettyclient端的channelRead()方法(這裡只看credit機制的)
decodeMsg()方法中
decodeBufferOrEvent()方法
在沒有Credit機制的PartitionRequestClientHandler中
requestBuffer()方法就是請求memorySegmentPool中的memorySegment
這裡不能確保能獲取到,所以會用一個while(true)一直掛著
在Credit機制的CreditBasedPartitionRequestClientHandler中
請求requestBuffer()方法就是請求memorySegmentPool中的memorySegment因為信任機制在請求前就已經保證有足夠的memorySegment所以不會請求不到,這裡請求不到直接就拋異常了
然後OnBuffer( )方法
1處將將這個buffer加入到了這個receivedBuffers的ArrayDeque中,這裡要注意receivedBuffers,這個queue後面會用到(後面處理資料就是迴圈的從這個queue中poll拉資料出來)
這裡還要注意onBuffer方法還傳入了backlog引數,這裡是一個積壓的資料量
接著會根據積壓的資料量
當可用的buffer數 <(擠壓的資料量 + 已經分配給信任Credit的buffer量) 時,就會向Pool中繼續請求buffer,這裡請求不到也會一直while形成柱塞反壓
然後通過notifyCreditAvailable()方法傳送Credit,具體來看一下
可用看到這裡就觸發了前面說到的向上遊傳送Credit的方法了
到這裡,Nettyclient端的初始化以及Netty的處理邏輯就講完了
現在回到最最開始的地方
requestPartition()那裡建立nettyclient後
currentChannel.getNextBuffer()方法中
前面我們說到的NettyClient端channelRead讀取資料後會把資料放到一個recivedBuffers的queue中,這裡就是去那個queue中取資料然後返回到我們的 資料接收的地方StreamInputProcessor.java中processInput()方法中的得到上游資料以後,就是開始執行我們使用者的程式碼了呼叫processElement方法了。
然後while(true)開始了下一輪拉取資料然後處理的