Flink sql 之 join 與 StreamPhysicalJoinRule (原始碼解析)
原始碼分析基於flink1.14
Join是flink中最常用的操作之一,但是如果濫用的話會有很多的效能問題,瞭解一下Flink原始碼的實現原理是非常有必要的
本文的join主要是指flink sql的Regular join 也就是平時我們的雙流join中普通的full join ,left join,right join
先找到calcite的relNode轉換rule
會將邏輯節點logiceJoin轉換成flink的FlinkLogicalJoin
接著看下哪裡Rule會轉換這個FlinkLogicalJoin
這裡會將這種普通join也就是regularJoin給匹配上
條件是
不是這三種join,並且
也不能join表示式包含時間屬性
匹配上次rule以後,接著
返回了StreamPhysicalJoin這個StreamPhysicalRel是個物理節點
他的translateToExecNode方法會返回StreamExecJoin,這個類就是我們具體的邏輯了
來看一下
首先會根據會join的型別,確定兩個流那個需要輸出,如果是fulljoin兩個流都會輸出,left join就是左流需要outer,right join就是right流需要outer
之後建立了具體的Operator,來看下這個StreamingJoinOperator
先看一下這個類裡面兩個比較重要的狀態
可以看到,左右流都會儲存一個狀態
看下狀態包裝類的描述
總共就三,方法,分別是加入資料,撤回資料,獲取這個資料關聯上的所有資料
在open方法裡面會根據上面計算的左右流是否需要輸出來初始化這個兩個狀態
這裡狀態包裝類的建立,將根據資料型別分為三種
1. 流帶主鍵,且join條件包含了主鍵
這樣資料唯一,就只用ValueState來存
2. 流帶主鍵,但join條件沒有包含主鍵
這裡就用MapState來存了,每次根據主鍵更新
3. 流不帶主鍵
就用map,直接把record當key存了
接著看processElement方法
這裡詳細的程式碼就不列出來了太複雜了,想看的直接看這個類
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement()
梳理邏輯我們還是來看下虛擬碼
主要分為兩段
1. 如果是 +Insert / +Update 型別的資料
判斷輸入資料的流需不需要輸出
如果需要輸出
看下和另外一個流關聯的上不
關聯的上輸出+I[record+other]s
關聯不上輸出+I[record+null]
將資料加入狀態中
如果不需要輸出
將資料加入狀態中
如果與另外一個流的資料關聯上了
如果另外一個流要outer, 輸出+I[record+other]s
如果另外一個流不用輸出 ,輸出 +I/+U[record+other]s
1. 如果是 -Delete / -Update 型別的資料
狀態裡面先撤回這條資料
如果與另外流沒有匹配上,如果輸入資料的流需要輸出,則輸出-D[record+null]
如果與另外一條流匹配上了
當前流outer,傳送-D[record+other]s,如果是inner join傳送-D/-U[record+other]s
最後的最後
用兩個流join的key作為狀態的selecter來完成transform的構建就完成了
總結一下:
Flink會根據join的key作為狀態分流的selecter,根據表是否有主鍵,join條件是否包含主鍵,來建立對應的state資料結構,來優化狀態的讀寫
兩條流會根據join型別,來設定此流需不需要輸出outer
當資料進入,查詢另一側的流是否有資料可以關聯上,以及兩條流的outer型別,來確定向下遊傳送的撤回和新增的資料