Flink SQL深度篇
Flink SQL深度篇
問題導讀
- 怎樣優化Logical Plan?
- 怎樣優化Stream Graph?
- TimeWindow, EventTime, ProcessTime 和 Watermark 四者之間的關係是什麼?
序言
時效性提升資料的價值, 所以 Flink 這樣的流式 (Streaming) 計算系統應用得越來越廣泛.
廣大的普通使用者決定一個產品的介面和介面. ETL開發者需要簡單而有效的開發工具, 從而把更多時間花在理業務和對口徑上. 因此流式計算系統都趨同以 SQL 作為唯一開發語言, 讓使用者以 Table 形式操作 Stream.
程式開發三部曲:First make it work, then make it right, and, finally, make it fast.
讓程式執行起來
- 開發者能用 SQL 方便地表達問題.
- 開發者能通過任務管理系統一體化地管理任務, 如:開發, 上線, 調優, 監控和排查任務.
讓程式執行正確
- 簡單資料清洗之外的流計算開發需求通常會涉及到 Streaming SQL 的兩個核心擴充套件:Window 和 Emit.
- 開發者深入理解 Window 和 Emit 的語義是正確實現這些業務需求的關鍵,
- 否則無法在資料時效性和資料準確性上做適合各個業務場景的決策和折中.
讓程式執行越來越快
流計算系統每年也會有很大的效能提升和功能擴充套件, 但想要深入調優及排錯, 還是要學習分散式系統的各個元件及原理, 各種運算元實現方法, 效能優化技術等知識.
以後, 隨著系統的進一步成熟和完善, 開發者在效能優化上的負擔會越來越低, 無需瞭解底層技術實現細節和手動配置各種引數, 就能享受效能和穩定性的逐步提升.
**分散式系統的一致性和可用性是一對矛盾, 流計算系統的資料準確性和資料時效性也是一對矛盾. ** 應用開發者都需要認識到這些矛盾, 並且知道自己在什麼場景下該作何種取捨.
本文希望通過剖析Flink Streaming SQL的三個具體例子:Union, Group By 和 Join , 來依次闡述流式計算模型的核心概念: What, Where, When, How . 以便開發者加深對 Streaming SQL 的 Window 和 Emit 語義的理解, 從而能在資料準確性
Union
程式碼
通過這個例子來闡述 Streaming SQL 的底層實現和優化手段:Logical Plan Optimization 和 Operator Chaining.
例子改編自 Flink StreamSQLExample . 只在最外層加了一個Filter, 以便觸發Filter下推及合併.
Code:
package com.atguigu.tableapi
import com.atguigu.bean.Order
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
object UnionTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "bear", 3),
Order(1L, "diaper", 4),
Order(3L, "rubber", 2)
))
val orderB: DataStream[Order] = env.fromCollection(Seq(
Order(2L, "pen", 3),
Order(2L, "rubber", 3),
Order(4L, "bear", 1)
))
// 以流注冊表
tEnv.createTemporaryView("OrderA", orderA)
tEnv.createTemporaryView("OrderB", orderB)
// sql
val sql =
"""
|select *
|from (
| select *
| from OrderA
| where user < 3
| union all
| select *
| from OrderB
|where product <> 'rubber') OrderAll
|where amount > 2
|""".stripMargin
val result: Table = tEnv.sqlQuery(sql)
result.toAppendStream[Order].print()
env.execute()
}
}
執行結果:
5> Order(1,diaper,4)
9> Order(2,pen,3)
4> Order(1,bear,3)
轉換 Table 為 Stream: Flink 會把基於 Table 的 Streaming SQL 轉為基於 Stream 的底層運算元, 並同時完成 Logical Plan 及 Operator Chaining 等優化
轉為邏輯計劃 Logical Plan
上述 UNION ALL SQL 依據 Relational Algebra 轉換為下面的邏輯計劃:
LogicalProject(user=[$0], product=[$1], amount=[$2])
LogicalFilter(condition=[>($2, 2)])
LogicalUnion(all=[true])
LogicalProject(user=[$0], product=[$1], amount=[$2])
LogicalFilter(condition=[<($0, 3)])
LogicalTableScan(table=[[OrderA]])
LogicalProject(user=[$0], product=[$1], amount=[$2])
LogicalFilter(condition=[<>($1, _UTF-16LE'rubber')])
LogicalTableScan(table=[[OrderB]])
SQL欄位與邏輯計劃有如下的對應關係:
Logical Plan 優化
理論基礎
冪等
數學: 19 * 10 * 1 * 1 = 19 * 10 = 190
SQL: SELECT * FROM (SELECT user, product FROM OrderA) = SELECT user, product FROM OrderA
交換律
數學:10 * 19 = 19 * 10 = 190
SQL: tableA UNION ALL tableB = tableB UNION ALL tableA
結合律
數學:
(1900 * 0.5)* 0.2 = 1900 * (0.5 * 0.2) = 190
1900 * (1.0 + 0.01) = 1900 * 1.0 + 1900 * 0.01 = 1919
SQL:
SELECT * FROM (SELECT user, amount FROM OrderA) WHERE amount > 2
SELECT * FROM (SELECT user, amount FROM OrderA WHERE amount > 2)
優化過程
Flink 的邏輯計劃優化規則清單請見: .
此 Union All 例子根據冪等, 交換律和結合律來完成以下三步優化:
消除冗餘的Project
利用冪等特性, 消除冗餘的 Project:
下推Filter
利用交換率和結合律特性, 下推 Filter:
合併Filter
利用結合律, 合併 Filter:
轉為物理計劃 Physical Plan
轉換後的 Flink 的物理執行計劃如下:
DataStreamUnion(all=[true], union all=[user, product, amount])
DataStreamcCalc(select][user, product, amount], where=[AND(<(user, 3), >(amount, 2))])
DataStreamScan(table=[[OrderA]])
DataStreamcCalc(select][user, product, amount], where=[AND(<>(product, _UTF-16LE'rubber'), >(amount, 2))])
DataStreamScan(table=[[OrderB]])
優化 Physical Plan
有 Physical Plan 優化這一步驟, 但對以上例子沒有效果, 所以忽略.
優化 Stream Graph
Stream Graph
這樣, 加上 Source 和 Sink, 產生了如下的 Stream Graph:
通過 來減少上下游運算元的資料傳輸消耗, 從而提高效能:
Chaining 判斷條件
private boolean isChainable(StreamEdge edge, boolean isChainingEnabled, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& isChainingEnabled;
}
Chaining 結果
按深度優先的順序遍歷 Stream Graph, 最終產生 5 個 Task 任務:
Group By
程式碼
package com.atguigu.tableapi
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone
import com.atguigu.bean.OrderT
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
* <p>Title: </p>
*
* <p>Description: </p>
*
* @author Zhang Chao
* @version java_day
* @date 2020/10/28 4:15 下午
*/
object GroupByTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
val orderA: DataStream[OrderT] = env.fromCollection(Seq(
OrderT(1L, "bear", 3, Timestamp.valueOf("2020-10-10 2:11:00")),
OrderT(3L, "rubber", 2,Timestamp.valueOf("2020-10-10 2:38:35")),
OrderT(1L, "diaper", 4, Timestamp.valueOf("2020-10-10 3:11:03")),
OrderT(1L, "diaper", 1, Timestamp.valueOf("2020-10-10 2:48:05"))
)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderT](Time.milliseconds(3000)) {
override def extractTimestamp(element: OrderT) = {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"+8))
dateFormat.parse(element.rowtime.toString).getTime
}
})
// 以流注冊表
tEnv.createTemporaryView("OrderA", orderA, 'user, 'product, 'amount, 'rowtime.rowtime)
// sql
val sql =
"""
|select
| user,
| TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS startDate,
| SUM(amount) AS totalAmount
|FROM
| OrderA
|GROUP BY user, tumble(rowtime, INTERVAL '1' HOUR)
|""".stripMargin
val result: Table = tEnv.sqlQuery(sql)
result.toAppendStream[Row].print()
env.execute()
}
}
輸出:
12> 3,2020-10-10 02:00:00.0,2
4> 1,2020-10-10 02:00:00.0,4
4> 1,2020-10-10 03:00:00.0,4
轉換Table為Stream: 因為 Union All 例子比較詳細地闡述了轉換規則, 此處只討論特殊之處.
轉為邏輯計劃 Logical Plan
LogicalProject(user=[$0], startDate=[TUMBLE_START($1)], totalAmount=[$2])
LogicalAggregate(group=[{0, 1}], totalAmount=[SUM($2)])
LogicalProject(user=[$0], $f1=[TUMBLE($4, 86400000)], amount=[$2])
LogicalTableScan(table=[[OrderA]])
優化 Logical Plan
FlinkLogicalCalc(expr#0..5=[{inputs}], user=[$t0], rowtime=[$t2],amount=[$t1])
FlinkLogicalWindowAggregate(group=[{0}], totalAmount=[SUM($2)])
FlinkLogicalCalc(expr#0..4=[{inputs}], user=[$t0], rowtime=[$t4],amount=[$t2])
FlinkLogicalNativeTableScan(table=[[OrderA]])
GROUP BY 優化: 把 {"User + Window" -> SUM}
轉為 {User -> {Window -> SUM}}
.
新的資料結構確保同一 User 下所有 Window 都會被分配到同一個 Operator, 以便實現 SessionWindow 的 Merge 功能:
轉為物理計劃 Physical Plan
DataStreamCala(select=[user, w$start AS startDate, totalAmount])
DataStreamGroupWindowAggregate(
groupBy=[user],
window=[TumblingGroupWindow('w$, 'rowtime, 86400000.millis)],
select=[user,
SUM(amount) AS totalAmount,
start('w$') AS w$start,
end('w$') AS w$end,
rowtime('w$') AS w$rowtime,
proctime('W$) AS w$proctime])
DataStreamCalc(select=[user, rowtime, amount])
DataStreamScan(table=[[OrderA]])
優化 Stream Graph
經過 Task Chaining 優化後, 最終生成 3 個 Task:
Streaming 各基本概念之間的聯絡
此處希望以圖表的形式闡述各個概念之間的關係:
Window 和 EventTime
Flink 支援三種 Window 型別: Tumbling Windows , Sliding Windows 和 Session Windows.
每個事件的 EventTime 決定事件會落到哪些 TimeWindow, 但只有 Window 的第一個資料來到時, Window 才會被真正建立.
Window 和 WaterMark
可以設定 TimeWindow 的 AllowedLateness, 從而使 Window 可以處理延時資料.
只有當 WaterMark 超過 TimeWindow.end + AllowedLateness 時, Window 才會被銷燬.
TimeWindow, EventTime, ProcessTime 和 Watermark
我們以 WaterMark 的推進圖來闡述這四者之間的關係.
Window 為 TumbleWindow, 視窗大小為 1 小時, 允許的資料延遲為 1 小時:
WaterMark 和 EventTime: 新資料的最新 Eventime 推進 WaterMark
TimeWindow 的生命週期:
以下三條資料的 EventTime 決定 TimeWindow 的狀態轉換.
資料 1 的 Eventtime 屬於 Window[10:00, 11,00), 因為Window不存在, 所以建立此 Window.
資料 2 的 Eventime 推進 WaterMark 超過11:00 (Window.end), 所以觸發Pass End.
資料 3 的 Eventime 推進 WaterMark 超過 12:00 (Window.end + allowedLateness), 所以關閉此Window.
TimeWindow 的結果輸出:
使用者可以通過 Trigger 來控制視窗結果的輸出, 按視窗的狀態型別有以下三種 Trigger.
Flink 的 Streaming SQL 目前只支援 PassEnd Trigger, 且預設 AllowedLateness = 0.
如果觸發頻率是 Repeated, 比如:每分鐘, 往下游輸出一次. 那麼這個時間只能是 ProcessTime.
因為 WarkMark 在不同場景下會有不同推進速度, 比如處理一小時的資料,
可能只需十分鐘 (重跑), 一個小時(正常執行) 或 大於1小時(積壓) .
執行結果:
允許資料亂序是分散式系統能夠併發處理訊息的前提.
當前這個例子, 資料如果亂序可以產生不同的輸出結果.
資料有序SUM運算元接收到的資料
資料的 Eventtime 按升序排列:
OrderT(1L, "bear", 3, Timestamp.valueOf("2020-10-10 2:11:00"))
OrderT(3L, "rubber", 2,Timestamp.valueOf("2020-10-10 2:38:35"))
OrderT(1L, "diaper", 1, Timestamp.valueOf("2020-10-10 2:48:05"))
OrderT(1L, "diaper", 4, Timestamp.valueOf("2020-10-10 3:11:03"))
WarterMark推進圖
每條新資料都能推進 Watermark:
結果輸出
所有資料都被處理, 沒有資料被丟棄:
12> 3,2020-10-10 02:00:00.0,2
4> 1,2020-10-10 02:00:00.0,4
4> 1,2020-10-10 03:00:00.0,4
資料亂序SUM運算元接收到的資料
第四條事件延時到來:
OrderT(1L, "bear", 3, Timestamp.valueOf("2020-10-10 2:11:00"))
OrderT(3L, "rubber", 2,Timestamp.valueOf("2020-10-10 2:38:35"))
OrderT(1L, "diaper", 4, Timestamp.valueOf("2020-10-10 3:11:03"))
OrderT(1L, "diaper", 1, Timestamp.valueOf("2020-10-10 2:48:05"))
WarterMark 推進圖
延遲的資料不會推進WaterMark, 且被丟棄.
輸出結果
沒有統計因延遲被丟棄的第四條事件:
12> 3,2020-10-10 02:00:00.0,2
4> 1,2020-10-10 02:00:00.0,3
4> 1,2020-10-10 03:00:00.0,4
Join
程式碼
package com.atguigu.tableapi
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import java.sql.Timestamp
import org.apache.flink.types.Row
case class Show(var impressionId: String, var name: String, eventTime: String)
case class Click(var impressionId: String, var name: String, eventTime: String)
object JoinTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
val showAd: DataStream[Show] = env.fromCollection(Seq(
Show("1", "show", "2020-10-10 10:10:10"),
Show("2", "show", "2020-10-10 10:11:10"),
Show("3", "show", "2020-10-10 10:12:10")
))
Thread.sleep(1000)
val clickAd: DataStream[Click] = env.fromCollection(Seq(
Click("1", "click", "2020-10-10 10:13:11"),
Click("3", "click", "2020-10-10 10:12:33"),
))
// 以流注冊表
tEnv.createTemporaryView("ShowAd", showAd, 'impressionId, 'name, 'eventTime)
tEnv.createTemporaryView("ClickAd", clickAd, 'impressionId, 'name, 'eventTime)
// sql
val sql =
"""
|select
| ShowAd.impressionId AS impressionId,
| ShowAd.eventTime As showTime,
| CASE WHEN ClickAd.eventTime <> '' THEN 'clicked' ELSE 'showed' END AS status
|FROM ShowAd
|LEFT JOIN ClickAd ON ShowAd.impressionId = ClickAd.impressionId
|""".stripMargin
val result: Table = tEnv.sqlQuery(sql)
result.toRetractStream[Row].print()
env.execute()
}
}
轉為邏輯計劃 Logical Plan
LogicalProject(impressId=[$0], showTime=[$2], clickTime=[$5])
LogicalJoin(condition=[=($0, $3)], joinType=[left])
LogicalTableScan(table=[[ShowAd]])
LogicalTableScan(table=[[ClickAd]])
優化 Logical Plan
LogicalProject(impressId=[$0], showTime=[$2], clickTime=[$5])
LogicalJoin(condition=[=($0, $3)], joinType=[left])
LogicalTableScan(table=[[ShowAd]])
LogicalTableScan(table=[[ClickAd]])
轉為物理計劃 Physical Plan
DataStreamCalc(Select=[impressionId AS impressId, eventTime AS showTime, eventTime0 AS clickTime])
DataStreamJoin(where=[=(impressionId, impressionId0)], join=[impressionId, eventTime, impressionId0, eventTime0], joinType=[LeftOuterJoin])
DataStreamCalc(select=[impressionId, eventTime])
DataStreamScan(table=[[ShowAd]])
DataStreamCalc(select=[impressionId, eventTime])
DataStreamScan(table=[[ClickAd]])
優化 Stream Graph
執行結果
1> (true,2,2020-10-10 10:11:10,showed )
11> (true,3,2020-10-10 10:12:10,showed )
11> (false,3,2020-10-10 10:12:10,showed )
11> (true,3,2020-10-10 10:12:10,clicked)
11> (true,1,2020-10-10 10:10:10,showed )
11> (false,1,2020-10-10 10:10:10,showed )
11> (true,1,2020-10-10 10:10:10,clicked)
Retraction Stream
雖然 Retraction 機制最多增加一倍的資料傳輸量, 但能降低下游運算元的儲存負擔和撤銷實現難度.
我們在 Left Join 的輸出流後加一個 GROUP BY, 以觀察 Retraction 流的後續運算元的輸出:
val sql2 = "select status, count(1) from (" + sql + ") impressionStatus group by status"
輸出:
5> (true,showed ,1)
4> (true,clicked,1)
4> (false,clicked,1)
4> (true,clicked,2)
5> (false,showed ,1)
5> (true,showed ,2)
5> (false,showed ,2)
5> (true,showed ,1)
5> (false,showed ,1)
5> (true,showed ,2)
5> (false,showed ,2)
5> (true,showed ,1)
由此可見, Retraction 具有傳遞性, RetractStream 的後續的 Stream 也會是RetractionStream.
終止
最終需要支援 Retraction 的 Sink 來終止 RetractionStream, 比如:
class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
val retractedResults = scala.collection.mutable.Map[String, String]()
override def invoke(value: (Boolean, Row)): Unit = {
retractedResults.synchronized{
val flag = value._1
val status = value._2.getField(0).toString
val count = value._2.getField(1).toString
if(flag == false){
retractedResults -= status
}else{
retractedResults += (status -> count)
}
}
}
override def close(): Unit = println(retractedResults)
}
result.toRetractStream[Row].addSink(new RetractingSink).setParallelism(1)
最終輸出 retractedResults:
Map(showed -> 1, clicked -> 2)
儲存
只有外部儲存支援 UPDATE 或 DELETE 操作時, 才能實現 RetractionSink, 常見的KV 儲存和資料庫, 如HBase, Mysql 都可實現 RetractionSink.
後續程式總能從這些儲存中讀取最新資料, 上游是否是 Retraction 流對使用者是透明的.
常見的訊息佇列, 如Kafka, 只支援 APPEND 操作, 則不能實現 RetractionSink.
後續程式從這些訊息佇列可能會讀到重複資料, 因此使用者需要在後續程式中處理重複資料.
總結
Flink Streaming SQL的實現從上到下共有三層:
- 和
- Distributed Snapshots
其中 Streaming Data Model 和 Distributed Snapshot 是 Flink 這個分散式流計算系統的核心架構設計.
Streaming Data Model 的 What, Where, When, How 明確了流計算系統的表達能力及預期應用場景.
Distributed Snapshots 針對預期的應用場景在資料準確性, 系統穩定性和執行效能上做了合適的折中.
操作, 則不能實現 RetractionSink.
後續程式從這些訊息佇列可能會讀到重複資料, 因此使用者需要在後續程式中處理重複資料.
總結
Flink Streaming SQL的實現從上到下共有三層:
- 和
- Distributed Snapshots
其中 Streaming Data Model 和 Distributed Snapshot 是 Flink 這個分散式流計算系統的核心架構設計.
Streaming Data Model 的 What, Where, When, How 明確了流計算系統的表達能力及預期應用場景.
Distributed Snapshots 針對預期的應用場景在資料準確性, 系統穩定性和執行效能上做了合適的折中.
本文通過例項闡述了流計算開發者需要了解的最上面兩層的概念和原理, 以便流計算開發者能在資料準確性和資料時效性上做適合業務場景的折中和取捨.