1. 程式人生 > >Apache Flink 漫談系列(13) - Table API 概述

Apache Flink 漫談系列(13) - Table API 概述

什麼是Table API

《Apache Flink 漫談系列(08) - SQL概覽》中我們概要的向大家介紹了什麼是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,如下圖所示:
image

Apache Flink 針對不同的使用者場景提供了三層使用者API,最下層ProcessFunction API可以對State,Timer等複雜機制進行有效的控制,但使用者使用的便捷性很弱,也就是說即使很簡單統計邏輯,也要較多的程式碼開發。第二層DataStream API對視窗,聚合等運算元進行了封裝,使用者的便捷性有所增強。最上層是SQL/Table API,Table API是Apache Flink中的宣告式,可被查詢優化器優化的高階分析API。

Table API的特點

Table API和SQL都是Apache Flink中最高層的分析API,SQL所具備的特點Table API也都具有,如下:

  • 宣告式 - 使用者只關心做什麼,不用關心怎麼做;
  • 高效能 - 支援查詢優化,可以獲取最好的執行效能;
  • 流批統一 - 相同的統計邏輯,既可以流模式執行,也可以批模式執行;
  • 標準穩定 - 語義遵循SQL標準,語法語義明確,不易變動。

當然除了SQL的特性,因為Table API是在Flink中專門設計的,所以Table API還具有自身的特點:

  • 表達方式的擴充套件性 - 在Flink中可以為Table API開發很多便捷性功能,如:Row.flatten(), map/flatMap 等
  • 功能的擴充套件性 - 在Flink中可以為Table API擴充套件更多的功能,如:Iteration,flatAggregate 等新功能
  • 編譯檢查 - Table API支援java和scala語言開發,支援IDE中進行編譯檢查。

說明:上面說的map/flatMap/flatAggregate都是Apache Flink 社群 FLIP-29 中規劃的新功能。

HelloWorld

在介紹Table API所有運算元之前我們先編寫一個簡單的HelloWorld來直觀瞭解如何進行Table API的開發。

Maven 依賴

在pom檔案中增加如下配置,本篇以flink-1.7.0功能為準進行後續介紹。

<properties>
    <table.version>1.7.0</table.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

  </dependencies>

程式結構

在編寫第一Flink Table API job之前我們先簡單瞭解一下Flink Table API job的結構,如下圖所示:
image

  1. 外部資料來源,比如Kafka, Rabbitmq, CSV 等等;
  2. 查詢計算邏輯,比如最簡單的資料匯入select,雙流Join,Window Aggregate 等;
  3. 外部結果儲存,比如Kafka,Cassandra,CSV等。

說明:1和3 在Apache Flink中統稱為Connector。

主程式

我們以一個統計單詞數量的業務場景,編寫第一個HelloWorld程式。
根據上面Flink job基本結構介紹,要Table API完成WordCount的計算需求,我們需要完成三部分程式碼:

  • TableSoruce Code - 用於建立資料來源的程式碼
  • Table API Query - 用於進行word count統計的Table API 查詢邏輯
  • TableSink Code - 用於儲存word count計算結果的結果表程式碼

執行模式選擇

一個job我們要選擇是Stream方式執行還是Batch模式執行,所以任何統計job的第一步是進行執行模式選擇,如下我們選擇Stream方式執行。

// Stream執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

構建測試Source

我們用最簡單的構建Source方式進行本次測試,程式碼如下:

 // 測試資料
 val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
 // 最簡單的獲取Source方式
val source = env.fromCollection(data).toTable(tEnv, 'word)

WordCount 統計邏輯

WordCount核心統計邏輯就是按照單詞分組,然後計算每個單詞的數量,統計邏輯如下:

 // 單詞統計核心邏輯
 val result = source
   .groupBy('word) // 單詞分組
   .select('word, 'word.count) // 單詞統計

定義Sink

將WordCount的統計結果寫入Sink中,程式碼如下:

// 自定義Sink
    val sink = new RetractSink // 自定義Sink(下面有完整程式碼)
    // 計算結果寫入sink
    result.toRetractStream[(String, Long)].addSink(sink)

完整的HelloWord程式碼

為了方便大家執行WordCount查詢統計,將完整的程式碼分享大家(基於flink-1.7.0),如下:

import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

import scala.collection.mutable

object HelloWord {

  def main(args: Array[String]): Unit = {
    // 測試資料
    val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")

    // Stream執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // 最簡單的獲取Source方式
    val source = env.fromCollection(data).toTable(tEnv, 'word)

    // 單詞統計核心邏輯
    val result = source
      .groupBy('word) // 單詞分組
      .select('word, 'word.count) // 單詞統計

    // 自定義Sink
    val sink = new RetractSink
    // 計算結果寫入sink
    result.toRetractStream[(String, Long)].addSink(sink)

    env.execute
  }
}

class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
  private var resultSet: mutable.Set[(String, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // 初始化記憶體儲存結構
    resultSet = new mutable.HashSet[(String, Long)]
  }

  override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
    if (v._1) {
      // 計算資料
      resultSet.add(v._2)
    }
    else {
      // 撤回資料
      resultSet.remove(v._2)
    }
  }

  override def close(): Unit = {
    // 列印寫入sink的結果資料
    resultSet.foreach(println)
  }
}

執行結果如下:

雖然上面用了較長的紙墨介紹簡單的WordCount統計邏輯,但source和sink部分都是可以在學習後面運算元中被複用的。本例核心的統計邏輯只有一行程式碼:
source.groupBy('word).select('word, 'word.count)
所以Table API開發技術任務非常的簡潔高效。

Table API 運算元

雖然Table API與SQL的運算元語義一致,但在表達方式上面SQL以文字的方式展現,Table API是以java或者scala語言的方式進行開發。為了大家方便閱讀,即便是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過的運算元,在這裡也會再次進行介紹,當然對於Table API和SQL不同的地方會進行詳盡介紹。

示例資料及測試類

測試資料

  • customer_tab 表 - 客戶表儲存客戶id,客戶姓名和客戶描述資訊。欄位及測試資料如下:
c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
  • order_tab 表 - 訂單表儲存客戶購買的訂單資訊,包括訂單id,訂單時間和訂單描述資訊。 欄位節測試資料如下:
o_id c_id o_time o_desc
o_oo1 c_002 2018-11-05 10:01:01 iphone
o_002 c_001 2018-11-05 10:01:55 ipad
o_003 c_001 2018-11-05 10:03:44 flink book
  • Item_tab
    商品表, 攜帶商品id,商品型別,出售時間,價格等資訊,具體如下:
itemID itemType onSellTime price
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic 2017-11-11 10:03:00 30
ITEM004 Electronic 2017-11-11 10:03:00 60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20
  • PageAccess_tab
    頁面訪問表,包含使用者ID,訪問時間,使用者所在地域資訊,具體資料如下:
region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00
  • PageAccessCount_tab
    頁面訪問表,訪問量,訪問時間,使用者所在地域資訊,具體資料如下:
region userCount accessTime
ShangHai 100 2017.11.11 10:01:00
BeiJing 86 2017.11.11 10:01:00
BeiJing 210 2017.11.11 10:06:00
BeiJing 33 2017.11.11 10:10:00
ShangHai 129 2017.11.11 12:10:00
  • PageAccessSession_tab
    頁面訪問表,訪問量,訪問時間,使用者所在地域資訊,具體資料如下:
region userId accessTime
ShangHai U0011 2017-11-11 10:01:00
ShangHai U0012 2017-11-11 10:02:00
ShangHai U0013 2017-11-11 10:03:00
ShangHai U0015 2017-11-11 10:05:00
ShangHai U0011 2017-11-11 10:10:00
BeiJing U0110 2017-11-11 10:10:00
ShangHai U2010 2017-11-11 10:11:00
ShangHai U0410 2017-11-11 12:16:00

測試類

我們建立一個TableAPIOverviewITCase.scala 用於接下來介紹Flink Table API運算元的功能體驗。程式碼如下:

import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class Table APIOverviewITCase {

  // 客戶表資料
  val customer_data = new mutable.MutableList[(String, String, String)]
  customer_data.+=(("c_001", "Kevin", "from JinLin"))
  customer_data.+=(("c_002", "Sunny", "from JinLin"))
  customer_data.+=(("c_003", "JinCheng", "from HeBei"))


  // 訂單表資料
  val order_data = new mutable.MutableList[(String, String, String, String)]
  order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))

  // 商品銷售表資料
  val item_data = Seq(
    Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
    Right((1510365720000L)),
    Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
    Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
    Right((1510365780000L)),
    Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
    Right((1510365900000L)),
    Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
    Right((1510365960000L)),
    Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
    Right((1510366020000L)),
    Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
    Right((151036608000L)))

  // 頁面訪問表資料
  val pageAccess_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
    Right((1510365660000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
    Right((1510366260000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
    Right((1510373400000L)))

  // 頁面訪問量表資料2
  val pageAccessCount_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
    Right((1510365660000L)),
    Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
    Right((1510366200000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
    Right((1510373400000L)))

  // 頁面訪問表資料3
  val pageAccessSession_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
    Right((1510365720000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
    Right((1510365720000L)),
    Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
    Right((1510365900000L)),
    Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
    Right((1510366260000L)),
    Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
    Right((1510373760000L)))

  val _tempFolder = new TemporaryFolder

  // Streaming 環境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setParallelism(1)
  env.setStateBackend(getStateBackend)

  def getProcTimeTables(): (Table, Table) = {
    // 將order_tab, customer_tab 註冊到catalog
    val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
    val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
    (customer, order)
  }


  def getEventTimeTables():  (Table, Table, Table, Table) = {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 將item_tab, pageAccess_tab 註冊到catalog
    val item =
      env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
      .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)

    val pageAccess =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    val pageAccessCount =
      env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
      .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)

    val pageAccessSession =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    (item, pageAccess, pageAccessCount, pageAccessSession)
  }


  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  def getStateBackend: StateBackend = {
    new MemoryStateBackend()
  }

  def procTimePrint(result: Table): Unit = {
    val sink = new RetractingSink
    result.toRetractStream[Row].addSink(sink)
    env.execute()
  }

  def rowTimePrint(result: Table): Unit = {
    val sink = new RetractingSink
    result.toRetractStream[Row].addSink(sink)
    env.execute()
  }

  @Test
  def testProc(): Unit = {
    val (customer, order) = getProcTimeTables()
    val result = ...// 測試的查詢邏輯
    procTimePrint(result)
  }

  @Test
  def testEvent(): Unit = {
    val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
    val result = ...// 測試的查詢邏輯
    procTimePrint(result)
  }

}

// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  var retractedResults: ArrayBuffer[String] = null


  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    retractedResults = mutable.ArrayBuffer.empty[String]
  }

  def invoke(v: (Boolean, Row)) {
    retractedResults.synchronized {
      val value = v._2.toString
      if (v._1) {
        retractedResults += value
      } else {
        val idx = retractedResults.indexOf(value)
        if (idx >= 0) {
          retractedResults.remove(idx)
        } else {
          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
                                       "This is probably an incorrectly implemented test. " +
                                       "Try to set the parallelism of the sink to 1.")
        }
      }
    }

  }

  override def close(): Unit = {
    super.close()
    retractedResults.sorted.foreach(println(_))
  }
}

// Water mark 生成器
class EventTimeSourceFunction[T](
  dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }

  override def cancel(): Unit = ???
}

SELECT

SELECT 用於從資料集/流中選擇資料,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去或增加某些列, 如下圖所示:
image

Table API 示例

customer_tab選擇使用者姓名,並用內建的CONCAT函式拼接客戶資訊,如下:

val result = customer
  .select('c_name, concat_ws('c_name, " come ", 'c_desc))

Result

c_name desc
Kevin Kevin come from JinLin
Sunny Sunny come from JinLin
Jincheng Jincheng come from HeBei

特別說明

大家看到在 SELECT 不僅可以使用普通的欄位選擇,還可以使用ScalarFunction,當然也包括User-Defined Function,同時還可以進行欄位的alias設定。其實SELECT可以結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是去重的場景,示例如下:

Table API示例

在訂單表查詢所有的客戶id,消除重複客戶id, 如下:

val result = order
  .groupBy('c_id)
  .select('c_id)

Result

c_id
c_001
c_002

WHERE

WHERE 用於從資料集/流中過濾資料,與SELECT一起使用,語義是關係代數的Selection,根據某些條件對關係做水平分割,即選擇符合條件的記錄,如下所示:
image

Table API 示例

customer_tab查詢客戶id為c_001c_003的客戶資訊,如下:

 val result = customer
   .where("c_id = 'c_001' || c_id = 'c_003'")
   .select( 'c_id, 'c_name, 'c_desc)

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

特別說明

我們發現WHERE是對滿足一定條件的資料進行過濾,WHERE支援=, <, >, <>, >=, <=以及&&||等表示式的組合,最終滿足過濾條件的資料會被選擇出來。 SQL中的INNOT IN在Table API裡面用intersectminus描述(flink-1.7.0版本)。

Intersect 示例

Intersect只在Batch模式下進行支援,Stream模式下我們可以利用雙流JOIN來實現,如:在customer_tab查詢已經下過訂單的客戶資訊,如下:

 // 計算客戶id,並去重
 val distinct_cids = order
   .groupBy('c_id) // 去重
   .select('c_id as 'o_c_id)

 val result = customer
   .join(distinct_cids, 'c_id === 'o_c_id)
   .select('c_id, 'c_name, 'c_desc)

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin

Minus 示例

Minus只在Batch模式下進行支援,Stream模式下我們可以利用雙流JOIN來實現,如:在customer_tab查詢沒有下過訂單的客戶資訊,如下:

 // 查詢下過訂單的客戶id,並去重
 val distinct_cids = order
   .groupBy('c_id)
   .select('c_id as 'o_c_id)
   
// 查詢沒有下過訂單的客戶資訊
val result = customer
  .leftOuterJoin(distinct_cids, 'c_id === 'o_c_id)
  .where('o_c_id isNull)
  .select('c_id, 'c_name, 'c_desc)
  

說明上面實現邏輯比較複雜,我們後續考慮如何在流上支援更簡潔的方式。

Result

c_id c_name c_desc
c_003 JinCheng from HeBei

Intersect/Minus與關係代數

如上介紹Intersect是關係代數中的Intersection, Minus是關係代數的Difference, 如下圖示意:

  • Intersect(Intersection)
    image
  • Minus(Difference)
    image

GROUP BY

GROUP BY 是對資料進行分組的操作,比如我需要分別計算一下一個學生表裡面女生和男生的人數分別是多少,如下:
image

Table API 示例

將order_tab資訊按c_id分組統計訂單數量,簡單示例如下:

val result = order
  .groupBy('c_id)
  .select('c_id, 'o_id.count)

Result

c_id o_count
c_001 2
c_002 1

特別說明

在實際的業務場景中,GROUP BY除了按業務欄位進行分組外,很多時候使用者也可以用時間來進行分組(相當於劃分視窗),比如統計每分鐘的訂單數量:

Table API 示例

按時間進行分組,查詢每分鐘的訂單數量,如下:

val result = order
  .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
  .groupBy('o_time_min)
  .select('o_time_min, 'o_id.count)

Result

o_time_min o_count
2018-11-05 10:01 2
2018-11-05 10:03 1

說明:如果我們時間欄位是timestamp型別,建議使用內建的 DATE_FORMAT 函式。

UNION ALL

UNION ALL 將兩個表合併起來,要求兩個表的欄位完全一致,包括欄位型別、欄位順序,語義對應關係代數的Union,只是關係代數是Set集合操作,會有去重複操作,UNION ALL 不進行去重,如下所示:
image

Table API 示例

我們簡單的將customer_tab查詢2次,將查詢結果合併起來,如下:

val result = customer.unionAll(customer)

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION ALL 對結果資料不進行去重,如果想對結果資料進行去重,傳統資料庫需要進行UNION操作。

UNION

UNION 將兩個流給合併起來,要求兩個流的欄位完全一致,包括欄位型別、欄位順序,並其UNION 不同於UNION ALL,UNION會對結果資料去重,與關係代數的Union語義一致,如下:
image

Table API 示例

我們簡單的將customer_tab查詢2次,將查詢結果合併起來,如下:

val result = customer.union(customer)

我們發現完全一樣的表資料進行 UNION之後,資料是被去重的,UNION之後的資料並沒有增加。

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION 對結果資料進行去重,在實際的實現過程需要對資料進行排序操作,所以非必要去重情況請使用UNION ALL操作。

JOIN

JOIN 用於把來自兩個表的行聯合起來形成一個寬表,Apache Flink支援的JOIN型別:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN
    JOIN與關係代數的Join語義相同,具體如下:

image

Table API 示例 (JOIN)

INNER JOIN只選擇滿足ON條件的記錄,我們查詢customer_taborder_tab表,將有訂單的客戶和訂單資訊選擇出來,如下:

val result = customer
  .join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

Result

c_id c_name c_desc o_id o_c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone

Table API 示例 (LEFT JOIN)

LEFT JOININNER JOIN的區別是當右表沒有與左邊相JOIN的資料時候,右邊對應的欄位補NULL輸出,語義如下:
image

對應的SQL語句如下(LEFT JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ; 
  • 細心的讀者可能發現上面T2.ColC是添加了字首T2了,這裡需要說明一下,當兩張表有欄位名字一樣的時候,我需要指定是從那個表裡面投影的。

我們查詢customer_taborder_tab表,將客戶和訂單資訊選擇出來如下:

 val result = customer
   .leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone
c_003 JinCheng from HeBei NULL NULL NULL NULL

特別說明

RIGHT JOIN 相當於 LEFT JOIN 左右兩個表互動一下位置。FULL JOIN相當於 RIGHT JOINLEFT JOIN 之後進行UNION ALL操作。

Time-Interval JOIN

Time-Interval JOIN 相對於UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條資料會與另一條流上的不同時間區域的資料進行JOIN。對應Apache Flink官方文件的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語義和實現原理詳見《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語法示例,如下:

...
val result = left
  .join(right)
  // 定義Time Interval
  .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
  ...

Lateral JOIN

Apache Flink Lateral JOIN 是左邊Table與一個UDTF進行JOIN,詳細的語義和實現原理請參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語法示例,如下:

...
val udtf = new UDTF
val result = source.join(udtf('c) as ('d, 'e))
...

Temporal Table JOIN

Temporal Table JOIN 是左邊表與右邊一個攜帶版本資訊的表進行JOIN,詳細的語法,語義和實現原理詳見《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語法示例,如下:

...
val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)
...

Window

在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統資料庫的標準開窗,每一個元素都對應一個視窗。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行視窗劃分的。

Over Window

Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。
按ROWS和RANGE分類是傳統資料庫的標準分類方法,在Apache Flink中還可以根據時間型別(ProcTime/EventTime)和視窗的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。為了避免大家對過細分類造成困擾,我們按照確定當前行的不同方式將OVER Window分成兩大類進行介紹,如下:

  • ROWS OVER Window - 每一行元素都視為新的計算行,即,每一行都是一個新的視窗。
  • RANGE OVER Window - 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個視窗。

Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都視為新的計算行,即,每一行都是一個新的視窗。

語義

我們以3個元素(2 PRECEDING)的視窗為例,如下圖:
image

上圖所示視窗 user 1 的 w5和w6, user 2的 視窗 w2 和 w3,雖然有元素都是同一時刻到達,但是他們仍然是在不同的視窗,這一點有別於RANGE OVER Window。

Table API 示例

利用item_tab測試資料,我們統計同類商品中當前和當前商品之前2個商品中的最高價格。

val result = item
  .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
  .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)

Result

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 60
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

Bounded RANGE OVER Window

Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個視窗。

語義

我們以3秒中資料(INTERVAL '2' SECOND)的視窗為例,如下圖:
image

注意: 上圖所示視窗 user 1 的 w6, user 2的 視窗 w3,元素都是同一時刻到達,他們是在同一個視窗,這一點有別於ROWS OVER Window。

Tabel API 示例

我們統計同類商品中當前和當前商品之前2分鐘商品中的最高價格。

 val result = item
   .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w)
   .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
Result(Bounded RANGE OVER Window)
itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 60
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 40
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

特別說明

OverWindow最重要是要理解每一行資料都確定一個視窗,同時目前在Apache Flink中只支援按時間欄位排序。並且OverWindow開窗與GroupBy方式資料分組最大的不同在於,GroupBy資料分組統計時候,在SELECT中除了GROUP BY的key,不能直接選擇其他非key的欄位,但是OverWindow沒有這個限制,SELECT可以選擇任何欄位。比如一張表table(a,b,c,d)4個欄位,如果按d分組求c的最大值,兩種寫完如下:

  • GROUP BY - tab.groupBy('d).select(d, MAX(c))
  • OVER Window = tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)
    如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然可以選擇 a,b,c欄位。但在GROUPBY中,SELECT 只能選擇 d 欄位。

Group Window

根據視窗資料劃分的不同,目前Apache Flink有如下3種Bounded Winodw:

  • Tumble - 滾動視窗,視窗資料有固定的大小,視窗資料無疊加;
  • Hop - 滑動視窗,視窗資料有固定大小,並且有固定的視窗重建頻率,視窗資料有疊加;
  • Session - 會話視窗,視窗資料沒有固定的大小,根據視窗資料活躍程度劃分視窗,視窗資料無疊加。

說明: Aapche Flink 還支援UnBounded的 Group Window,也就是全域性Window,流上所有資料都在一個窗口裡面,語義非常簡單,這裡不做詳細介紹了。

Tumble

語義

Tumble 滾動視窗有固定size,視窗資料不重疊,具體語義如下:
image

Table API 示例

利用pageAccess_tab測試資料,我們需要按不同地域統計每2分鐘的淘寶首頁的訪問量(PV)。

 val result = pageAccess
   .window(Tumble over 2.minute on 'rowtime as 'w)
   .groupBy('w, 'region)
   .select('region, 'w.start, 'w.end, 'region.count as 'pv)
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

Hop

Hop 滑動視窗和滾動視窗類似,視窗有固定的size,與滾動視窗不同的是滑動視窗可以通過slide引數控制滑動視窗的新建頻率。因此當slide值小於視窗size的值的時候多個滑動視窗會重疊。

語義

Hop 滑動視窗語義如下所示:
image

Table API 示例

利用pageAccessCount_tab測試資料,我們需要每5分鐘統計近10分鐘的頁面訪問量(PV).

val result = pageAccessCount
  .window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
  .groupBy('w)
  .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
Result
winStart winEnd accessCount
2017-11-11 01:55:00.0 2017-11-11 02:05:00.0 186
2017-11-11 02:00:00.0 2017-11-11 02:10:00.0 396
2017-11-11 02:05:00.0 2017-11-11 02:15:00.0 243
2017-11-11 02:10:00.0 2017-11-11 02:20:00.0 33
2017-11-11 04:05:00.0 2017-11-11 04:15:00.0 129
2017-11-11 04:10:00.0 2017-11-11 04:20:00.0 129

Session

Seeeion 會話視窗 是沒有固定大小的視窗,通過session的活躍度分組元素。不同於滾動視窗和滑動視窗,會話視窗不重疊,也沒有固定的起止時間。一個會話視窗在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話視窗 分配器通過配置session gap來指定非活躍週期的時長.

語義

Session 會話視窗語義如下所示:
image

val result = pageAccessSession
  .window(Session withGap 3.minute on 'rowtime as 'w)
  .groupBy('w, 'region)
  .select('region, 'w.start, 'w.end, 'region.count as 'pv)
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:13:00.0 1
ShangHai 2017-11-11 02:01:00.0 2017-11-11 02:08:00.0 4
ShangHai 2017-11-11 02:10:00.0 2017-11-11 02:14:00.0 2
ShangHai 2017-11-11 04:16:00.0 2017-11-11 04:19:00.0 1

巢狀Window

在Window之後再進行Window劃分也是比較常見的統計需求,那麼在一個Event-Time的Window之後,如何再寫一個Event-Time的Window呢?一個Window之後再描述一個Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中我們可以利用'w.rowtime來傳遞時間屬性,比如:Tumble Window之後再接一個Session Window 示例如下:

 ... 
 val result = pageAccess
   .window(Tumble over 2.minute on 'rowtime as 'w1)
   .groupBy('w1)
   .select('w1.rowtime as 'rowtime, 'col1.count as 'cnt)
   .window(Session withGap 3.minute on 'rowtime as 'w2)
   .groupBy('w2)
   .select('cnt.sum)
...

Source&Sink

上面我們介紹了Apache Flink Table API核心運算元的語義和具體示例,這部分將選取Bounded EventTime Tumble Window為例為大家編寫一個完整的包括Source和Sink定義的Apache Flink Table API Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,使用者ID和訪問時間。我們需要按不同地域統計每2分鐘的淘寶首頁的訪問量(PV)。具體資料如下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00

Source 定義

自定義Apache Flink Stream Source需要實現StreamTableSource, StreamTableSource中通過StreamExecutionEnvironmentaddSource方法獲取DataStream, 所以我們需要自定義一個 SourceFunction, 並且要支援產生WaterMark,也就是要實現DefinedRowtimeAttributes介面。

Source Function定義

支援接收攜帶EventTime的資料集合,Either的資料結構,Right表示WaterMark和Left表示資料:

class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = ???
}

定義 StreamTableSource

我們自定義的Source要攜帶我們測試的資料,以及對應的WaterMark資料,具體如下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // 頁面訪問表資料 rows with timestamps and watermarks
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
    Right(1510365660000L),
    Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
    Right(1510366200000L),
    Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
    Right(1510366260000L),
    Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
     execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}

Sink 定義

我們簡單的將計算結果寫入到Apache Flink內建支援的CSVSink中,定義Sink如下:

def getCsvTableSink: TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    // 列印sink的檔案路徑,方便我們檢視執行結果
    println("Sink path : " + tempFile)
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }

構建主程式

主程式包括執行環境的定義,Source/Sink的註冊以及統計查SQL的執行,具體如下:

def main(args: Array[String]): Unit = {
    // Streaming 環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 設定EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便我們查出輸出資料
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 建立自定義source資料結構
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 建立CSV sink 資料結構
    val tableSink = getCsvTableSink

    // 註冊source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 註冊sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val result = tEnv.scan(sourceTableName)
      .window(Tumble over 2.minute on 'accessTime as 'w)
      .groupBy('w, 'region)
      .select('region, 'w.start, 'w.end, 'region.count as 'pv)

    result.insertInto(sinkTableName)
    env.execute()
  }
  

執行並檢視執行結果

執行主程式後我們會在控制檯得到Sink的檔案路徑,如下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

Cat 方式檢視計算結果,如下:

jinchengsunjcdeMacBook-Pro:FlinkTable APIDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1

表格化如上結果:

region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

上面這個端到端的完整示例也可以應用到本篇前面介紹的其他運算元示例中,只是大家根據Source和Sink的Schema不同來進行相應的構建即可!

小結

本篇首先向大家介紹了什麼是Table API, Table API的核心特點,然後以此介紹Table API的核心運算元功能,並附帶了具體的測試資料和測試程式,最後以一個End-to-End的示例展示瞭如何編寫Apache Flink Table API的Job收尾。希望對大家學習Apache Flink Table API 過程中有所幫助。

關於點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收穫的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!