
Apache Flink Talk Series (11) - Temporal Table JOIN

What Is a Temporal Table

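The article "Apache Flink Talk Series - JOIN LATERAL" mentioned Temporal Table JOIN; this article explains in detail what Temporal Table JOIN is.
ANSI-SQL 2011 introduced the concept of temporal tables, and major database vendors such as Oracle, SQL Server, and DB2 have since implemented the standard. A Temporal Table records every change made to the data at any point in its history. A Temporal Table works as follows: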
[Figure: Temporal Table workflow]
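As the figure above shows, a Temporal Table has all the characteristics of an ordinary table, has its own DDL/DML/QUERY syntax, and treats time as its core attribute. History implies time, and time implies snapshots.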

ANSI-SQL 2011 Temporal Table Example

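We illustrate how a Temporal Table works with one DDL statement and a set of DML statements. Defining a PK in the DDL is optional; the example below does not define one: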

  • DDL example
CREATE TABLE Emp (
  ENo INTEGER,
  Sys_Start TIMESTAMP(12) GENERATED
    ALWAYS AS ROW START,
  Sys_End TIMESTAMP(12) GENERATED
    ALWAYS AS ROW END,
  EName VARCHAR(30),
  PERIOD FOR SYSTEM_TIME (Sys_Start, Sys_End)
) WITH SYSTEM VERSIONING
  • DML examples
  1. INSERT
INSERT INTO Emp (ENo, EName) VALUES (22217, 'Joe')

[Figure: the Emp table after the INSERT]

Note: Sys_Start and Sys_End are filled in automatically by the database system.

  2. UPDATE
UPDATE Emp SET EName = 'Tom' WHERE ENo = 22217 

[Figure: the Emp table after the UPDATE]
 
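Note: Suppose the UPDATE runs at 2012-02-03 10:00:00. Afterwards, the Sys_End of the previous value "Joe" changes from 9999-12-31 23:59:59 to 2012-02-03 10:00:00, which is exactly when the next value "Tom" takes effect. So although we executed an UPDATE, the database now holds two rows with different values and different validity periods, that is, two versions.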

  3. DELETE (assume the table contents before the DELETE are as follows)
    [Figure: the Emp table before the DELETE]
DELETE FROM Emp WHERE ENo = 22217

[Figure: the Emp table after the DELETE]

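Note: Suppose the DELETE runs at 2012-06-01 00:00:00. Sys_End then changes from 9999-12-31 23:59:59 to 2012-06-01 00:00:00. In other words, the DELETE does not physically remove the matching rows; the system only sets their Sys_End to the time the DELETE ran, marking the data as valid up to that moment.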

  4. SELECT
SELECT ENo,EName,Sys_Start,Sys_End FROM Emp 
FOR SYSTEM_TIME AS OF TIMESTAMP '2011-01-02 00:00:00'

Note: This query returns all records with Sys_Start <= 2011-01-02 00:00:00 and Sys_End > 2011-01-02 00:00:00.
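To make the Sys_Start/Sys_End bookkeeping concrete, here is a minimal in-memory Scala model of the Emp example. It is only a sketch under stated assumptions: timestamps are fixed-width strings so that string order equals time order, the INSERT time 2012-01-01 09:00:00 is hypothetical (the text does not give it), and SystemVersionedEmp, Version, and Table are illustrative names, not part of any database API.

```scala
// Hypothetical in-memory model of an ANSI SQL:2011 system-versioned table.
// Assumption: timestamps are "yyyy-MM-dd HH:mm:ss" strings, so string order == time order.
object SystemVersionedEmp {
  val Forever = "9999-12-31 23:59:59"

  // One stored row version; sysStart/sysEnd are maintained by the "system", not the user.
  case class Version(eNo: Int, eName: String, sysStart: String, sysEnd: String)

  final class Table {
    private var rows = Vector.empty[Version]

    def all: Vector[Version] = rows

    // INSERT: the new row is valid from `now` until "forever".
    def insert(eNo: Int, eName: String, now: String): Unit =
      rows :+= Version(eNo, eName, now, Forever)

    // Closes the currently open version(s) of eNo at `now` and returns them.
    private def closeOpen(eNo: Int, now: String): Vector[Version] = {
      def isOpen(v: Version) = v.eNo == eNo && v.sysEnd == Forever
      val open = rows.filter(isOpen)
      rows = rows.map(v => if (isOpen(v)) v.copy(sysEnd = now) else v)
      open
    }

    // UPDATE: the old version ends at `now` and a new version starts at `now`.
    def update(eNo: Int, newName: String, now: String): Unit =
      if (closeOpen(eNo, now).nonEmpty) rows :+= Version(eNo, newName, now, Forever)

    // DELETE: nothing is physically removed; the open version is only closed.
    def delete(eNo: Int, now: String): Unit = {
      closeOpen(eNo, now)
      ()
    }

    // FOR SYSTEM_TIME AS OF t  <=>  sysStart <= t AND sysEnd > t
    def asOf(t: String): Vector[Version] =
      rows.filter(v => v.sysStart <= t && v.sysEnd > t)
  }

  def main(args: Array[String]): Unit = {
    val emp = new Table
    emp.insert(22217, "Joe", "2012-01-01 09:00:00") // hypothetical INSERT time
    emp.update(22217, "Tom", "2012-02-03 10:00:00")
    emp.delete(22217, "2012-06-01 00:00:00")
    emp.all.foreach(println)                 // both versions are still stored
    println(emp.asOf("2012-03-01 00:00:00")) // only the "Tom" version
  }
}
```

After the DELETE both versions are still physically present; only the AS OF predicate decides which one is visible at a given instant.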

SQL Server Temporal Table Example

DDL

CREATE TABLE Department
(
DeptID int NOT NULL PRIMARY KEY CLUSTERED
, DeptName varchar(50) NOT NULL
, ManagerID INT NULL
, ParentDeptID int NULL
, SysStartTime datetime2 GENERATED ALWAYS AS ROW Start NOT NULL
, SysEndTime datetime2 GENERATED ALWAYS AS ROW END NOT NULL
, PERIOD FOR SYSTEM_TIME (SysStartTime,SysEndTime)
)
WITH (SYSTEM_VERSIONING = ON);

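Executing the statement above creates a current table and a history table in the database, as shown below:
[Figure: the system-versioned Department table and its history table]
Department is marked as system-versioned, and the history table receives a default name. You can also name it yourself, for example: SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory).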

DML

  • INSERT - the column list does not include SysStartTime and SysEndTime
INSERT INTO [dbo].[Department]([DeptID] ,[DeptName] ,[ManagerID] ,[ParentDeptID])
VALUES(10, 'Marketing', 101, 1);

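After running it, we query the current table and the history table separately:
[Figure: the current table and the history table after the INSERT]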

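The row inserted by our first INSERT is valid from the moment of the operation, 2018-06-06 05:50:20.7913985, until forever (9999-12-31 23:59:59.9999999), and at this point the history table is still empty. Next, we perform an update.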

  • UPDATE
UPDATE [dbo].[Department] SET [ManagerID] = 501 WHERE [DeptID] = 10

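After it runs, the current table is updated and a historical row is written to the history table:

[Figure: the current table and the history table after the first UPDATE]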

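Note that SysStartTime in the current table has already changed, and the history table now holds one row: its SysStartTime is the original SysStartTime of the current row, and its SysEndTime is the current row's new SysStartTime. Let's update once more: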

UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10

[Figure: the current table and the history table after the second UPDATE]

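So in SQL Server a Temporal Table stores its data in a current table plus a history table, and the database manages data versions internally through the start time (SysStartTime) and end time (SysEndTime) of each row.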

  • SELECT 
SELECT [DeptID], [DeptName], [SysStartTime],[SysEndTime]
FROM [dbo].[Department]
FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000' ;

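[Figure: result of the FOR SYSTEM_TIME AS OF query]
Although the SELECT statement queries the Department table, the row returned here comes from the history table: the predicate is applied to both the current and the history table, and only the historical version matches. Under the hood the query evaluates SysStartTime <= '2018-06-06 05:50:21.0000000' AND SysEndTime > '2018-06-06 05:50:21.0000000'.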
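The current-table/history-table split can be modeled in a few lines of Scala. This is a hypothetical sketch (CurrentAndHistory and VersionedDepartment are illustrative names), assuming fixed-width time strings; the two UPDATE times are made up, because the text only shows them in figures. It shows that an UPDATE moves the old row into the history table and that AS OF scans both tables with the same predicate.

```scala
// Hypothetical model of SQL Server system versioning with a separate history table.
// Assumption: times are fixed-width "yyyy-MM-dd HH:mm:ss.fffffff" strings (string order == time order).
object CurrentAndHistory {
  val MaxTime = "9999-12-31 23:59:59.9999999"

  case class Dept(deptId: Int, deptName: String, managerId: Int,
                  sysStart: String, sysEnd: String)

  final class VersionedDepartment {
    var current = Map.empty[Int, Dept] // current table, keyed by the primary key DeptID
    var history = Vector.empty[Dept]   // history table

    // INSERT only touches the current table; the history table stays empty.
    def insert(id: Int, name: String, manager: Int, now: String): Unit =
      current += id -> Dept(id, name, manager, now, MaxTime)

    // UPDATE: the old row is copied to the history table with sysEnd = now,
    // and the current row gets the new value with sysStart = now.
    def updateManager(id: Int, manager: Int, now: String): Unit =
      current.get(id).foreach { old =>
        history :+= old.copy(sysEnd = now)
        current += id -> old.copy(managerId = manager, sysStart = now)
      }

    // FOR SYSTEM_TIME AS OF t applies the same predicate to both tables.
    def asOf(t: String): Vector[Dept] =
      (history ++ current.values).filter(d => d.sysStart <= t && d.sysEnd > t)
  }

  def main(args: Array[String]): Unit = {
    val dept = new VersionedDepartment
    dept.insert(10, "Marketing", 101, "2018-06-06 05:50:20.7913985")
    dept.updateManager(10, 501, "2018-06-06 05:51:00.0000000") // hypothetical UPDATE time
    dept.updateManager(10, 201, "2018-06-06 05:52:00.0000000") // hypothetical UPDATE time
    println(dept.asOf("2018-06-06 05:50:21.0000000")) // the ManagerID 101 version, from history
  }
}
```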

Apache Flink Temporal Table

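We have mentioned more than once that Apache Flink follows the ANSI-SQL standard, and the Temporal Table concept in Apache Flink also comes from the ANSI-2011 semantics. The current implementation, however, differs slightly from ANSI-SQL in syntax: as shown above, ANSI-2011 uses FOR SYSTEM_TIME AS OF, whereas Apache Flink currently uses LATERAL TABLE(TemporalTableFunction). This is something the community will need to improve going forward.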

Why Do We Need a Temporal Table

Let's use a concrete query to show why Temporal Tables are needed. Suppose we have an exchange-rate table, RatesHistory, that changes in real time:

rowtime (ts) | currency (pk) | rate
09:00:00 | US Dollar | 102
09:00:00 | Euro | 114
09:00:00 | Yen | 1
10:45:00 | Euro | 116
11:15:00 | Euro | 119
11:49:00 | Pounds | 108

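RatesHistory holds exchange rates relative to Yen (so the Yen rate is 1) and is an append-only table that keeps changing. For example, the Euro-to-Yen rate is 114 from 09:00 to 10:45, and 116 from 10:45 to 11:15.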
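The validity periods described above follow mechanically from the append-only rows: a rate version of a currency stays valid until the next row for the same currency arrives. Here is a small Scala sketch of that derivation, assuming "HH:mm:ss" strings (so string order is time order); RateValidity, Rate, and Validity are hypothetical names.

```scala
// Hypothetical sketch: derive each rate version's validity interval [from, to)
// from the append-only RatesHistory. Assumption: "HH:mm:ss" strings, so string order == time order.
object RateValidity {
  case class Rate(rowtime: String, currency: String, rate: Int)
  // to = None means "still valid" (no later row for this currency yet)
  case class Validity(currency: String, rate: Int, from: String, to: Option[String])

  val ratesHistory = Seq(
    Rate("09:00:00", "US Dollar", 102),
    Rate("09:00:00", "Euro", 114),
    Rate("09:00:00", "Yen", 1),
    Rate("10:45:00", "Euro", 116),
    Rate("11:15:00", "Euro", 119),
    Rate("11:49:00", "Pounds", 108))

  def validity(history: Seq[Rate]): Seq[Validity] =
    history.groupBy(_.currency).toSeq.flatMap { case (currency, rows) =>
      val sorted = rows.sortBy(_.rowtime)
      // A version ends when the next version of the same currency (the key) starts.
      val ends: Seq[Option[String]] = sorted.drop(1).map(r => Option(r.rowtime)) :+ None
      sorted.zip(ends).map { case (r, end) => Validity(currency, r.rate, r.rowtime, end) }
    }

  def main(args: Array[String]): Unit =
    validity(ratesHistory).filter(_.currency == "Euro").foreach(println)
}
```

For Euro this yields 114 valid in [09:00:00, 10:45:00), 116 in [10:45:00, 11:15:00), and 119 from 11:15:00 on, matching the text.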

Suppose we want to output all rates that are current as of 10:58. We need the following SQL query to compute the result table:

SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = r.currency
  AND r2.rowtime <= '10:58');
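To see what the query returns, here is a plain Scala evaluation of the same correlated MAX(rowtime) logic over an in-memory copy of RatesHistory. It is an illustrative sketch only (LatestRatesAsOf is a hypothetical name), assuming rowtime strings compare lexicographically as times, just like the '10:58' literal in the SQL.

```scala
// Hypothetical evaluation of the correlated MAX(rowtime) query over an in-memory RatesHistory.
// Assumption: rowtime is an "HH:mm:ss" string, so string comparison matches time order.
object LatestRatesAsOf {
  case class Rate(rowtime: String, currency: String, rate: Int)

  val ratesHistory = Seq(
    Rate("09:00:00", "US Dollar", 102),
    Rate("09:00:00", "Euro", 114),
    Rate("09:00:00", "Yen", 1),
    Rate("10:45:00", "Euro", 116),
    Rate("11:15:00", "Euro", 119),
    Rate("11:49:00", "Pounds", 108))

  // Keep row r when r.rowtime equals MAX(rowtime) over rows of the same currency
  // with rowtime <= t; if there is no such row, MAX is NULL and r is dropped.
  def latestAsOf(t: String): Seq[Rate] =
    ratesHistory.filter { r =>
      val candidates = ratesHistory.filter(r2 => r2.currency == r.currency && r2.rowtime <= t)
      candidates.nonEmpty && r.rowtime == candidates.map(_.rowtime).max
    }

  def main(args: Array[String]): Unit =
    latestAsOf("10:58:00").foreach(println)
}
```

Under these assumptions it yields US Dollar 102, Yen 1, and Euro 116; Pounds is dropped because its only row is later than 10:58.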

The corresponding Flink code is as follows:

  • Define the data source: genRatesHistorySource
import java.io.{File, FileOutputStream, OutputStreamWriter}

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.sources.CsvTableSource

def genRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "rowtime ,currency   ,rate",
      "09:00:00   ,US Dollar  , 102",
      "09:00:00   ,Euro       , 114",
      "09:00:00  ,Yen        ,   1",
      "10:45:00   ,Euro       , 116",
      "11:15:00   ,Euro       , 119",
      "11:49:00   ,Pounds     , 108"
    )
    // Write the test data to a temporary file ("$" separates the rows)
    val tempFilePath =
      writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // Create the source connector; the first line is a header and is skipped
    new CsvTableSource(
      tempFilePath,
      Array("rowtime","currency","rate"),
      Array(
        Types.STRING,Types.STRING,Types.STRING
      ),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def writeToTempFile(
    contents: String,
    filePrefix: String,
    fileSuffix: String,
    charset: String = "UTF-8"): String = {
    val tempFile = File.createTempFile(filePrefix, fileSuffix)
    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
    // Close the writer even if the write fails
    try tmpWriter.write(contents)
    finally tmpWriter.close()
    tempFile.getAbsolutePath
  }
  • Main program
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

def main(args: Array[String]): Unit = {
    // Streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Parallelism 1 makes the output easy to inspect
    env.setParallelism(1)

    val sourceTableName = "RatesHistory"
    // Build the CSV source
    val tableSource = CsvTableSourceUtils.genRatesHistorySource
    // Register the source
    tEnv.registerTableSource(sourceTableName, tableSource)

    // Register a retract sink (MemoryRetractSink is a helper from the
    // author's project and is not shown in this article)
    val sinkTableName = "retractSink"
    val fieldNames = Array("rowtime", "currency", "rate")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING)

    tEnv.registerTableSink(
      sinkTableName,
      fieldNames,
      fieldTypes,
      new MemoryRetractSink)

    val sql =
      """
        |SELECT *
        |FROM RatesHistory AS r
        |WHERE r.rowtime = (
        |  SELECT MAX(rowtime)
        |  FROM RatesHistory AS r2
        |  WHERE r2.currency = r.currency
        |  AND r2.rowtime <= '10:58:00'  )
      """.stripMargin

    // Run the query
    val result = tEnv.sqlQuery(sql)

    // Write the result into the sink
    result.insertInto(sinkTableName)
    env.execute()
  }
  • Execution result, shown as a table:

rowtime (ts) | currency (pk) | rate
09:00:00 | US Dollar | 102
09:00:00 | Yen | 1
10:45:00 | Euro | 116

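The Temporal Table concept aims to simplify such queries and speed up their execution. A Temporal Table is a parameterized view over an append-only table: it interprets the changes to the append-only table as the table's changelog and provides the version of that table at a given point in time (its temporal version). Interpreting an append-only table as a changelog requires a primary key attribute and a timestamp attribute. The primary key determines which rows are overwritten, and the timestamp determines the period during which a row is valid, i.e., the data version; this matches the notion of a validity period in the SQL Server example above.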

In the example above, currency is the primary key of RatesHistory, and rowtime is the timestamp attribute.
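The "parameterized view" idea can be sketched as a function of the time parameter t: replay the append-only rows as upserts keyed by the primary key, stopping at t. In this hypothetical Scala sketch, TemporalView, Change, and versionAt are illustrative names, not Flink API, and the sample rows are the event-time rates that also appear in the CSV connector demo.

```scala
// Hypothetical sketch of a temporal table as a "parameterized view": replay the
// append-only rows as a changelog keyed by the primary key, up to time t.
object TemporalView {
  // One append-only row: time attribute, primary key, payload.
  case class Change[K, V](ts: Long, key: K, value: V)

  // Sample rows (ts, currency, rate), as in the CSV connector demo's rate data.
  val rates: Seq[Change[String, Long]] = Seq(
    Change(1L, "US Dollar", 102L), Change(1L, "Euro", 114L), Change(1L, "Yen", 1L),
    Change(3L, "Euro", 116L), Change(5L, "Euro", 119L), Change(7L, "Pounds", 108L))

  // Version of the table as of t: rows with ts <= t are applied in time order,
  // and a later row with the same key overwrites the earlier one (upsert).
  def versionAt[K, V](history: Seq[Change[K, V]], t: Long): Map[K, V] =
    history.filter(_.ts <= t).sortBy(_.ts).foldLeft(Map.empty[K, V]) {
      (snapshot, c) => snapshot + (c.key -> c.value)
    }

  def main(args: Array[String]): Unit =
    println(versionAt(rates, 4L)) // Euro is 116 here: the 119 row (ts 5) is not visible yet
}
```

The primary key decides what gets overwritten and the timestamp decides what is visible, which is exactly the role the text assigns to currency and rowtime.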

How to Define a Temporal Table

Apache Flink extends the TableFunction interface: on top of TableFunction, it adds a time attribute and a primary-key attribute.

  • The internal TemporalTableFunction is defined as follows:
class TemporalTableFunction private(
    @transient private val underlyingHistoryTable: Table,
    // Time attribute, which serves as the version information
    private val timeAttribute: Expression,
    // Primary key
    private val primaryKey: String,
    private val resultType: RowTypeInfo)
  extends TableFunction[Row] {
  ...
}
  • How users create a TemporalTableFunction
    Table gains a createTemporalTableFunction method, which takes a time attribute and a primary key. Its interface is defined as follows:
// Creates TemporalTableFunction backed up by this table as a history table.

def createTemporalTableFunction(
      timeAttribute: Expression,
      primaryKey: Expression): TemporalTableFunction = {
   ...
}

Users obtain a TemporalTableFunction instance by calling it like this:

val tab = ...
val temporalTableFunction = tab.createTemporalTableFunction('time, 'pk)
...

Example Code

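  • Requirement
    Suppose we have an order table Orders and an exchange-rate table Rates. Orders come from different regions and are paid in different currencies, and we want to compute, for each order, its amount in Yen at the exchange rate valid when the order was placed.
  • Orders data
amount | currency | order_time
2 | Euro | 2
1 | US Dollar | 3
50 | Yen | 4
3 | Euro | 5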
  • Rates data
currency | rate | rate_time
US Dollar | 102 | 1
Euro | 114 | 1
Yen | 1 | 1
Euro | 116 | 5
Euro | 117 | 7
  • The SQL for this requirement
SELECT o.currency, o.amount, r.rate,
  o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
  • Expected result
currency | amount | rate | yen_amount
US Dollar | 1 | 102 | 102
Yen | 50 | 1 | 50
Euro | 2 | 114 | 228
Euro | 3 | 116 | 348
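The expected result can be cross-checked with a plain Scala model of the join semantics: each order picks, for its currency, the rate row with the largest rate_time not later than order_time. This is an illustrative sketch (TemporalJoinSemantics is a hypothetical name) of what the query computes, not of how Flink executes the join.

```scala
// Hypothetical model of the join semantics (not Flink's execution strategy): each order
// is joined with the rate version of its currency that was valid at order_time.
object TemporalJoinSemantics {
  case class Order(amount: Long, currency: String, orderTime: Long)
  case class Rate(currency: String, rate: Long, rateTime: Long)
  case class Result(currency: String, amount: Long, rate: Long, yenAmount: Long)

  val orders = Seq(
    Order(2L, "Euro", 2L), Order(1L, "US Dollar", 3L), Order(50L, "Yen", 4L), Order(3L, "Euro", 5L))
  val rates = Seq(
    Rate("US Dollar", 102L, 1L), Rate("Euro", 114L, 1L), Rate("Yen", 1L, 1L),
    Rate("Euro", 116L, 5L), Rate("Euro", 117L, 7L))

  def join(orders: Seq[Order], rates: Seq[Rate]): Seq[Result] =
    orders.flatMap { o =>
      rates
        .filter(r => r.currency == o.currency && r.rateTime <= o.orderTime)
        .sortBy(_.rateTime)
        .lastOption // latest version not after the order; inner join: no version, no row
        .map(r => Result(o.currency, o.amount, r.rate, o.amount * r.rate))
    }

  def main(args: Array[String]): Unit =
    join(orders, rates).foreach(println)
}
```

Note the second Euro order: at order_time 5 the 116 version is already valid (rate_time 5 <= 5), so it is joined with 116 rather than 114.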

Implementation Without a Connector

import java.sql.Timestamp

import scala.collection.mutable

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TemporalTableJoinTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Order data: (amount, currency, order time)
    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
    ordersData.+=((2L, "Euro", new Timestamp(2L)))
    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
    ordersData.+=((50L, "Yen", new Timestamp(4L)))
    ordersData.+=((3L, "Euro", new Timestamp(5L)))

    // Exchange-rate data: (currency, rate, rate time)
    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))

    // Extract event time for the orders table
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]())
      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)

    // Extract event time for the rates table
    val ratesHistory = env
      .fromCollection(ratesHistoryData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]())
      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)

    // Register the orders table and the rates table
    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("RatesHistory", ratesHistory)
    val tab = tEnv.scan("RatesHistory")
    // Create the TemporalTableFunction: 'rowtime is the time attribute, 'currency the primary key
    val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency)
    // Register the TemporalTableFunction under the name "Rates"
    tEnv.registerFunction("Rates", temporalTableFunction)

    val sqlQuery =
      """
        |SELECT o.currency, o.amount, r.rate,
        |  o.amount * r.rate AS yen_amount
        |FROM
        |  Orders AS o,
        |  LATERAL TABLE (Rates(o.rowtime)) AS r
        |WHERE r.currency = o.currency
        |""".stripMargin

    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    // Print the query result
    result.print()
    env.execute()
  }

}

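Before running the code above, pay attention to how it extracts event time. In Apache Flink's TimeCharacteristic.EventTime mode, you need to call assignTimestampsAndWatermarks to specify how event time is generated. This approach is very flexible: users control both the EventTime value of the business data and how watermarks are produced. For more on watermarks, see "Apache Flink Talk Series (03) - Watermark". The complete event-time extraction code for this example is: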

import java.sql.Timestamp

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class OrderTimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}
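As a rough illustration of what the extractor does, the sketch below models the bounded-out-of-orderness rule in plain Scala: the watermark trails the largest timestamp seen so far by the configured bound. It is a simplified model under stated assumptions (BoundedOutOfOrdernessModel and WatermarkTracker are hypothetical names; periodic emission and Flink's own overflow handling are ignored).

```scala
// Simplified, hypothetical model of a bounded-out-of-orderness watermark:
// watermark = largest timestamp seen so far - maxOutOfOrdernessMs.
// It ignores periodic emission and Flink's own overflow handling.
object BoundedOutOfOrdernessModel {
  final class WatermarkTracker(maxOutOfOrdernessMs: Long) {
    private var maxTs = Long.MinValue

    // Called for every element with its extracted event timestamp (ms).
    def onEvent(ts: Long): Unit = maxTs = math.max(maxTs, ts)

    // Watermark t: elements with timestamp <= t are no longer expected.
    def currentWatermark: Long =
      if (maxTs == Long.MinValue) Long.MinValue else maxTs - maxOutOfOrdernessMs
  }

  def main(args: Array[String]): Unit = {
    val tracker = new WatermarkTracker(10000L)    // Time.seconds(10) in milliseconds
    Seq(2L, 3L, 4L, 5L).foreach(tracker.onEvent) // Timestamp(2L) .. Timestamp(5L) from the example
    println(tracker.currentWatermark)            // 5 - 10000 = -9995
  }
}
```

With the millisecond timestamps 2 to 5 used above and a 10-second bound, the modeled watermark stays at -9995, far behind the data. Presumably the join results are therefore emitted only when the finite input ends and Flink advances the watermark to Long.MaxValue.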

Run the program and inspect the printed result.

Implementation With a CSV Connector

Real production development always needs an actual connector definition, so below we build the Temporal Table JOIN demo with a CSV-format connector.

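  • Complete source of CsvTableSourceUtils, which defines the CSV sources genRatesHistorySource, genEventRatesHistorySource, and genRatesOrderSource, plus the helpers genWordCountSource and genCsvSink: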
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.book.connectors

import java.io.File

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.book.utils.{CommonUtils, FileUtils}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceUtils {

  def genWordCountSource: CsvTableSource = {
    val csvRecords = Seq(
      "words",
      "Hello Flink",
      "Hi, Apache Flink",
      "Apache FlinkBook"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("words"),
      Array(
        Types.STRING
      ),
      fieldDelim = "#",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }


  def genRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "rowtime ,currency   ,rate",
    "09:00:00   ,US Dollar  , 102",
    "09:00:00   ,Euro       , 114",
    "09:00:00  ,Yen        ,   1",
    "10:45:00   ,Euro       , 116",
    "11:15:00   ,Euro       , 119",
    "11:49:00   ,Pounds     , 108"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("rowtime","currency","rate"),
      Array(
        Types.STRING,Types.STRING,Types.STRING
      ),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genEventRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#rate",
      "1#US Dollar#102",
      "1#Euro#114",
      "1#Yen#1",
      "3#Euro#116",
      "5#Euro#119",
      "7#Pounds#108"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")

    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency","rate"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genRatesOrderSource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#amount",
      "2#Euro#10",
      "4#Euro#10"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")

    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency", "amount"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }


  /**
    * Example:
    * genCsvSink(
    *   Array[String]("word", "count"),
    *   Array[TypeInformation[_] ](Types.STRING, Types.LONG))
    */
  def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)
  }

}

The execution result is as follows:
[Figure: execution result]

Internal Implementation

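We again use the orders and exchange-rates example to illustrate how Apache Flink implements Temporal Table JOIN internally, as shown below:
[Figure: internal processing of Temporal Table JOIN for Orders and Rates]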

Temporal Table JOIN vs Dual-Stream JOIN vs Lateral JOIN

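In "Apache Flink Talk Series (09) - JOIN Operator" we introduced dual-stream JOIN, and in "Apache Flink Talk Series (10) - JOIN LATERAL" we introduced JOIN LATERAL (TableFunction). So what is the essential difference between the Temporal Table JOIN introduced here and dual-stream JOIN / JOIN LATERAL (TableFunction)?

  • Dual-stream JOIN - Dual-stream JOIN is clearly Stream JOIN Stream, driven by both streams.
  • LATERAL JOIN - Lateral JOIN is essentially Stream JOIN Table Function, driven by a single stream.
  • Temporal Table JOIN - Temporal Table JOIN is essentially Stream JOIN Temporal Table, or Stream JOIN Table with snapshot. Its characteristics:
    it is driven by a single stream, and the Temporal Table is queried passively.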

Temporal Table JOIN vs LATERAL JOIN

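Functionally, both Temporal Table JOIN and LATERAL JOIN take one row from the left stream and obtain (possibly multiple) rows for it; both are driven by a single stream, and in both the right side is queried passively. So what is the most essential difference between them? The key point is State management: LATERAL JOIN calls a TableFunction, which does not manage state, so its data has no notion of versions, whereas Temporal Table JOIN joins against a table that carries version information.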

Temporal Table JOIN vs Dual-Stream JOIN

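Both Temporal Table JOIN and dual-stream JOIN manage State, so what is their essential difference? It is what drives the computation: Temporal Table JOIN is driven by one side, with the Temporal Table queried passively, whereas dual-stream JOIN is driven by both sides, and each side actively triggers JOIN computation.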
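The difference in driving can be sketched as two toy operators in Scala. They are hypothetical (JoinDriving, TemporalJoin, and RegularJoin are illustrative names), process records in arrival order, and ignore watermarks and state cleanup, so they only illustrate who triggers output: in the temporal join a rate update merely adds a version, while in the regular join either side can emit.

```scala
// Toy, hypothetical operators contrasting who drives the output.
// Records are processed in arrival order; watermarks and state cleanup are ignored.
object JoinDriving {
  case class Order(currency: String, amount: Long, time: Long)
  case class Rate(currency: String, rate: Long, time: Long)

  // Temporal table join: only the order (left) side produces output.
  final class TemporalJoin {
    // Versioned state: all rate versions per currency (the primary key).
    private var versions = Map.empty[String, Vector[Rate]]

    // Passive side: a rate update only adds a version and never emits.
    def onRate(r: Rate): Seq[(Order, Rate)] = {
      versions += r.currency -> (versions.getOrElse(r.currency, Vector.empty[Rate]) :+ r)
      Seq.empty
    }

    // Driving side: look up the version valid at the order's time.
    def onOrder(o: Order): Seq[(Order, Rate)] =
      versions.getOrElse(o.currency, Vector.empty[Rate])
        .filter(_.time <= o.time)
        .sortBy(_.time)
        .lastOption
        .map(r => (o, r))
        .toList
  }

  // Regular two-stream join: both sides are buffered and both sides emit.
  final class RegularJoin {
    private var orders = Vector.empty[Order]
    private var rates = Vector.empty[Rate]

    def onRate(r: Rate): Seq[(Order, Rate)] = {
      rates :+= r
      orders.filter(_.currency == r.currency).map(o => (o, r))
    }

    def onOrder(o: Order): Seq[(Order, Rate)] = {
      orders :+= o
      rates.filter(_.currency == o.currency).map(r => (o, r))
    }
  }

  def main(args: Array[String]): Unit = {
    val temporal = new TemporalJoin
    val regular = new RegularJoin
    val order = Order("Euro", 3L, 5L)
    println(regular.onOrder(order)) // no rates yet: nothing
    Seq(Rate("Euro", 114L, 1L), Rate("Euro", 116L, 5L)).foreach { r =>
      println(temporal.onRate(r))   // always empty
      println(regular.onRate(r))    // emits the order again for every rate
    }
    println(temporal.onOrder(order)) // joined with the 116 version only
  }
}
```

In the regular join the Euro order is emitted once with 114 and once more with 116, since the buffered rates have no version semantics; the temporal join returns only the version valid at time 5.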

Improving Temporal Table JOIN

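In my view, Apache Flink's Temporal Table JOIN should follow the ANSI-SQL standard in both syntax and semantics, and I will push the community to support the standard ANSI-SQL FOR SYSTEM_TIME AS OF syntax for Temporal Tables. The improved processing logic is sketched below:
[Figure: proposed processing logic with FOR SYSTEM_TIME AS OF]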

Here the cache is a performance optimization; the details will be covered once the community has finalized the feature.

Summary

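This article opened with the ANSI-SQL standard and SQL Server's support for Temporal Tables, then described the current state of Temporal Table support in Apache Flink, using code examples and diagrams of the internal processing logic to give an intuitive feel for the syntax and semantics of Temporal Table JOIN.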

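About Likes and Comments

This series inevitably has many shortcomings. I sincerely hope readers will like the articles they found useful and send feedback and suggestions on the ones that fall short. Thanks in advance!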