Flink中TaskManager端執行使用者邏輯過程(原始碼分析)
TaskManager接收到來自JobManager的jobGraph轉換得到的TDD物件,啟動了任務,在StreamInputProcessor類的processInput()方法中
通過一個while(true)中不停的拉取上游的資料,然後呼叫streamOperator.processElement(record)呼叫使用者實現的方法去處理資料拉取的資料
首先先來看下這個operator物件
然後看看OneInputStreamOperator類的UML
這裡所有的實現類沒有全部列出,只列了一些代表
看到這裡,寫過Flink的streamAPI的同學,肯定感覺到很熟悉!!!!!!
這裡!不就是我們常寫flink程式碼的那些運算元嘛
對沒有錯,我們程式中實現的那些運算元邏輯,最後都會被封裝成一個OneInputStreamOperator,這裡具體看一個最熟悉的Fliter
來看一下StreamFilter的processElement方法
!!!這裡傳入一個數據後,這個userFunction呼叫了filter方法並且把資料放進去了
當返回true通過這個output.collect傳送出去了
這不就對應了我們使用者自己實現的filter運算元嘛,沒錯這個方法其實就是客戶端的filter方法,這個userFunction包含了使用者實現filter運算元的邏輯
(!!!!!就是說這個processElement方法會呼叫使用者的邏輯)
(所以這個userFunction可以帶上client的方法實現,這對我們很重要,特別是對flink原始碼修改,為clientApi新增新功能方法,執行時可以通過這裡拿到)
繼續
來看看這個output.collect()方法
然後
看到這個,等等等等
我不是從這個processElement()方法進來的嗎,怎麼又開始調processElement()方法了
難道遞迴了? 不對不對
這裡operator不是上一個operator了,而是這個output物件的(這裡是chainOutPut)
看下這個output物件
看下UML類圖,也是隻列舉了重要的
先看chainingOutPut的屬性
發現了又出現了OneInputStreamOperator物件
看到這個實現類的名字!chain聯想起了什麼
Flink會將可以chain在一起的運算元在streamGraph轉換成jobGraph的時候根據條件chain在一起
一驚!
來分別看一下ChainingOutPut和RecordWriterOutput的collect()方法有什麼區別
在chain中
在RecordWriter中
這裡chain的ouput,又繼續呼叫了下一個operator的processElement方法,然後又在processElement方法中又呼叫output.collect( ),collect中又呼叫了下一個operator的processElement方法
整個過程就是個無限的迴圈,直到,某一個operator的ouput不為ChainingOutPut,當變為RecordWriterOutput時
上面看到RecordWriterOutput的processElement直接emit傳送出去了這個資料,再也沒有繼續呼叫processElement方法了
這裡也就對應了,flink中的責任鏈,chain在一起的運算元會一個接著一個執行,直到無法chain,就會往下游傳送emit了
來看一下UML類圖幫助理解
裡中有我,我中有你,一直相互呼叫直到無法chain,然後emit往下游傳送(這裡肯定就有傳送端的反壓邏輯,以後隨緣更新)
那這裡的迴圈呼叫理解了就會想,那如何確定第一個operator呼叫,然後進入整個呼叫鏈呢
回到TaskManager接收到JobManager的TDD以後初始化整個任務的時候
StreamTask.java中invoke方法中
先是初始化了一個OperatorChain,裡面其實就是一個數組StreamOperator
在他初始化的時候,其實就是為我們所有的streamOutputs設定了他的output以及會根據jobManager傳送過來的TDD(包含資訊)
設定成對應的ChainingOutPut還是RecordWriterOutput,chainOutput會設定他的的operator
然後獲取了getHeadOperator()其實就是獲取了他呼叫連中的第一個
然後在
將這個第一個operator關聯到了inputProcessor物件裡面
後面就簡單了在inputProcessor.processInput中就進入了while(true)迴圈拉取上游資料的邏輯
然後
在這裡呼叫的第一個processElement方法就是我們的那個headOperator
這樣整個呼叫責任鏈就開始從第一個Operator執行起來了
&n