1. 程式人生 > 其它 >實時數倉入門訓練營:實時計算 Flink 版 SQL 實踐

實時數倉入門訓練營:實時計算 Flink 版 SQL 實踐

簡介:《實時數倉入門訓練營》由阿里雲研究員王峰、阿里雲資深技術專家金曉軍、阿里雲高階產品專家劉一鳴等實時計算 Flink 版和 Hologres 的多名技術/產品一線專家齊上陣,合力搭建此次訓練營的課程體系,精心打磨課程內容,直擊當下同學們所遇到的痛點問題。由淺入深全方位解析實時數倉的架構、場景、以及實操應用,7 門精品課程幫助你 5 天時間從小白成長為大牛!

本文整理自直播《實時計算 Flink 版 SQL 實踐-李麟(海豹)》
視訊連結:https://c.tb.cn/F3.0dBssY

內容簡要:
一、實時計算Flink版SQL簡介
二、實時計算Flink版SQL上手示例
三、開發常見問題和解法

實時計算Flink版SQL簡介

(一)關於實時計算Flink版SQL

實時計算Flink版選擇了SQL這種宣告式語言作為頂層API,比較穩定,也方便使用者使用。Flink SQL具備流批統一的特性,給使用者統一的開發體驗,並且語義一致。另外,Flink SQL能夠自動優化,包括遮蔽流計算裡面State的複雜性,也提供了自動優化的Plan,並且還集成了AutoPilot自動調優的功能。Flink SQL的應用場景也比較廣泛,包括資料整合、實時報表、實時風控,還有線上機器學習等場景。

(二)基本操作

在基本操作上,可以看到SQL的語法和標準SQL非常類似。示例中包括了基本的SELECT、FILTER操作。,可以使用內建函式,如日期的格式化,也可以使用自定義函式,比如示例中的匯率轉換就是一個使用者自定義函式,在平臺上註冊後就可以直接使用。

(三)維表 Lookup Join

在實際的資料處理過程中,維表的Lookup Join也是一個比較常見的例子。

這裡展示的是一個維表INNER JOIN示例。

例子中顯示的SOURCE表是一個實時變化的訂單資訊表,它通過INNER JOIN去關聯維表資訊,這裡標黃高亮的就是維表JOIN的語法,可以看到它和傳統的批處理有一個寫法上的差異,多了FOR SYSTEM_TIME AS OF這個子句來標明它是一個維表JOIN的操作。SOURCE表每來一條訂單訊息,它都會觸發維表運算元,去做一次對維表資訊的查詢,所以把它叫做一個Lookup Join。

(四)Window Aggregation

Window Aggregation(視窗聚合)操作也是常見的操作,Flink SQL中內建支援了幾種常用的Window型別,比如Tumble Window,Session Window,Hop Window,還有新引入的Cumulate Window。


Tumble

Tumble Window可以理解成固定大小的時間視窗,也叫滾窗,比如說5分鐘、10分鐘或者1個小時的固定間隔的視窗,視窗之間沒有重疊。


Session

Session Window(會話視窗) 定義了一個連續事件的範圍,視窗定義中的一個引數叫做Session Gap,表示兩條資料的間隔如果超過定義的時長,那麼前一個Window就結束了,同時生成了一個新的視窗。


Hop

Hop Window不同於滾動視窗的視窗不重疊,滑動視窗的視窗之間可以重疊。滑動視窗有兩個引數:size 和 slide。size 為視窗的大小,slide 為每次滑動的步長。如果slide < size,則視窗會重疊,同一條資料可能會被分配到多個視窗;如果 slide = size,則等同於 Tumble Window。如果 slide > size,視窗之間沒有重疊且有間隙。


Cumulate

Cumulate Window(累積視窗),是Flink社群1.13版本里新引入的,可以對比 Hop Window來理解,區別是從Window Start開始不斷去累積。示例中Window 1、Window 2、Window 3是在不斷地增長的。它有一個最大的視窗長度,比如我們定義Window Size是一天,然後Step步長是1個小時,那麼它會在一天中的每個小時產生累積到當前小時的聚合結果。

看一個具體的Window聚合處理示例。

如上圖所示,比如說需要進行每5分鐘單個使用者的點選數統計。

源資料是使用者的點選日誌,我們期望算出每5分鐘單個使用者的點選總數, SQL 中使用的是社群最新的 WindowTVF語法,先對源表開窗,再 GROUP BY 視窗對應的屬性 window_start和window_end, COUNT(*)就是點選數統計。

可以看到,當處理12:00到12:04的資料,有2個使用者產生了4次點選,分別能統計出來使用者Mary是3次,Bob是1次。在接下來一批資料裡面,又來了3條資料,對應地更新到下一個視窗中,分別是1次和2次。

(五)Group Aggregation

相對於Window Aggregation來說,Group Aggregation直接觸發計算,並不需要等到視窗結束,適用的一個場景是計算累積值。

上圖的例子是單個使用者累積到當前的點選數統計。從Query上看,寫法相對簡單一點,直接 GROUP BY user 去計算COUNT(*),就是累積計數。

可以看到,在結果上和Window的輸出是有差異的,在與Window相同的前4條輸入資料,Group Aggregation輸出的結果是Mary的點選數已更新到3次,具體的計算過程可能是從1變成2再變成3,Bob是1次,隨著後面3條資料的輸入,Bob對應的點選數又會更新成2次,對結果是持續更新的過程,這和Window的計算場景是有一些區別的。

之前Window窗口裡面輸出的資料,在視窗結束後結果就不會再改變,而在Group Aggregation裡,同一個Group Key的結果是會產生持續更新的。

(六)Window Aggregation Vs Group Aggregation

更全面地對比一下Window和Group Aggregation的一些區別。

Window Aggregation在輸出模式上是按時輸出,是在定義的資料到期之後它才會輸出。比如定義5分鐘的視窗,結果是延遲輸出的,比如00:00~00:05這個時間段,它會等整個視窗資料都到齊之後,才完整輸出出來,並且結果只輸出一次,不會再改變。

Group Aggregation是資料觸發,比如第一條資料來它就會輸出結果,同一個Key 的第二條資料來結果會更新,所以在輸出流的性質上兩者也是不一樣的。Window Aggregation一般情況下輸出的是Append Stream,而在Group Aggregation輸出的是Update Stream。

在狀態State處理上兩者的差異也比較大。Window Aggregation會自動清理過期資料,使用者就不需要額外再去關注 State的膨脹情況。Group Aggregation是基於無限的狀態去做累積,所以需要使用者根據自己的計算場景來定義State的TTL,就是State儲存多久。

比如統計一天內累計的PV和UV,不考慮資料延遲的情況,也至少要保證State的TTL要大於等於一天,這樣才能保證計算的精確性。如果State的TTL定義成半天,統計值就可能不準確了。

對輸出的儲存要求也是由輸出流的性質來決定的。在Window的輸出上,因為它是Append流,所有的型別都是可以對接輸出的。而Group Aggregatio輸出了更新流,所以要求目標儲存支援更新,可以用Hologres、MySQL或者HBase這些支援更新的儲存。

實時計算 Flink 版SQL上手示例

下面通過具體的例子來看每一種SQL操作在真實的業務場景中會怎麼使用,比如SQL基本的語法操作,包括一些常見的Aggregation的使用。

(一)示例場景說明:電商交易資料 - 實時數倉場景

這裡的例子是電商交易資料場景,模擬了實時數倉裡分層資料處理的情況。

在資料接入層,我們模擬了電商的交易訂單資料,它包括了訂單ID,商品ID,使用者ID,交易金額,商品的葉子類目,交易時間等基本資訊,這是一個簡化的表。

示例1會從接入層到資料明細層,完成一個數據清洗工作,此外還會做類目資訊的關聯,然後資料的彙總層我們會演示怎麼完成分鐘級的成交統計、小時級口徑怎麼做實時成交統計,最後會介紹下在天級累積的成交場景上,怎麼去做準實時統計。

- 示例環境:內測版

演示環境是目前內測版的實時計算Flink產品,在這個平臺可以直接做一站式的作業開發,包括除錯,還有線上的運維工作。

- 接入層資料

使用 SQL DataGen Connector 生成模擬電商交易資料。

接入層資料:為了方便演示,簡化了鏈路,用內建的SQL DataGen Connector來模擬電商資料的產生。

這裡面order_id是設計了一個自增序列,Connector的引數沒有完整貼出來。 DataGen Connector支援幾種生成模式,比如可以用Sequence產生自增序列,Random模式可以模擬隨機值,這裡根據不同的欄位業務含義,選擇了不同的生成策略。

比如order_id是自增的,商品ID是隨機選取了1~10萬,使用者ID是1~1000萬,交易金額用分做單位, cate_id是葉子類目ID,這裡共模擬100個葉子類目,直接通過計算列對商品ID取餘來生成,訂單建立時間使用當前時間模擬,這樣就可以在開發平臺上除錯,而不需要去建立Kafka或者DataHub做接入層的模擬。

(二)示例1-1 資料清洗

- 電商交易資料-訂單過濾

這是一個數據清洗的場景,比如需要完成業務上的訂單過濾,業務方可能會對交易金額有最大最小的異常過濾,比如要大於1元,小於1萬才保留為有效資料。

交易的建立時間是選取某個時刻之後的,通過WHERE條件組合過濾,就可以完成這個邏輯。

真實的業務場景可能會複雜很多,下面來看下SQL如何執行。

這是使用除錯模式,在平臺上點選執行按鈕進行本地除錯,可以看到金額這一列被過濾,訂單建立時間也都是大於要求的時間值。

從這個簡單的清洗場景可以看到,實時和傳統的批處理相比,在寫法上包括輸出結果差異並不大,流作業主要的差異是執行起來之後是長週期保持執行的,而不像傳統批處理,處理完資料之後就結束了。

(三)示例1-2 類目資訊關聯

接下來看一下怎麼做維表關聯。

根據剛才接入層的訂單資料,因為原始資料裡面是葉子類目資訊,在業務上需要關聯類目的維度表,維度表裡面記錄了葉子類目到一級類目的關聯關係,ID和名稱,清洗過程需要完成的目標是用原始表裡面葉子類目ID去關聯維表,補齊一級類目的ID和Name。這裡通過INNER JOIN維表的寫法,關聯之後把維表對應的欄位選出來。

和批處理的寫法差異僅僅在於維表的特殊語法FOR SYSTEM_TIME AS OF。

如上所示,平臺上可以上傳自己的資料用於除錯,比如這裡使用了1個CSV的測試資料,把100個葉子類目對映到10個一級類目上。

對應葉子類目ID的個位數就是它一級類目的ID,會關聯到對應的一級類目資訊,返回它的名稱。本地除錯執行優點是速度比較快,可以即時看到結果。在本地除錯模式中,終端收到1000條資料之後,會自動暫停,防止結果過大而影響使用。

(四)示例2-1 分鐘級成交統計

接下來我們來看一下基於Window的統計。

第一個場景是分鐘級成交統計,這是在彙總層比較常用的計算邏輯。

分鐘級統計很容易想到Tumble Window,每一分鐘都是各算各的,需要計算幾個指標,包括總訂單數、總金額、成交商品數、成交使用者數等。成交的商品數和使用者數要做去重,所以在寫法上做了一個Distinct處理。
視窗是剛剛介紹過的Tumble Window,按照訂單建立時間去劃一分鐘的視窗,然後按一級類目的維度統計每一分鐘的成交情況。

- 執行模式

上圖和剛才的除錯模式有點區別,上線之後就真正提交到叢集裡去執行一個作業,它的輸出採用了除錯輸出,直接Print到Log裡。展開作業拓撲,可以看到自動開啟了Local-Global的兩階段優化。

- 執行日誌 - 檢視除錯輸出結果

在執行一段時間之後,通過Task裡面的日誌可以看到最終的輸出結果。

用的是Print Sink,會直接打到Log裡面。在真實場景的輸出上,比如寫到Hologres/MySQL,那就需要去對應儲存的資料庫上檢視。

可以看到,輸出的資料相對於資料的原始時間是存在一定滯後的。

在19:46:05的時候,輸出了19:45:00這一個視窗的資料,延遲了5秒鐘左右輸出前1分鐘的聚合結果。

這5秒鐘實際上和定義源表時WATERMARK的設定是有關係的,在宣告WATERMARK時是相對gmt_create欄位加了5秒的offset。這樣起到的效果是,當到達的最早資料是 19:46:00 時,我們認為水位線是到了19:45:55,這就是5秒的延遲效果,來實現對亂序資料的寬容處理。

(五)示例2-2 小時級實時成交統計

第二個例子是做小時級實時成交統計。

如上圖所示,當要求實時統計,直接把Tumble Window開成1小時Size的Tumble Window,這樣能滿足實時性嗎?按照剛才展示的輸出結果,具有一定的延遲效果。因此開一個小時的視窗,必須等到這一個小時的資料都收到之後,在下一個小時的開始,才能輸出上一個小時的結果,延遲在小時級別的,滿足不了實時性的要求。回顧之前介紹的 Group Aggregation 是可以滿足實時要求的。

具體來看,比如需要完成小時+類目以及只算小時的兩個口徑統計,兩個統計一起做,在傳統批處理中常用的GROUPING SETS功能,在實時Flink上也是支援的。

我們可以直接GROUP BY GROUPING SETS,第一個是小時全口徑,第二個是類目+小時的統計口徑,然後計算它的訂單數,包括總金額,去重的商品數和使用者數。

這種寫法對結果加了空值轉換處理便於檢視資料,就是對小時全口徑的統計,輸出的一級類目是空的,需要對它做一個空值轉換處理。

上方為除錯模式的執行過程,可以看到Datagen生成的資料實時更新到一級類目和它對應的小時上。

這裡可以看到,兩個不同GROUP BY的結果在一起輸出,中間有一列ALL是通過空值轉換來的,這就是全口徑的統計值。本地除錯相對來說比較直觀和方便,有興趣的話也可以到阿里雲官網申請或購買進行體驗。

(六)示例2-3 天級累積成交準實時統計

第三個示例是天級累計成交統計,業務要求是準實時,比如說能夠接受分鐘級的更新延遲。

按照剛才Group Aggregation小時的實時統計,容易聯想到直接把Query改成天維度,就可以實現這個需求,而且實時性比較高,資料觸發之後可以達到秒級的更新。

回顧下之前提到的Window和Group Aggregation對於內建狀態處理上的區別,Window Aggregation可以實現State的自動清理,Group Aggregation需要使用者自己去調整 TTL。由於業務上是準實時的要求,在這裡可以有一個替代的方案,比如用新引入的Cumulate Window做累積的Window計算,天級的累積然後使用分鐘級的步長,可以實現每分鐘更新的準實時要求。

回顧一下Cumulate Window,如上所示。天級累積的話,Window的最大Size是到天,它的Window Step就是一分鐘,這樣就可以表達天級的累積統計。

具體的Query如上,這裡使用新的TVF語法,通過一個TABLE關鍵字把Windows的定義包含在中間,然後 Cumulate Window引用輸入表,接著定義它的時間屬性,步長和size 引數。GROUP BY就是普通寫法,因為它有提前輸出,所以我們把視窗的開始時間和結束時間一起打印出來。

這個例子也通過線上執行的方式去看Log輸出。

- 執行模式

可以看到,它和之前Tumble Window執行的結構類似,也是預聚合加上全域性聚合,它和Tumble Window的區別就是並不需要等到這一天資料都到齊了才輸出結果。

- 執行日誌 – 觀察除錯結果

從上方示例可以看到,在20:47:00的時候,已經有00:00:00到20:47:00的結果累積,還有對應的4列統計值。下一個輸出就是接下來的累計視窗,可以看到20:47:00到20:48:00就是一個累計的步長,這樣既滿足了天級別的累計統計需求,也能夠滿足準實時的要求。

(七)示例小結:電商交易資料-實時數倉場景

然後我們來整體總結一下以上的示例。

在接入層到明細層的清洗處理特點是相對簡單,也比較明確,比如業務邏輯上需要做固定的過濾條件,包括維度的擴充套件,這都是非常明確和直接的。

從明細層到彙總層,例子中的分鐘級統計,我們是用了Tumble Window,而小時級因為實時性的要求,換成了Group Aggregation,然後到天級累積分別展示Group Aggregation和新引入的Cumulate Window。

從彙總層的計算特點來說,我們需要去關注業務上的實時性要求和資料準確性要求,然後根據實際情況選擇Window聚合或者Group 聚合。

這裡為什麼要提到資料準確性?

在一開始比較Window Aggregation和Group Aggregation的時候,提到Group Aggregation的實時性非常好,但是它的資料準確性是依賴於State的TTL,當統計的週期大於TTL,那麼TTL的資料可能會失真。

相反,在Window Aggregation上,對亂序的容忍度有一個上限,比如最多接受等一分鐘,但在實際的業務資料中,可能99%的資料能滿足這樣的要求,還有1%的資料可能需要一個小時後才來。基於WATERMARK的處理,預設它就是一個丟棄策略,超過了最大的offset的這些資料就會被丟棄,不納入統計,此時資料也會失去它的準確性,所以這是一個相對的指標,需要根據具體的業務場景做選擇。

開發常見問題和解法

(一)開發中的常見問題

上方是實時計算真實業務接觸過程中比較高頻的問題。

首先是實時計算不知道該如何下手,怎麼開始做實時計算,比如有些同學有批處理的背景,然後剛開始接觸Flink SQL,不知道從哪開始。

另外一類問題是SQL寫完了,也清楚輸入處理的資料量大概是什麼級別,但是不知道實時作業執行起來之後需要設定多大的資源

還有一類是SQL寫得比較複雜,這個時候要去做除錯,比如要查為什麼計算出的資料不符合預期等類似問題,許多同學反映無從下手。

作業跑起來之後如何調優,這也是一個非常高頻的問題。

(二)開發常見問題解法

1.實時計算如何下手?

對於上手的問題,社群有很多官方的文件,也提供了一些示例,大家可以從簡單的例子上手,慢慢了解SQL裡面不同的運算元,在流式計算的時候會有一些什麼樣的特性。

此外,還可以關注開發者社群實時計算 Flink 版、 ververica.cn網站、 B 站的Apache Flink 公眾號等分享內容。

逐漸熟悉了SQL之後,如果想應用到生產環境中去解決真實的業務問題,阿里雲的行業解決方案裡也提供了一些典型的架構設計,可以作為參考。

2.複雜作業如何除錯?

如果遇到千行級別的複雜SQL,即使對於Flink的開發同學來也不能一目瞭然地把問題定位出來,其實還是需要遵循由簡到繁的過程,可能需要藉助一些除錯的工具,比如前面演示的平臺除錯功能,然後做分段的驗證,把小段SQL區域性的結果正確性除錯完之後,再一步一步組裝起來,最終讓這個複雜作業能達到正確性的要求。

另外,可以利用SQL語法上的特性,把SQL組織得更加清晰一點。實時計算Flink產品上有一個程式碼結構功能,可以比較方便地定位長SQL裡具體的語句,這都是一些輔助工具。

3.作業初始資源設定,如何調優?

我們有一個經驗是根據輸入的資料,初始做小併發測試一下,看它的效能如何,然後再去估算。在大併發壓測的時候,按照需求的吞吐量,逐步逼近,然後拿到預期的效能配置,這個是比較直接但也比較可靠的方式。

調優這一塊主要是藉助於作業的執行是情況,我們會去關注一些重點指標,比如說有沒有產生資料的傾斜,維表的Lookup Join需要訪問外部儲存,有沒有產生IO的瓶頸,這都是影響作業效能的常見瓶頸點,需要加以關注。

在實時計算Flink產品上集成了一個叫AutoPilot的功能,可以理解為類似於自動駕駛,在這種功能下,初始資源設多少就不是一個麻煩問題了。

在產品上,設定作業最大的資源限制後,根據實際的資料處理量,該用多少資源可以由引擎自動幫我們去調到最優狀態,根據負載情況來做伸縮。

原文連結 本文為阿里雲原創內容,未經允許不得轉載。