1. 程式人生 > >[原始碼解析] Flink UDAF 背後做了什麼

[原始碼解析] Flink UDAF 背後做了什麼

# [原始碼解析] Flink UDAF 背後做了什麼 [ToC] ## 0x00 摘要 本文涉及到Flink SQL UDAF,Window 狀態管理等部分,希望能起到拋磚引玉的作用,讓大家可以藉此深入瞭解這個領域。 ## 0x01 概念 ### 1.1 概念 大家知道,Flink的自定義聚合函式(UDAF)可以將多條記錄聚合成1條記錄,這功能是通過accumulate方法來完成的,官方參考指出: > 在系統執行過程中,底層runtime程式碼會把歷史狀態accumulator,和您指定的上游資料(支援任意數量,任意型別的資料)作為引數,一起傳送給accumulate計算。 但是實時計算還有一些特殊的場景,在此場景下,還需要提供merge方法才能完成。 > 在實時計算中一些場景需要merge,例如session window。 由於實時計算具有out of order的特性,後輸入的資料有可能位於2個原本分開的session中間,這樣就把2個session合為1個session。此時,需要使用merge方法把多個accumulator合為1個accumulator。 ### 1.2 疑問 之前因為沒親身操作,所以一直忽略merge的特殊性。最近無意中看到了一個UDAF的實現,突然覺得有一個地方很奇怪,即 accumulate 和 merge 這兩個函式不應該定義在一個類中。因為這是兩個完全不同的處理方法。應該定義在兩個不同的類中。 比如用UDAF做word count,則: - accumulate 是在一個task中累積數字,其實就相當於 map; - merge 是把很多task的結果再次累積起來,就相當於 reduce; 然後又想出了一個問題:Flink是如何管理 UDAF的accumulator?其狀態存在哪裡? 看起來應該是Flink在背後做了一些黑魔法,把這兩個函式從一個類中拆分了。為了驗證我們的推測,讓我們從原始碼入手來看看這些問題: - Flink SQL轉換/執行計劃生成階段,如何處理在 "同一個類中" 的不同型別功能函式 accumulate 和 merge? - Flink runtime 如何處理 merge? - Flink runtime 如何處理 UDAF的accumulator的歷史狀態? ### 1.3 UDAF示例程式碼 示例程式碼摘要如下 : ```java public class CountUdaf extends Aggregate