1. 程式人生 > 其它 >Flink sql 之 TopN 與 StreamPhysicalRankRule (原始碼解析)

Flink sql 之 TopN 與 StreamPhysicalRankRule (原始碼解析)

基於flink1.14的原始碼做解析

公司內有很多業務方都在使用我們Flink sql平臺做TopN的計算,今天同事突然問到我,Flink sql 是怎麼實現topN的 ?

蒙圈了,這塊原始碼沒看過啊 ,業務要問起來怎麼辦,趕快開啟原始碼補一下

拿到這個問題先冷靜分析一下範圍

首先肯定屬於Flink sql模組,原始碼裡面肯定是在flink-table-planner包裡面,接著topN那不就是ROW_NUMBER嘛,是個函式呀

既然如此那就從flink原始碼的系統函式作為線索開始找起來,來到org.apache.calcite.sql.fun.SqlStdOperatorTable類

果然找到了,那calcite的某個rule肯定有個地方判斷了它,繼續查呼叫鏈

不出所料,FlinkLogicalRankRuleBase這個calcite的rule裡面果然根據這個function的型別來確定rank的型別了

看下這個rule的匹配條件

這裡也好理解,overAgg的時候會判斷這個rank以及對應的型別

這是隻是做了一下簡單的提取了rank的欄位啊,提取謂語啊,提取表示式啊這一些拿資訊的操作

然後直接生成新的relNode叫FlinkLogicalRank通過transformTo直接返回了這個等價節點

既然是relNode那肯定又會有calcite的rule去處理它,來找一找

批處理的就不管了,從名字就可以看出來我們要找的類了

看個不帶window的吧

返回StreamPhysicalRank

這個類是一個FlinkPhysicalRel是可以轉換成execNode的

返回的這個StreamExecRank就是可以轉換成具體的Flink的運算元了,具體邏輯就在裡面了

接下來看下row_number的具體邏輯,找到方法translateToPlanInternal

根據策略主要分為三種類型

AppendFastStrategy (輸入僅包含插入時)

RetractStrategy (輸入包含update和delete)

UpdateFastStrategy (輸入不應包含刪除且輸入有給定的primaryKeys且按欄位排序時)

來看個retractStrategy的吧

先通過sort的欄位獲取一個用於排序RowData的比較器ComparableRecordComparator

根據比較器建立RetractableTopNFunction

這個類還有兩個主要的狀態資料結構

dataState這個map用來存放當key相同的所有資料會放在同一個list裡面

treeMap這個可排序的map就是通過上面我們sql裡面定義的sort by 來排序資料的,Long是指這個相同的key有多少個record

!!!!!!!!!!! 那就是用java的treeMap排序唄

繼續往下看

主邏輯就是這個了

每進入一條資料,會根據這條資料的型別劃分

當資料是Insert , UPDATE_AFTER型別是會走emitRecordsWithRowNumber()方法

當資料是UPDATE_BEFORE,DELETE型別會走retractRecordWithRowNumber ()方法

來看下具體邏輯先看INSERT的

遍歷treeMap

解讀一下,當資料是insert資料的時候

INSERT資料會先放到treeMap裡面去,delete則不會

按順序遍歷treeMap

當遍歷過程中發現遍歷的key與當前資料的key相同時,和當前資料key相同的所有資料資料(dataState中的LIST),全部撤回並且更新他們的rowNumber+1

繼續遍歷treeMap

之後的資料全部撤回UpdateBefore,並且向下遊傳送UpdateAfter使rowNumber+1,遍歷直到已經到第TopN個數據迴圈結束

當資料是DELETE型別的時候,會和Insert反過來,當前key之後的資料全部撤回,然後rowNumber-1

整個處理流程差不多就結束了,可以看到rowNumber當N較大且排序變化頻繁的時候,效能消耗還是非常大的,極端情況下游的資料會翻很多倍

這個還需要注意在其他兩個策略中還有一個引數,table.exec.topn.cache-size

影響下面這個本地lruCache的大小

調大可以減少狀態的訪問,可以按需要新增