flink 有狀態udf 引起血案一
場景
近期在做一個畫像的任務,sql實現的,當中有一個udf,會做非常多事情,包含將從redis讀出歷史值加權,並將中間結果和加權後的結果更新到redis。
大家都知道,flink 是能夠支援事件處理的。也就是能夠沒有時間的概念,那麼在聚合,join等操作的時候,flink內部會維護一個狀態,假如此時你也用redis維護了歷史狀態,也即是相似 result = currentState(flink)+lastState(redis)。且此時要針對計算的結果用where進行篩選.
SQL例如以下
CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;
CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95) as `result`
FROM
view_count
;
insert
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
where `result` <> '0.0'
GROUP BY
gid,
cid,
`result`;
業務分析
第一個sql檢視完畢的是首先分組,然後統計某一個欄位並乘以權重;
第二個sql檢視。udf :Get_Strength_Weaken完畢當前值和歷史值疊加工作,歷史值儲存在redis。同一時候將結果返回並更新redis,返回值作為result欄位。
第三個sql在輸出的時候,result欄位作為了where的條件和group by裡的欄位。
這時候生成的flink概圖例如以下:
觀察中間的結構圖能夠發現。Get_Strength_Weaken被呼叫兩次:
1. where條件。這個的生成是因為第三條sql
where `result` <> '0.0'
產生的執行計劃,是不是看起來非常懵逼。。
。
2. select裡面另一次呼叫Get_Strength_Weaken。這個非常明顯。
當然。能夠列印一下flink udf裡eval函式的呼叫細節日誌,非常easy發現反覆呼叫的問題。浪院長這個也是通過分析日誌。對照輸出結果來得出的論。
綜合上面分析和udf呼叫日誌,結論就是udf被呼叫了兩次。
對於這個flink的udf被多次呼叫引起的結果偏大。整整除錯了一下午。
因為上面分析能夠得出結論,flink將where條件下推了,where 條件推斷會先執行,而select裡後執行,那麼能夠調整SQL。例如以下:
CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;
CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
getResult(gid,cid) as `result`
FROM
view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95) as `result` <> '0.0'
;
insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
GROUP BY
gid,
cid,
`result`;
那麼實際上。select裡的udf主要目的是取出來計算結果。那麼這個時候能夠寫個簡單的udf--getResult,僅僅讓他從redis獲取 where條件裡更新到redis裡的結果,因為該udf是無狀態的即使多次呼叫。也無所謂。
所以。總結一下,對於flink 來說,因為基於事件的處理,聚合、join等操作會有狀態快取,那麼此時再用到含有外部儲存狀態的udf,一定要謹慎,結合執行計劃,來合理放置udf的位置,避免出錯。
當然。除錯階段最好是有具體的日誌。便於分析和定位問題。
flink 狀態刪除
事實上。flink聚合等內部狀態有配置能夠使其自己主動刪除的,具體配置使用例如以下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));
// define query
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// emit result Table via a TableSink
result.writeToSink(sink, qConfig);
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
[完]
推薦閱讀: