Apache Flink 漫談系列(13) - Table API 概述
什麼是Table API
在《Apache Flink 漫談系列(08) - SQL概覽》中我們概要的向大家介紹了什麼是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,如下圖所示:
Apache Flink 針對不同的使用者場景提供了三層使用者API,最下層ProcessFunction API可以對State,Timer等複雜機制進行有效的控制,但使用者使用的便捷性很弱,也就是說即使很簡單統計邏輯,也要較多的程式碼開發。第二層DataStream API對視窗,聚合等運算元進行了封裝,使用者的便捷性有所增強。最上層是SQL/Table API,Table API是Apache Flink中的宣告式,可被查詢優化器優化的高階分析API。
Table API的特點
Table API和SQL都是Apache Flink中最高層的分析API,SQL所具備的特點Table API也都具有,如下:
- 宣告式 - 使用者只關心做什麼,不用關心怎麼做;
- 高效能 - 支援查詢優化,可以獲取最好的執行效能;
- 流批統一 - 相同的統計邏輯,既可以流模式執行,也可以批模式執行;
- 標準穩定 - 語義遵循SQL標準,語法語義明確,不易變動。
當然除了SQL的特性,因為Table API是在Flink中專門設計的,所以Table API還具有自身的特點:
- 表達方式的擴充套件性 - 在Flink中可以為Table API開發很多便捷性功能,如:Row.flatten(), map/flatMap 等
- 功能的擴充套件性 - 在Flink中可以為Table API擴充套件更多的功能,如:Iteration,flatAggregate 等新功能
- 編譯檢查 - Table API支援java和scala語言開發,支援IDE中進行編譯檢查。
說明:上面說的map/flatMap/flatAggregate都是Apache Flink 社群 FLIP-29 中規劃的新功能。
HelloWorld
在介紹Table API所有運算元之前我們先編寫一個簡單的HelloWorld來直觀瞭解如何進行Table API的開發。
Maven 依賴
在pom檔案中增加如下配置,本篇以flink-1.7.0功能為準進行後續介紹。
<properties>
<table.version>1.7.0</table.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${table.version}</version>
</dependency>
</dependencies>
程式結構
在編寫第一Flink Table API job之前我們先簡單瞭解一下Flink Table API job的結構,如下圖所示:
- 外部資料來源,比如Kafka, Rabbitmq, CSV 等等;
- 查詢計算邏輯,比如最簡單的資料匯入select,雙流Join,Window Aggregate 等;
- 外部結果儲存,比如Kafka,Cassandra,CSV等。
說明:1和3 在Apache Flink中統稱為Connector。
主程式
我們以一個統計單詞數量的業務場景,編寫第一個HelloWorld程式。
根據上面Flink job基本結構介紹,要Table API完成WordCount的計算需求,我們需要完成三部分程式碼:
- TableSoruce Code - 用於建立資料來源的程式碼
- Table API Query - 用於進行word count統計的Table API 查詢邏輯
- TableSink Code - 用於儲存word count計算結果的結果表程式碼
執行模式選擇
一個job我們要選擇是Stream方式執行還是Batch模式執行,所以任何統計job的第一步是進行執行模式選擇,如下我們選擇Stream方式執行。
// Stream執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
構建測試Source
我們用最簡單的構建Source方式進行本次測試,程式碼如下:
// 測試資料
val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
// 最簡單的獲取Source方式
val source = env.fromCollection(data).toTable(tEnv, 'word)
WordCount 統計邏輯
WordCount核心統計邏輯就是按照單詞分組,然後計算每個單詞的數量,統計邏輯如下:
// 單詞統計核心邏輯
val result = source
.groupBy('word) // 單詞分組
.select('word, 'word.count) // 單詞統計
定義Sink
將WordCount的統計結果寫入Sink中,程式碼如下:
// 自定義Sink
val sink = new RetractSink // 自定義Sink(下面有完整程式碼)
// 計算結果寫入sink
result.toRetractStream[(String, Long)].addSink(sink)
完整的HelloWord程式碼
為了方便大家執行WordCount查詢統計,將完整的程式碼分享大家(基於flink-1.7.0),如下:
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import scala.collection.mutable
object HelloWord {
def main(args: Array[String]): Unit = {
// 測試資料
val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
// Stream執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 最簡單的獲取Source方式
val source = env.fromCollection(data).toTable(tEnv, 'word)
// 單詞統計核心邏輯
val result = source
.groupBy('word) // 單詞分組
.select('word, 'word.count) // 單詞統計
// 自定義Sink
val sink = new RetractSink
// 計算結果寫入sink
result.toRetractStream[(String, Long)].addSink(sink)
env.execute
}
}
class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
private var resultSet: mutable.Set[(String, Long)] = _
override def open(parameters: Configuration): Unit = {
// 初始化記憶體儲存結構
resultSet = new mutable.HashSet[(String, Long)]
}
override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
if (v._1) {
// 計算資料
resultSet.add(v._2)
}
else {
// 撤回資料
resultSet.remove(v._2)
}
}
override def close(): Unit = {
// 列印寫入sink的結果資料
resultSet.foreach(println)
}
}
執行結果如下:
雖然上面用了較長的紙墨介紹簡單的WordCount統計邏輯,但source和sink部分都是可以在學習後面運算元中被複用的。本例核心的統計邏輯只有一行程式碼: source.groupBy('word).select('word, 'word.count)
所以Table API開發技術任務非常的簡潔高效。
Table API 運算元
雖然Table API與SQL的運算元語義一致,但在表達方式上面SQL以文字的方式展現,Table API是以java或者scala語言的方式進行開發。為了大家方便閱讀,即便是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過的運算元,在這裡也會再次進行介紹,當然對於Table API和SQL不同的地方會進行詳盡介紹。
示例資料及測試類
測試資料
- customer_tab 表 - 客戶表儲存客戶id,客戶姓名和客戶描述資訊。欄位及測試資料如下:
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
- order_tab 表 - 訂單表儲存客戶購買的訂單資訊,包括訂單id,訂單時間和訂單描述資訊。 欄位節測試資料如下:
o_id | c_id | o_time | o_desc |
---|---|---|---|
o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
- Item_tab
商品表, 攜帶商品id,商品型別,出售時間,價格等資訊,具體如下:
itemID | itemType | onSellTime | price |
---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 |
- PageAccess_tab
頁面訪問表,包含使用者ID,訪問時間,使用者所在地域資訊,具體資料如下:
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
- PageAccessCount_tab
頁面訪問表,訪問量,訪問時間,使用者所在地域資訊,具體資料如下:
region | userCount | accessTime |
---|---|---|
ShangHai | 100 | 2017.11.11 10:01:00 |
BeiJing | 86 | 2017.11.11 10:01:00 |
BeiJing | 210 | 2017.11.11 10:06:00 |
BeiJing | 33 | 2017.11.11 10:10:00 |
ShangHai | 129 | 2017.11.11 12:10:00 |
- PageAccessSession_tab
頁面訪問表,訪問量,訪問時間,使用者所在地域資訊,具體資料如下:
region | userId | accessTime |
---|---|---|
ShangHai | U0011 | 2017-11-11 10:01:00 |
ShangHai | U0012 | 2017-11-11 10:02:00 |
ShangHai | U0013 | 2017-11-11 10:03:00 |
ShangHai | U0015 | 2017-11-11 10:05:00 |
ShangHai | U0011 | 2017-11-11 10:10:00 |
BeiJing | U0110 | 2017-11-11 10:10:00 |
ShangHai | U2010 | 2017-11-11 10:11:00 |
ShangHai | U0410 | 2017-11-11 12:16:00 |
測試類
我們建立一個TableAPIOverviewITCase.scala
用於接下來介紹Flink Table API運算元的功能體驗。程式碼如下:
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class Table APIOverviewITCase {
// 客戶表資料
val customer_data = new mutable.MutableList[(String, String, String)]
customer_data.+=(("c_001", "Kevin", "from JinLin"))
customer_data.+=(("c_002", "Sunny", "from JinLin"))
customer_data.+=(("c_003", "JinCheng", "from HeBei"))
// 訂單表資料
val order_data = new mutable.MutableList[(String, String, String, String)]
order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
// 商品銷售表資料
val item_data = Seq(
Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
Right((1510365720000L)),
Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
Right((1510365780000L)),
Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
Right((1510365900000L)),
Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
Right((1510365960000L)),
Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
Right((1510366020000L)),
Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
Right((151036608000L)))
// 頁面訪問表資料
val pageAccess_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
Right((1510365660000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
Right((1510366260000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
Right((1510373400000L)))
// 頁面訪問量表資料2
val pageAccessCount_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
Right((1510365660000L)),
Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
Right((1510366200000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
Right((1510373400000L)))
// 頁面訪問表資料3
val pageAccessSession_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
Right((1510365720000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
Right((1510365720000L)),
Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
Right((1510365900000L)),
Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
Right((1510366260000L)),
Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
Right((1510373760000L)))
val _tempFolder = new TemporaryFolder
// Streaming 環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(1)
env.setStateBackend(getStateBackend)
def getProcTimeTables(): (Table, Table) = {
// 將order_tab, customer_tab 註冊到catalog
val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
(customer, order)
}
def getEventTimeTables(): (Table, Table, Table, Table) = {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 將item_tab, pageAccess_tab 註冊到catalog
val item =
env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
.toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
val pageAccess =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
val pageAccessCount =
env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
.toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
val pageAccessSession =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
(item, pageAccess, pageAccessCount, pageAccessSession)
}
@Rule
def tempFolder: TemporaryFolder = _tempFolder
def getStateBackend: StateBackend = {
new MemoryStateBackend()
}
def procTimePrint(result: Table): Unit = {
val sink = new RetractingSink
result.toRetractStream[Row].addSink(sink)
env.execute()
}
def rowTimePrint(result: Table): Unit = {
val sink = new RetractingSink
result.toRetractStream[Row].addSink(sink)
env.execute()
}
@Test
def testProc(): Unit = {
val (customer, order) = getProcTimeTables()
val result = ...// 測試的查詢邏輯
procTimePrint(result)
}
@Test
def testEvent(): Unit = {
val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
val result = ...// 測試的查詢邏輯
procTimePrint(result)
}
}
// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
var retractedResults: ArrayBuffer[String] = null
override def open(parameters: Configuration): Unit = {
super.open(parameters)
retractedResults = mutable.ArrayBuffer.empty[String]
}
def invoke(v: (Boolean, Row)) {
retractedResults.synchronized {
val value = v._2.toString
if (v._1) {
retractedResults += value
} else {
val idx = retractedResults.indexOf(value)
if (idx >= 0) {
retractedResults.remove(idx)
} else {
throw new RuntimeException("Tried to retract a value that wasn't added first. " +
"This is probably an incorrectly implemented test. " +
"Try to set the parallelism of the sink to 1.")
}
}
}
}
override def close(): Unit = {
super.close()
retractedResults.sorted.foreach(println(_))
}
}
// Water mark 生成器
class EventTimeSourceFunction[T](
dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}
SELECT
SELECT 用於從資料集/流中選擇資料,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去或增加某些列, 如下圖所示:
Table API 示例
從customer_tab
選擇使用者姓名,並用內建的CONCAT函式拼接客戶資訊,如下:
val result = customer
.select('c_name, concat_ws('c_name, " come ", 'c_desc))
Result
c_name | desc |
---|---|
Kevin | Kevin come from JinLin |
Sunny | Sunny come from JinLin |
Jincheng | Jincheng come from HeBei |
特別說明
大家看到在 SELECT
不僅可以使用普通的欄位選擇,還可以使用ScalarFunction
,當然也包括User-Defined Function
,同時還可以進行欄位的alias
設定。其實SELECT
可以結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是去重的場景,示例如下:
Table API示例
在訂單表查詢所有的客戶id,消除重複客戶id, 如下:
val result = order
.groupBy('c_id)
.select('c_id)
Result
c_id |
---|
c_001 |
c_002 |
WHERE
WHERE 用於從資料集/流中過濾資料,與SELECT一起使用,語義是關係代數的Selection,根據某些條件對關係做水平分割,即選擇符合條件的記錄,如下所示:
Table API 示例
在customer_tab
查詢客戶id為c_001
和c_003
的客戶資訊,如下:
val result = customer
.where("c_id = 'c_001' || c_id = 'c_003'")
.select( 'c_id, 'c_name, 'c_desc)
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_003 | JinCheng | from HeBei |
特別說明
我們發現WHERE
是對滿足一定條件的資料進行過濾,WHERE
支援=, <, >, <>, >=, <=以及&&
, ||
等表示式的組合,最終滿足過濾條件的資料會被選擇出來。 SQL中的IN
和NOT IN
在Table API裡面用intersect
和 minus
描述(flink-1.7.0版本)。
Intersect 示例
Intersect
只在Batch模式下進行支援,Stream模式下我們可以利用雙流JOIN來實現,如:在customer_tab
查詢已經下過訂單的客戶資訊,如下:
// 計算客戶id,並去重
val distinct_cids = order
.groupBy('c_id) // 去重
.select('c_id as 'o_c_id)
val result = customer
.join(distinct_cids, 'c_id === 'o_c_id)
.select('c_id, 'c_name, 'c_desc)
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
Minus 示例
Minus
只在Batch模式下進行支援,Stream模式下我們可以利用雙流JOIN來實現,如:在customer_tab
查詢沒有下過訂單的客戶資訊,如下:
// 查詢下過訂單的客戶id,並去重
val distinct_cids = order
.groupBy('c_id)
.select('c_id as 'o_c_id)
// 查詢沒有下過訂單的客戶資訊
val result = customer
.leftOuterJoin(distinct_cids, 'c_id === 'o_c_id)
.where('o_c_id isNull)
.select('c_id, 'c_name, 'c_desc)
說明上面實現邏輯比較複雜,我們後續考慮如何在流上支援更簡潔的方式。
Result
c_id | c_name | c_desc |
---|---|---|
c_003 | JinCheng | from HeBei |
Intersect/Minus與關係代數
如上介紹Intersect
是關係代數中的Intersection, Minus
是關係代數的Difference, 如下圖示意:
- Intersect(Intersection)
- Minus(Difference)
GROUP BY
GROUP BY 是對資料進行分組的操作,比如我需要分別計算一下一個學生表裡面女生和男生的人數分別是多少,如下:
Table API 示例
將order_tab資訊按c_id
分組統計訂單數量,簡單示例如下:
val result = order
.groupBy('c_id)
.select('c_id, 'o_id.count)
Result
c_id | o_count |
---|---|
c_001 | 2 |
c_002 | 1 |
特別說明
在實際的業務場景中,GROUP BY除了按業務欄位進行分組外,很多時候使用者也可以用時間來進行分組(相當於劃分視窗),比如統計每分鐘的訂單數量:
Table API 示例
按時間進行分組,查詢每分鐘的訂單數量,如下:
val result = order
.select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
.groupBy('o_time_min)
.select('o_time_min, 'o_id.count)
Result
o_time_min | o_count |
---|---|
2018-11-05 10:01 | 2 |
2018-11-05 10:03 | 1 |
說明:如果我們時間欄位是timestamp型別,建議使用內建的 DATE_FORMAT
函式。
UNION ALL
UNION ALL 將兩個表合併起來,要求兩個表的欄位完全一致,包括欄位型別、欄位順序,語義對應關係代數的Union,只是關係代數是Set集合操作,會有去重複操作,UNION ALL 不進行去重,如下所示:
Table API 示例
我們簡單的將customer_tab
查詢2次,將查詢結果合併起來,如下:
val result = customer.unionAll(customer)
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
特別說明
UNION ALL 對結果資料不進行去重,如果想對結果資料進行去重,傳統資料庫需要進行UNION操作。
UNION
UNION 將兩個流給合併起來,要求兩個流的欄位完全一致,包括欄位型別、欄位順序,並其UNION 不同於UNION ALL,UNION會對結果資料去重,與關係代數的Union語義一致,如下:
Table API 示例
我們簡單的將customer_tab
查詢2次,將查詢結果合併起來,如下:
val result = customer.union(customer)
我們發現完全一樣的表資料進行 UNION
之後,資料是被去重的,UNION
之後的資料並沒有增加。
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
特別說明
UNION 對結果資料進行去重,在實際的實現過程需要對資料進行排序操作,所以非必要去重情況請使用UNION ALL操作。
JOIN
JOIN 用於把來自兩個表的行聯合起來形成一個寬表,Apache Flink支援的JOIN型別:
- JOIN - INNER JOIN
- LEFT JOIN - LEFT OUTER JOIN
- RIGHT JOIN - RIGHT OUTER JOIN
- FULL JOIN - FULL OUTER JOIN
JOIN與關係代數的Join語義相同,具體如下:
Table API 示例 (JOIN)
INNER JOIN
只選擇滿足ON
條件的記錄,我們查詢customer_tab
和 order_tab
表,將有訂單的客戶和訂單資訊選擇出來,如下:
val result = customer
.join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
Result
c_id | c_name | c_desc | o_id | o_c_id | o_time | o_desc |
---|---|---|---|---|---|---|
c_001 | Kevin | from JinLin | o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
c_001 | Kevin | from JinLin | o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
c_002 | Sunny | from JinLin | o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
Table API 示例 (LEFT JOIN)
LEFT JOIN
與INNER JOIN
的區別是當右表沒有與左邊相JOIN的資料時候,右邊對應的欄位補NULL
輸出,語義如下:
對應的SQL語句如下(LEFT JOIN):
SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
- 細心的讀者可能發現上面T2.ColC是添加了字首T2了,這裡需要說明一下,當兩張表有欄位名字一樣的時候,我需要指定是從那個表裡面投影的。
我們查詢customer_tab
和 order_tab
表,將客戶和訂單資訊選擇出來如下:
val result = customer
.leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
Result
c_id | c_name | c_desc | o_id | c_id | o_time | o_desc |
---|---|---|---|---|---|---|
c_001 | Kevin | from JinLin | o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
c_001 | Kevin | from JinLin | o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
c_002 | Sunny | from JinLin | o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
c_003 | JinCheng | from HeBei | NULL | NULL | NULL | NULL |
特別說明
RIGHT JOIN
相當於 LEFT JOIN
左右兩個表互動一下位置。FULL JOIN
相當於 RIGHT JOIN
和 LEFT JOIN
之後進行UNION ALL
操作。
Time-Interval JOIN
Time-Interval JOIN 相對於UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條資料會與另一條流上的不同時間區域的資料進行JOIN。對應Apache Flink官方文件的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語義和實現原理詳見《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語法示例,如下:
...
val result = left
.join(right)
// 定義Time Interval
.where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
...
Lateral JOIN
Apache Flink Lateral JOIN 是左邊Table與一個UDTF進行JOIN,詳細的語義和實現原理請參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語法示例,如下:
...
val udtf = new UDTF
val result = source.join(udtf('c) as ('d, 'e))
...
Temporal Table JOIN
Temporal Table JOIN 是左邊表與右邊一個攜帶版本資訊的表進行JOIN,詳細的語法,語義和實現原理詳見《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語法示例,如下:
...
val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)
...
Window
在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統資料庫的標準開窗,每一個元素都對應一個視窗。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行視窗劃分的。
Over Window
Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。
按ROWS和RANGE分類是傳統資料庫的標準分類方法,在Apache Flink中還可以根據時間型別(ProcTime/EventTime)和視窗的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。為了避免大家對過細分類造成困擾,我們按照確定當前行的不同方式將OVER Window分成兩大類進行介紹,如下:
- ROWS OVER Window - 每一行元素都視為新的計算行,即,每一行都是一個新的視窗。
- RANGE OVER Window - 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個視窗。
Bounded ROWS OVER Window
Bounded ROWS OVER Window 每一行元素都視為新的計算行,即,每一行都是一個新的視窗。
語義
我們以3個元素(2 PRECEDING)的視窗為例,如下圖:
上圖所示視窗 user 1 的 w5和w6, user 2的 視窗 w2 和 w3,雖然有元素都是同一時刻到達,但是他們仍然是在不同的視窗,這一點有別於RANGE OVER Window。
Table API 示例
利用item_tab
測試資料,我們統計同類商品中當前和當前商品之前2個商品中的最高價格。
val result = item
.window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
.select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
Result
itemID | itemType | onSellTime | price | maxPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 | 50 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 | 60 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 | 60 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 | 20 |
Bounded RANGE OVER Window
Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個視窗。
語義
我們以3秒中資料(INTERVAL '2' SECOND)的視窗為例,如下圖:
注意: 上圖所示視窗 user 1 的 w6, user 2的 視窗 w3,元素都是同一時刻到達,他們是在同一個視窗,這一點有別於ROWS OVER Window。
Tabel API 示例
我們統計同類商品中當前和當前商品之前2分鐘商品中的最高價格。
val result = item
.window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w)
.select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
Result(Bounded RANGE OVER Window)
itemID | itemType | onSellTime | price | maxPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 | 60 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 | 60 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 | 40 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 | 20 |
特別說明
OverWindow最重要是要理解每一行資料都確定一個視窗,同時目前在Apache Flink中只支援按時間欄位排序。並且OverWindow開窗與GroupBy方式資料分組最大的不同在於,GroupBy資料分組統計時候,在SELECT
中除了GROUP BY的key,不能直接選擇其他非key的欄位,但是OverWindow沒有這個限制,SELECT
可以選擇任何欄位。比如一張表table(a,b,c,d)4個欄位,如果按d分組求c的最大值,兩種寫完如下:
- GROUP BY -
tab.groupBy('d).select(d, MAX(c))
- OVER Window =
tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)
如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然可以選擇 a,b,c欄位。但在GROUPBY中,SELECT 只能選擇 d 欄位。
Group Window
根據視窗資料劃分的不同,目前Apache Flink有如下3種Bounded Winodw:
- Tumble - 滾動視窗,視窗資料有固定的大小,視窗資料無疊加;
- Hop - 滑動視窗,視窗資料有固定大小,並且有固定的視窗重建頻率,視窗資料有疊加;
- Session - 會話視窗,視窗資料沒有固定的大小,根據視窗資料活躍程度劃分視窗,視窗資料無疊加。
說明: Aapche Flink 還支援UnBounded的 Group Window,也就是全域性Window,流上所有資料都在一個窗口裡面,語義非常簡單,這裡不做詳細介紹了。
Tumble
語義
Tumble 滾動視窗有固定size,視窗資料不重疊,具體語義如下:
Table API 示例
利用pageAccess_tab
測試資料,我們需要按不同地域統計每2分鐘的淘寶首頁的訪問量(PV)。
val result = pageAccess
.window(Tumble over 2.minute on 'rowtime as 'w)
.groupBy('w, 'region)
.select('region, 'w.start, 'w.end, 'region.count as 'pv)
Result
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:12:00.0 | 2 |
ShangHai | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
ShangHai | 2017-11-11 04:10:00.0 | 2017-11-11 04:12:00.0 | 1 |
Hop
Hop 滑動視窗和滾動視窗類似,視窗有固定的size,與滾動視窗不同的是滑動視窗可以通過slide引數控制滑動視窗的新建頻率。因此當slide值小於視窗size的值的時候多個滑動視窗會重疊。
語義
Hop 滑動視窗語義如下所示:
Table API 示例
利用pageAccessCount_tab
測試資料,我們需要每5分鐘統計近10分鐘的頁面訪問量(PV).
val result = pageAccessCount
.window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
.groupBy('w)
.select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
Result
winStart | winEnd | accessCount |
---|---|---|
2017-11-11 01:55:00.0 | 2017-11-11 02:05:00.0 | 186 |
2017-11-11 02:00:00.0 | 2017-11-11 02:10:00.0 | 396 |
2017-11-11 02:05:00.0 | 2017-11-11 02:15:00.0 | 243 |
2017-11-11 02:10:00.0 | 2017-11-11 02:20:00.0 | 33 |
2017-11-11 04:05:00.0 | 2017-11-11 04:15:00.0 | 129 |
2017-11-11 04:10:00.0 | 2017-11-11 04:20:00.0 | 129 |
Session
Seeeion 會話視窗 是沒有固定大小的視窗,通過session的活躍度分組元素。不同於滾動視窗和滑動視窗,會話視窗不重疊,也沒有固定的起止時間。一個會話視窗在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話視窗 分配器通過配置session gap來指定非活躍週期的時長.
語義
Session 會話視窗語義如下所示:
val result = pageAccessSession
.window(Session withGap 3.minute on 'rowtime as 'w)
.groupBy('w, 'region)
.select('region, 'w.start, 'w.end, 'region.count as 'pv)
Result
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:13:00.0 | 1 |
ShangHai | 2017-11-11 02:01:00.0 | 2017-11-11 02:08:00.0 | 4 |
ShangHai | 2017-11-11 02:10:00.0 | 2017-11-11 02:14:00.0 | 2 |
ShangHai | 2017-11-11 04:16:00.0 | 2017-11-11 04:19:00.0 | 1 |
巢狀Window
在Window之後再進行Window劃分也是比較常見的統計需求,那麼在一個Event-Time的Window之後,如何再寫一個Event-Time的Window呢?一個Window之後再描述一個Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中我們可以利用'w.rowtime
來傳遞時間屬性,比如:Tumble Window之後再接一個Session Window 示例如下:
...
val result = pageAccess
.window(Tumble over 2.minute on 'rowtime as 'w1)
.groupBy('w1)
.select('w1.rowtime as 'rowtime, 'col1.count as 'cnt)
.window(Session withGap 3.minute on 'rowtime as 'w2)
.groupBy('w2)
.select('cnt.sum)
...
Source&Sink
上面我們介紹了Apache Flink Table API核心運算元的語義和具體示例,這部分將選取Bounded EventTime Tumble Window為例為大家編寫一個完整的包括Source和Sink定義的Apache Flink Table API Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,使用者ID和訪問時間。我們需要按不同地域統計每2分鐘的淘寶首頁的訪問量(PV)。具體資料如下:
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
Source 定義
自定義Apache Flink Stream Source需要實現StreamTableSource
, StreamTableSource
中通過StreamExecutionEnvironment
的addSource
方法獲取DataStream
, 所以我們需要自定義一個 SourceFunction
, 並且要支援產生WaterMark,也就是要實現DefinedRowtimeAttributes
介面。
Source Function定義
支援接收攜帶EventTime的資料集合,Either的資料結構,Right表示WaterMark和Left表示資料:
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}
定義 StreamTableSource
我們自定義的Source要攜帶我們測試的資料,以及對應的WaterMark資料,具體如下:
class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
val fieldNames = Array("accessTime", "region", "userId")
val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)
// 頁面訪問表資料 rows with timestamps and watermarks
val data = Seq(
Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
Right(1510365660000L),
Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
Right(1510365660000L),
Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
Right(1510366200000L),
Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
Right(1510366260000L),
Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
Right(1510373400000L)
)
override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
Collections.singletonList(new RowtimeAttributeDescriptor(
"accessTime",
new ExistingField("accessTime"),
PreserveWatermarks.INSTANCE))
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
}
override def getReturnType: TypeInformation[Row] = rowType
override def getTableSchema: TableSchema = schema
}
Sink 定義
我們簡單的將計算結果寫入到Apache Flink內建支援的CSVSink中,定義Sink如下:
def getCsvTableSink: TableSink[Row] = {
val tempFile = File.createTempFile("csv_sink_", "tem")
// 列印sink的檔案路徑,方便我們檢視執行結果
println("Sink path : " + tempFile)
if (tempFile.exists()) {
tempFile.delete()
}
new CsvTableSink(tempFile.getAbsolutePath).configure(
Array[String]("region", "winStart", "winEnd", "pv"),
Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
構建主程式
主程式包括執行環境的定義,Source/Sink的註冊以及統計查SQL的執行,具體如下:
def main(args: Array[String]): Unit = {
// Streaming 環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 設定EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//方便我們查出輸出資料
env.setParallelism(1)
val sourceTableName = "mySource"
// 建立自定義source資料結構
val tableSource = new MyTableSource
val sinkTableName = "csvSink"
// 建立CSV sink 資料結構
val tableSink = getCsvTableSink
// 註冊source
tEnv.registerTableSource(sourceTableName, tableSource)
// 註冊sink
tEnv.registerTableSink(sinkTableName, tableSink)
val result = tEnv.scan(sourceTableName)
.window(Tumble over 2.minute on 'accessTime as 'w)
.groupBy('w, 'region)
.select('region, 'w.start, 'w.end, 'region.count as 'pv)
result.insertInto(sinkTableName)
env.execute()
}
執行並檢視執行結果
執行主程式後我們會在控制檯得到Sink的檔案路徑,如下:
Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
Cat 方式檢視計算結果,如下:
jinchengsunjcdeMacBook-Pro:FlinkTable APIDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1
表格化如上結果:
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:12:00.0 | 2 |
ShangHai | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
ShangHai | 2017-11-11 04:10:00.0 | 2017-11-11 04:12:00.0 | 1 |
上面這個端到端的完整示例也可以應用到本篇前面介紹的其他運算元示例中,只是大家根據Source和Sink的Schema不同來進行相應的構建即可!
小結
本篇首先向大家介紹了什麼是Table API, Table API的核心特點,然後以此介紹Table API的核心運算元功能,並附帶了具體的測試資料和測試程式,最後以一個End-to-End的示例展示瞭如何編寫Apache Flink Table API的Job收尾。希望對大家學習Apache Flink Table API 過程中有所幫助。
關於點贊和評論
本系列文章難免有很多缺陷和不足,真誠希望讀者對有收穫的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!