1. 程式人生 > >Flink中TaskManager端執行使用者邏輯過程(原始碼分析)

Flink中TaskManager端執行使用者邏輯過程(原始碼分析)

TaskManager接收到來自JobManager的jobGraph轉換得到的TDD物件,啟動了任務,在StreamInputProcessor類的processInput()方法中

通過一個while(true)中不停的拉取上游的資料,然後呼叫streamOperator.processElement(record)呼叫使用者實現的方法去處理資料拉取的資料

首先先來看下這個operator物件

然後看看OneInputStreamOperator類的UML

這裡所有的實現類沒有全部列出,只列了一些代表

看到這裡,寫過Flink的streamAPI的同學,肯定感覺到很熟悉!!!!!!

這裡!不就是我們常寫flink程式碼的那些運算元嘛

對沒有錯,我們程式中實現的那些運算元邏輯,最後都會被封裝成一個OneInputStreamOperator,這裡具體看一個最熟悉的Fliter

來看一下StreamFilter的processElement方法

!!!這裡傳入一個數據後,這個userFunction呼叫了filter方法並且把資料放進去了

當返回true通過這個output.collect傳送出去了

這不就對應了我們使用者自己實現的filter運算元嘛,沒錯這個方法其實就是客戶端的filter方法,這個userFunction包含了使用者實現filter運算元的邏輯

(!!!!!就是說這個processElement方法會呼叫使用者的邏輯)

(所以這個userFunction可以帶上client的方法實現,這對我們很重要,特別是對flink原始碼修改,為clientApi新增新功能方法,執行時可以通過這裡拿到)

繼續

來看看這個output.collect()方法

然後

 

看到這個,等等等等

我不是從這個processElement()方法進來的嗎,怎麼又開始調processElement()方法了

難道遞迴了? 不對不對

這裡operator不是上一個operator了,而是這個output物件的(這裡是chainOutPut)

看下這個output物件

看下UML類圖,也是隻列舉了重要的

先看chainingOutPut的屬性

 

發現了又出現了OneInputStreamOperator物件

看到這個實現類的名字!chain聯想起了什麼

Flink會將可以chain在一起的運算元在streamGraph轉換成jobGraph的時候根據條件chain在一起

一驚!

來分別看一下ChainingOutPut和RecordWriterOutput的collect()方法有什麼區別

在chain中

 在RecordWriter中

這裡chain的ouput,又繼續呼叫了下一個operator的processElement方法,然後又在processElement方法中又呼叫output.collect( ),collect中又呼叫了下一個operator的processElement方法

整個過程就是個無限的迴圈,直到,某一個operator的ouput不為ChainingOutPut,當變為RecordWriterOutput時

上面看到RecordWriterOutput的processElement直接emit傳送出去了這個資料,再也沒有繼續呼叫processElement方法了

這裡也就對應了,flink中的責任鏈,chain在一起的運算元會一個接著一個執行,直到無法chain,就會往下游傳送emit了

來看一下UML類圖幫助理解

 

 裡中有我,我中有你,一直相互呼叫直到無法chain,然後emit往下游傳送(這裡肯定就有傳送端的反壓邏輯,以後隨緣更新)

那這裡的迴圈呼叫理解了就會想,那如何確定第一個operator呼叫,然後進入整個呼叫鏈呢

回到TaskManager接收到JobManager的TDD以後初始化整個任務的時候

StreamTask.java中invoke方法中

 先是初始化了一個OperatorChain,裡面其實就是一個數組StreamOperator

在他初始化的時候,其實就是為我們所有的streamOutputs設定了他的output以及會根據jobManager傳送過來的TDD(包含資訊)

設定成對應的ChainingOutPut還是RecordWriterOutput,chainOutput會設定他的的operator

然後獲取了getHeadOperator()其實就是獲取了他呼叫連中的第一個

然後在

 

將這個第一個operator關聯到了inputProcessor物件裡面

後面就簡單了在inputProcessor.processInput中就進入了while(true)迴圈拉取上游資料的邏輯

然後

在這裡呼叫的第一個processElement方法就是我們的那個headOperator

這樣整個呼叫責任鏈就開始從第一個Operator執行起來了

&n