1. 程式人生 > 實用技巧 >Flink基礎(十四):Table API 和 Flink SQL(三)流處理中的特殊概念

Flink基礎(十四):Table API 和 Flink SQL(三)流處理中的特殊概念

  Table API 和 SQL,本質上還是基於關係型表的操作方式;而關係型表、關係代數,以及SQL 本身,一般是有界的,更適合批處理的場景。這就導致在進行流處理的過程中,理解會 稍微複雜一些,需要引入一些特殊概念。

1 流處理和關係代數(表,及 SQL)的區別

可以看到,其實關係代數(主要就是指關係型資料庫中的表)和 SQL,主要就是針對批處理的,這和流處理有天生的隔閡。

2 動態表(Dynamic Tables)

  因為流處理面對的資料,是連續不斷的,這和我們熟悉的關係型資料庫中儲存的“表”完全不同。所以,如果我們把流資料轉換成 Table,然後執行類似於 table 的 select 操作,結 果就不是一成不變的,而是隨著新資料的到來,會不停更新。   我們可以隨著新資料的到來,不停地在之前的基礎上更新結果。這樣得到的表,在 FlinkTable API 概念裡,就叫做“動態表”(Dynamic Tables)。   動態表是 Flink 對流資料的 Table API 和 SQL 支援的核心概念。與表示批處理資料的靜態表不同,動態表是隨時間變化的。動態表可以像靜態的批處理表一樣進行查詢,查詢一個動 態表會產生持續查詢(Continuous Query)。連續查詢永遠不會終止,並會生成另一個動態表。查詢(Query)會不斷更新其動態結果表,以反映其動態輸入表上的更改。

3 流式持續查詢的過程

  下圖顯示了流、動態表和連續查詢的關係:
流式持續查詢的過程為: 1. 流被轉換為動態表。 2. 對動態表計算連續查詢,生成新的動態表。 3. 生成的動態表被轉換回流。 3.1 將流轉換成表(Table)   為了處理帶有關係查詢的流,必須先將其轉換為表。   從概念上講,流的每個資料記錄,都被解釋為對結果表的插入(Insert)修改。因為流式持續不斷的,而且之前的輸出結果無法改變。本質上,我們其實是從一個、只有插入操作 的 changelog(更新日誌)流,來構建一個表。   為了更好地說明動態表和持續查詢的概念,我們來舉一個具體的例子。   比如,我們現在的輸入資料,就是使用者在網站上的訪問行為,資料型別(Schema)如下:
[
 user: VARCHAR, 
// 使用者名稱 cTime: TIMESTAMP, // 訪問某個 URL 的時間戳 url: VARCHAR // 使用者訪問的 URL ]
下圖顯示瞭如何將訪問 URL 事件流,或者叫點選事件流(左側)轉換為表(右側)。 隨著插入更多的訪問事件流記錄,生成的表將不斷增長。 3.2 持續查詢(Continuous Query)   持續查詢,會在動態表上做計算處理,並作為結果生成新的動態表。與批處理查詢不同,連續查詢從不終止,並根據輸入表上的更新更新其結果表。   在任何時間點,連續查詢的結果在語義上,等同於在輸入表的快照上,以批處理模式執行的同一查詢的結果。   在下面的示例中,我們展示了對點選事件流中的一個持續查詢。   這個 Query 很簡單,是一個分組聚合做 count 統計的查詢。它將使用者欄位上的 clicks 表分組,並統計訪問的 url 數。圖中顯示了隨著時間的推移,當 clicks 表被其他行更新時如何 計算查詢。 3.3 將動態錶轉換成流   與常規的資料庫表一樣,動態表可以通過插入(Insert)、更新(Update)和刪除(Delete)更改,進行持續的修改。將動態錶轉換為流或將其寫入外部系統時,需要對這些更改進行編 碼。Flink 的 Table API 和 SQL 支援三種方式對動態表的更改進行編碼:   1)僅追加(Append-only)流 僅通過插入(Insert)更改,來修改的動態表,可以直接轉換為“僅追加”流。這個流中發出的資料,就是動態表中新增的每一行。   2)撤回(Retract)流   Retract 流是包含兩類訊息的流,新增(Add)訊息和撤回(Retract)訊息。動態表通過將 INSERT 編碼為 add 訊息、DELETE 編碼為 retract 訊息、UPDATE 編碼為被 更改行(前一行)的 retract 訊息和更新後行(新行)的 add 訊息,轉換為 retract 流。   下圖顯示了將動態錶轉換為 Retract 流的過程。

3)Upsert(更新插入)流   Upsert 流包含兩種型別的訊息:Upsert 訊息和 delete 訊息。轉換為 upsert 流的動態表,需要有唯一的鍵(key)。   通過將 INSERT 和 UPDATE 更改編碼為 upsert 訊息,將 DELETE 更改編碼為 DELETE 訊息,就可以將具有唯一鍵(Unique Key)的動態錶轉換為流。   下圖顯示了將動態錶轉換為 upsert 流的過程。

  這些概念我們之前都已提到過。需要注意的是,在程式碼裡將動態錶轉換為 DataStream時,僅支援 Append 和 Retract 流。而向外部系統輸出動態表的 TableSink 介面,則可以有不 同的實現,比如之前我們講到的 ES,就可以有 Upsert 模式。

4 時間特性

  基於時間的操作(比如 Table API 和 SQL 中視窗操作),需要定義相關的時間語義和時間資料來源的資訊。所以,Table 可以提供一個邏輯上的時間欄位,用於在表處理程式中,指 示時間和訪問相應的時間戳。   時間屬性,可以是每個表 schema 的一部分。一旦定義了時間屬性,它就可以作為一個欄位引用,並且可以在基於時間的操作中使用。   時間屬性的行為類似於常規時間戳,可以訪問,並且進行計算。 4.1 處理時間(Processing Time)   處理時間語義下,允許表處理程式根據機器的本地時間生成結果。它是時間的最簡單概念。它既不需要提取時間戳,也不需要生成 watermark。   定義處理時間屬性有三種方法:在 DataStream 轉化時直接指定;在定義 Table Schema時指定;在建立表的 DDL 中指定。   1) DataStream 轉化成 Table 時指定   由 DataStream 轉換成表時,可以在後面指定欄位名來定義 Schema。在定義 Schema 期間,可以使用.proctime,定義處理時間欄位。   注意,這個 proctime 屬性只能通過附加邏輯欄位,來擴充套件物理 schema。因此,只能在schema 定義的末尾定義它。 程式碼如下:
// 定義好 DataStream
val inputStream: DataStream[String] = env.readTextFile("\\sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream
 .map(data => {
 val dataArray = data.split(",")
 SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
 })
// 將 DataStream 轉換為 Table,並指定時間欄位
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 
'timestamp, 'pt.proctime)
2) 定義 Table Schema 時指定   這種方法其實也很簡單,只要在定義 Schema 的時候,加上一個新的欄位,並指定成proctime 就可以了。   程式碼如下:
tableEnv.connect(
 new FileSystem().path("..\\sensor.txt"))
 .withFormat(new Csv())
 .withSchema(new Schema()
 .field("id", DataTypes.STRING())
 .field("timestamp", DataTypes.BIGINT())
 .field("temperature", DataTypes.DOUBLE())
 .field("pt", DataTypes.TIMESTAMP(3))
 .proctime() // 指定 pt 欄位為處理時間
 ) // 定義表結構
 .createTemporaryTable("inputTable") // 建立臨時表
3) 建立表的 DDL 中指定 在建立表的 DDL 中,增加一個欄位並指定成 proctime,也可以指定當前的時間欄位。 程式碼如下:
val sinkDDL: String =
 """
 |create table dataTable (
 | id varchar(20) not null,
 | ts bigint,
 | temperature double,
 | pt AS PROCTIME()
 |) with (
 | 'connector.type' = 'filesystem',
 | 'connector.path' = 'file:///D:\\..\\sensor.txt',
 | 'format.type' = 'csv'
 |)
 """.stripMargin
tableEnv.sqlUpdate(sinkDDL) // 執行 DDL
注意:執行這段 DDL,必須使用 Blink Planner。 4.2 事件時間(Event Time)   事件時間語義,允許表處理程式根據每個記錄中包含的時間生成結果。這樣即使在有亂序事件或者延遲事件時,也可以獲得正確的結果。   為了處理無序事件,並區分流中的準時和遲到事件;Flink 需要從事件資料中,提取時間戳,並用來推進事件時間的進展(watermark)。 1) DataStream 轉化成 Table 時指定 在DataStream轉換成Table,schema的定義期間,使用.rowtime可以定義事件時間屬性。注意,必須在轉換的資料流中分配時間戳和 watermark。   在將資料流轉換為表時,有兩種定義時間屬性的方法。根據指定的.rowtime 欄位名是否存在於資料流的架構中,timestamp 欄位可以:     ⚫ 作為新欄位追加到 schema     ⚫ 替換現有欄位 在這兩種情況下,定義的事件時間戳欄位,都將儲存 DataStream 中事件時間戳的值。 程式碼如下:
val inputStream: DataStream[String] = env.readTextFile("\\sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream
 .map(data => {
 val dataArray = data.split(",")
 SensorReading(dataArray(0), dataArray(1).toLong, 
dataArray(2).toDouble)
 })
 .assignAscendingTimestamps(_.timestamp * 1000L)
// 將 DataStream 轉換為 Table,並指定時間欄位
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 
'temperature)
// 或者,直接追加欄位
val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 
'timestamp, 'rt.rowtime)
2) 定義 Table Schema 時指定   這種方法只要在定義 Schema 的時候,將事件時間欄位,並指定成 rowtime 就可以了。   程式碼如下:
tableEnv.connect(
 new FileSystem().path("sensor.txt"))
 .withFormat(new Csv())
 .withSchema(new Schema()
 .field("id", DataTypes.STRING())
 .field("timestamp", DataTypes.BIGINT())
 .rowtime(
 new Rowtime()
 .timestampsFromField("timestamp") // 從欄位中提取時間戳
 .watermarksPeriodicBounded(1000) // watermark 延遲 1 秒
 )
 .field("temperature", DataTypes.DOUBLE())
 ) // 定義表結構
 .createTemporaryTable("inputTable") // 建立臨時表
3) 建立表的 DDL 中指定   事件時間屬性,是使用 CREATE TABLE DDL 中的 WARDMARK 語句定義的。watermark 語句,定義現有事件時間欄位上的 watermark 生成表示式,該表示式將事件時間欄位標記為事 件時間屬性。 程式碼如下:
val sinkDDL: String =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ),
| watermark for rt as rt - interval '1' second
|) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = 'file:///D:\\..\\sensor.txt',
| 'format.type' = 'csv'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL) // 執行 DDL
  這裡 FROM_UNIXTIME 是系統內建的時間函式,用來將一個整數(秒數)轉換成“YYYY-MM-DD hh:mm:ss”格式(預設,也可以作為第二個 String 引數傳入)的日期時間 字串(date time string);然後再用 TO_TIMESTAMP 將其轉換成 Timestamp。