Flink基礎(二十七):FLINK SQL(三)查詢語句(三)操作符(二)
5 OrderBy & Limit
操作符 | 描述 |
---|---|
Order By 批處理流處理 |
注意:流處理結果需主要根據時間屬性按照升序進行排序。支援使用其他排序屬性。
|
Limit 批處理 |
注意:LIMIT 查詢需要有一個 ORDER BY 字句。
|
6 Top-N
目前僅 Blink 計劃器支援 Top-N 。
Top-N 查詢是根據列排序找到N個最大或最小的值。最大值集和最小值集都被視為是一種 Top-N 的查詢。若在批處理或流處理的表中需要顯示出滿足條件的 N 個最底層記錄或最頂層記錄, Top-N 查詢將會十分有用。得到的結果集將可以進行進一步的分析。
Flink 使用 OVER 視窗條件和過濾條件相結合以進行 Top-N 查詢。利用 OVER 視窗的PARTITION BY
子句的功能,Flink 還支援逐組 Top-N 。 例如,每個類別中實時銷量最高的前五種產品。批處理表和流處理表都支援基於SQL的 Top-N 查詢。 以下是 TOP-N 表示式的語法:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]
引數說明:
ROW_NUMBER()
: 根據當前分割槽內的各行的順序從第一行開始,依次為每一行分配一個唯一且連續的號碼。目前,我們只支援ROW_NUMBER
在 over 視窗函式中使用。未來將會支援RANK()
和DENSE_RANK()
函式。PARTITION BY col1[, col2...]
: 指定分割槽列,每個分割槽都將會有一個 Top-N 結果。ORDER BY col1 [asc|desc][, col2 [asc|desc]...]
WHERE rownum <= N
: Flink 需要rownum <= N
才能識別一個查詢是否為 Top-N 查詢。 其中, N 代表最大或最小的 N 條記錄會被保留。[AND conditions]
: 在 where 語句中,可以隨意新增其他的查詢條件,但其他條件只允許通過AND
與rownum <= N
結合使用。
流處理模式需注意TopN 查詢的結果會帶有更新。 Flink SQL 會根據排序鍵對輸入的流進行排序;若 top N 的記錄發生了變化,變化的部分會以撤銷、更新記錄的形式傳送到下游。 推薦使用一個支援更新的儲存作為 Top-N 查詢的 sink 。另外,若 top N 記錄需要儲存到外部儲存,則結果表需要擁有相同與 Top-N 查詢相同的唯一鍵。
Top-N 的唯一鍵是分割槽列和 rownum 列的結合,另外 Top-N 查詢也可以獲得上游的唯一鍵。以下面的任務為例,product_id
是ShopSales
的唯一鍵,然後 Top-N 的唯一鍵是 [category
,rownum
] 和 [product_id
] 。
下面的樣例描述瞭如何指定帶有 Top-N 的 SQL 查詢。這個例子的作用是我們上面提到的“查詢每個分類實時銷量最大的五個產品”。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 讀取外部資料來源的 DataStream val ds: DataStream[(String, String, String, Long)] = env.addSource(...) // 註冊名為 “ShopSales” 的 DataStream tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales") // 選擇每個分類中銷量前5的產品 val result1 = tableEnv.sqlQuery( """ |SELECT * |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num | FROM ShopSales) |WHERE row_num <= 5 """.stripMargin)
7 無排名輸出優化
如上文所描述,rownum
欄位會作為唯一鍵的其中一個欄位寫到結果表裡面,這會導致大量的結果寫出到結果表。比如,當原始結果(名為product-1001
)從排序第九變化為排序第一時,排名 1-9 的所有結果都會以更新訊息的形式傳送到結果表。若結果表收到太多的資料,將會成為 SQL 任務的瓶頸。
優化方法是在 Top-N 查詢的外部 SELECT 子句中省略 rownum 欄位。由於前N條記錄的數量通常不大,因此消費者可以自己對記錄進行快速排序,因此這是合理的。去掉 rownum 欄位後,上述的例子中,只有變化了的記錄(product-1001
)需要傳送到下游,從而可以節省大量的對結果表的 IO 操作。
以下的例子描述瞭如何以這種方式優化上述的 Top-N 查詢:
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 從外部資料來源讀取 DataStream val ds: DataStream[(String, String, String, Long)] = env.addSource(...) // 註冊名為 “ShopSales” 的資料來源 tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales") // 選擇每個分類中銷量前5的產品 val result1 = tableEnv.sqlQuery( """ |SELECT product_id, category, product_name, sales -- omit row_num field in the output |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num | FROM ShopSales) |WHERE row_num <= 5 """.stripMargin)
使用流處理模式時需注意為了使上述查詢輸出可以輸出到外部儲存並且結果正確,外部儲存需要擁有與 Top-N 查詢一致的唯一鍵。在上述的查詢例子中,若product_id
是查詢的唯一鍵,那麼外部表必須要有product_id
作為其唯一鍵。
8 去重
注意僅 Blink planner 支援去重。
去重是指對在列的集合內重複的行進行刪除,只保留第一行或最後一行資料。 在某些情況下,上游的 ETL 作業不能實現精確一次的端到端,這將可能導致在故障恢復 時,sink 中有重複的記錄。 由於重複的記錄將影響下游分析作業的正確性(例如,SUM
、COUNT
), 所以在進一步分析之前需要進行資料去重。
與 Top-N 查詢相似,Flink 使用ROW_NUMBER()
去除重複的記錄。理論上來說,去重是一個特殊的 Top-N 查詢,其中 N 是 1 ,記錄則是以處理時間或事件事件進行排序的。
以下程式碼展示了去重語句的語法:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1
引數說明:
ROW_NUMBER()
: 從第一行開始,依次為每一行分配一個唯一且連續的號碼。PARTITION BY col1[, col2...]
: 指定分割槽的列,例如去重的鍵。ORDER BY time_attr [asc|desc]
: 指定排序的列。所制定的列必須為時間屬性。目前僅支援proctime attribute,在未來版本中將會支援Rowtime atttribute。升序( ASC )排列指只保留第一行,而降序排列( DESC )則指保留最後一行。WHERE rownum = 1
: Flink 需要rownum = 1
以確定該查詢是否為去重查詢。
以下的例子描述瞭如何指定 SQL 查詢以在一個流計算表中進行去重操作。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 從外部資料來源讀取 DataStream val ds: DataStream[(String, String, String, Int)] = env.addSource(...) // 註冊名為 “Orders” 的 DataStream tableEnv.createTemporaryView("Orders", ds, $"order_id", $"user", $"product", $"number", $"proctime".proctime) // 由於不應該出現兩個訂單有同一個order_id,所以根據 order_id 去除重複的行,並保留第一行 val result1 = tableEnv.sqlQuery( """ |SELECT order_id, user, product, number |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num | FROM Orders) |WHERE row_num = 1 """.stripMargin)
9 分組視窗
SQL 查詢的分組視窗是通過GROUP BY
子句定義的。類似於使用常規GROUP BY
語句的查詢,視窗分組語句的GROUP BY
子句中帶有一個視窗函式為每個分組計算出一個結果。以下是批處理表和流處理表支援的分組視窗函式:
分組視窗函式 | 描述 |
---|---|
TUMBLE(time_attr, interval) |
定義一個滾動視窗。滾動視窗把行分配到有固定持續時間(interval )的不重疊的連續視窗。比如,5 分鐘的滾動視窗以 5 分鐘為間隔對行進行分組。滾動視窗可以定義在事件時間(批處理、流處理)或處理時間(流處理)上。 |
HOP(time_attr, interval, interval) |
定義一個跳躍的時間視窗(在 Table API 中稱為滑動視窗)。滑動視窗有一個固定的持續時間( 第二個interval 引數 )以及一個滑動的間隔(第一個interval 引數 )。若滑動間隔小於視窗的持續時間,滑動視窗則會出現重疊;因此,行將會被分配到多個視窗中。比如,一個大小為 15 分組的滑動視窗,其滑動間隔為 5 分鐘,將會把每一行資料分配到 3 個 15 分鐘的視窗中。滑動視窗可以定義在事件時間(批處理、流處理)或處理時間(流處理)上。 |
SESSION(time_attr, interval) |
定義一個會話時間視窗。會話時間視窗沒有一個固定的持續時間,但是它們的邊界會根據interval 所定義的不活躍時間所確定;即一個會話時間視窗在定義的間隔時間內沒有時間出現,該視窗會被關閉。例如時間視窗的間隔時間是 30 分鐘,當其不活躍的時間達到30分鐘後,若觀測到新的記錄,則會啟動一個新的會話時間視窗(否則該行資料會被新增到當前的視窗),且若在 30 分鐘內沒有觀測到新紀錄,這個視窗將會被關閉。會話時間視窗可以使用事件時間(批處理、流處理)或處理時間(流處理)。 |
10 時間屬性
在流處理表中的 SQL 查詢中,分組視窗函式的time_attr
引數必須引用一個合法的時間屬性,且該屬性需要指定行的處理時間或事件時間。可參考時間屬性文件以瞭解如何定義時間屬性。
對於批處理的 SQL 查詢,分組視窗函式的time_attr
引數必須是一個TIMESTAMP
型別的屬性。
11 選擇分組視窗的開始和結束時間戳
可以使用以下輔助函式選擇組視窗的開始和結束時間戳以及時間屬性:
輔助函式 | 描述 |
---|---|
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) |
返回相對應的滾動、滑動和會話視窗範圍內的下界時間戳。 |
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) |
返回相對應的滾動、滑動和會話視窗範圍以外的上界時間戳。 注意:範圍以外的上界時間戳不可以在隨後基於時間的操作中,作為行時間屬性使用,比如interval join以及分組視窗或分組視窗上的聚合。 |
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) |
返回相對應的滾動、滑動和會話視窗範圍以內的上界時間戳。 返回的是一個可用於後續需要基於時間的操作的時間屬性(rowtime attribute),比如interval join以及分組視窗或分組視窗上的聚合。 |
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) |
返回一個可用於後續需要基於時間的操作的處理時間引數,比如interval join以及分組視窗或分組視窗上的聚合. |
注意:輔助函式必須使用與GROUP BY
子句中的分組視窗函式完全相同的引數來呼叫.
以下的例子展示瞭如何在流處理表中指定使用分組視窗函式的 SQL 查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 從外部資料來源讀取 DataSource val ds: DataStream[(Long, String, Int)] = env.addSource(...) // 計算每日(使用處理時間)的 SUM(amount) tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount", $"proctime".proctime, $"rowtime".rowtime) // 計算每日的 SUM(amount) (使用事件時間) val result1 = tableEnv.sqlQuery( """ |SELECT | user, | TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, | SUM(amount) | FROM Orders | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user """.stripMargin) // 計算每日的 SUM(amount) (使用處理時間) val result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user") // 使用事件時間計算過去24小時中每小時的 SUM(amount) val result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product") // 計算每個以12小時(事件時間)作為不活動時間的會話的 SUM(amount) val result4 = tableEnv.sqlQuery( """ |SELECT | user, | SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, | SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd, | SUM(amount) | FROM Orders | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user """.stripMargin)
12 模式匹配
操作符 | 描述 |
---|---|
MATCH_RECOGNIZE 流處理 |
根據 更多詳情請參考檢測表中的模式.
|