Flink中傳送端反壓以及Credit機制(原始碼分析)
上一篇《Flink接收端反壓機制》說到因為Flink每個Task的接收端和傳送端是共享一個bufferPool的,形成了天然的反壓機制,當Task接收資料的時候,接收端會根據積壓的資料量以及可用的buffer數量(可用的memorySegment數)來決定是否向上遊傳送Credit(簡而言之就是當我還有空間的時候,我向上游也就是上一個Task的傳送端傳送一個ack訊息,表明我還有空間你可以傳送資料過來,如果下游沒有給你Credit就證明下游已經堵了,沒有空間了也就不能繼續往下游傳送了)
現在從原始碼來看一下Task的資料傳送端,也就是Netty的Server端的實現
先看Task初始化的時候TaskManagerRunner.java中startTaskManager()方法中
這個connectionManager其實分為兩種,Netty,local一看就知道netty這種肯定是對應需要通過網路傳輸,本地模式這裡就不講了
這個地方看到Flink的client和server都初始化了,需要注意的是其實這個地方client端只是初始化了一些配置,並沒有呼叫bind()方法啟動起來,這裡看過上一篇文章的同學就會知道,client只有當第一次需要拉取上游subpatition資料的時候才會啟動起來也就是bind(),
而server端在這裡也就是task啟動的時候就啟動起來了,繼續看server端如何啟動的server.init()方法
init方法中,這裡可以看到,這是Flink1.6以前只有基於netty的tcp網路層反壓,這裡是通過bootstrap的兩個引數
ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK 大小為兩倍的memorySegment大小
ChannelOption.WRITE_BUFFER_LOW_WATER_MARK 大小為memorySegment + 1
接著
1處2處常規的Netty定長編解碼器,沒有什麼好說的
看看3處,4這裡先不講後面會提到
看到3是一個inboundHandler,反壓機制時他的用處是用來接收來自下游響應的Credit,來看他的ChannelRead0方法
當接收到的訊息是一個Credit信任的時候
先是
增加了這個reader的可用的Credit可用數
然後
其實瞭解了接收端的反壓,傳送端接收到了下游的credit,那傳送資料的時候肯定有一個地方會先判斷是否有可用的Credit才決定是否往下發資料
其實就是這個帶星號的地方判斷,然後下面就是常規的從queue中拉取reader往netty下游writeAndFlash()資料了,沒什麼好講的
來看一下他判斷Credit是否滿足的地方
可以看到只有當
有資料且可用的Credit數量大於0
或者有資料且資料是一個事件而不是record的時候,才返回true往下游傳送
可以看到這個 enqueueAvailableReader()方法比較重要,裡面包含了判斷credit以及往後下游傳送資料的邏輯
那這個enqueueAvailableReader()方法除了會在接收到下游的Credit的時候觸發一次,還有哪會被觸發呢
既然是往下游傳送資料那我們task處理完資料以後應該也會呼叫這個方法
於是來看一下Task傳送資料,以前的文章講過,這裡就不贅述了,直接看到RecordWriterOutput的emit()
會先將record寫入到這個Serializer裡面去
然後copyFromSerializerToTargetChannel()方法中
先去localBufferPool中請求buffer,這裡就是反壓了
請求到buffer了以後
這個呼叫鏈有點長不全列出來了
最後
這個requestQueue其實是前面Netty初始化時具體邏輯中的4,是一個ChannelInboundHandlerAdapter
這個Inbound一開始我也很疑惑,這個Inbound沒有重寫他的channelRead()方法,那這個不就直接轉發資料了嗎,那他的作用是幹嘛的呢
繼續往下看
原來發送資料的時候會觸發這個inbound的eventTrigger
看下userEventTriggered()具體觸發了什麼
這個方法就很眼熟了,就是前面到接收到下游傳送過來的Credit時會觸發一次的方法,用來判斷是否有Credit以及通過netty往下游傳送資料
這裡在傳送資料的時候果然又觸發了,後面就是判斷是否有Credit滿足往下游傳送資料的條件,然後往下游傳送資料
也就是說
當接收到下游返回的Credit的時候會觸發一次,是否能往下游寫資料的判斷並拉queue資料寫資料
每次Task處理完資料以後emit,也會觸發一次判斷並拉queue資料寫資料
&n