
Flink: Introduction

Table of Contents

Features
Use Case
Flink vs Spark
Architecture
Deployment Modes
Layered APIs & Component Stack
DataStream Example
DataSet Example
State
Time, Watermark, Late Data
Windows
Checkpoint
DataStream Sources, Transformations, Sinks
DataStream Joins
Process Function
Async I/O
DataSet Sources, Transformations, Sinks
Table API & SQL
Library CEP
Library Gelly



Features

  • A distributed parallel computation framework
  • Powerful state management with support for stateful computation
  • Supports both stream processing and batch processing; streams are the foundation, and batch processing is treated as a special case of stream processing
  • Supports processing based on the event time carried by the data itself, with a Watermark mechanism
  • Provides powerful window mechanisms based on time, count, and sessions
  • Provides exactly-once processing guarantees
  • Provides checkpoints and savepoints to support failure recovery, upgrades, rescaling, and so on
  • Provides the DataStream API for streams and the DataSet API for batches, with SQL and the Table API on top as a unified interface for both
  • Implemented in Java with Scala support; Python is supported at the Table API / SQL level
  • In-memory computation, low latency, high throughput, high availability, scalable to thousands of cores and TB-scale state
  • Multiple deployment options: Standalone Cluster, Yarn, Mesos, K8S



Use Case

  • Event-Driven Applications: react in real time as soon as one or more events arrive, without waiting (Spark cannot do this: it has to collect a batch of data before responding, so its real-time behavior is weaker); suited to fraud detection, anomaly detection, rule-based alerting, monitoring, and other applications that require real-time responses
  • Data Analytics Applications: batch analytics on bounded data and streaming analytics on unbounded data (continuously and incrementally ingesting data, analyzing it, and updating the results in real time); Flink provides a SQL interface that unifies operations on batch and streaming data
  • Data Pipeline Applications: similar to ETL, i.e. cleaning, enriching, and loading data; the difference is that a traditional ETL job is started periodically, whereas a Flink pipeline can run continuously as one long-running program



Flink vs Spark

  • Flink advantages: state management (Spark only has checkpoints), a more powerful window mechanism (Spark only supports time windows, not count or session windows), better real-time behavior (Spark works in micro-batches, so latency is higher and it cannot react to individual events, while Flink is natively based on data/event streams), and a better exactly-once implementation than Spark
  • Spark advantages: better unification of streaming and batch, stronger batch processing, more investment in machine learning, and native Scala with Java/Python support (Flink is native Java with Scala support and only supports Python at the Table API / SQL level, and Scala is more concise than Java)



Architecture


JobManagers
Also called masters. There must be at least one, and several can be configured for HA. They manage the cluster, schedule tasks, and coordinate distributed execution, checkpoints, and failure recovery. A JobManager consists of three main components:

  • ResourceManager: manages and allocates the resources of the Flink cluster, i.e. the task slots of the TaskManagers; it has different implementations for the different deployment modes (Standalone, Yarn, Mesos, K8S); in Standalone mode the JobManager cannot start new TaskManagers on demand
  • Dispatcher: exposes a REST interface that clients use to submit Flink programs, starts a JobMaster for each submitted job, and serves the WebUI for inspecting job information
  • JobMaster: manages the execution of a single JobGraph; several jobs can run at the same time, each with its own JobMaster

TaskManagers
Also called workers. There must be at least one. They execute the tasks of a dataflow and buffer and exchange data. TaskManagers connect to the JobManagers, report their status, and accept task assignments.

A TaskManager is a JVM process that manages one or more task slots, which represent how many tasks the TaskManager can accept concurrently. Each task slot is a thread, and each slot reserves a portion of the TaskManager's memory that other slots cannot use; currently slots only isolate memory, not other resources such as CPU. Tasks in the same TaskManager can share TCP connections and heartbeat messages, and can also share data sets and data structures, which effectively reduces overhead.

By default Flink allows subtasks of different tasks to share slots, as long as the tasks belong to the same job, so a single slot may hold an entire pipeline of a job. This improves resource utilization.

The number of task slots is best set to the number of CPU cores, as in the configuration sketch below.
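
A minimal flink-conf.yaml sketch of this (the values are assumptions, not from the original post):

# one slot per CPU core on a 4-core machine
taskmanager.numberOfTaskSlots: 4
# default parallelism used when a job does not set one
parallelism.default: 4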

Client
Prepares the job and submits it to the JobManager via the flink run command; it can then exit, or stay connected to receive execution status updates from the JobManager.



Deployment Modes

  • Flink Session Cluster: a long-running cluster with pre-allocated resources; all clients submit jobs to the same cluster. The downsides are resource competition and the fact that a JobManager or TaskManager failure affects all jobs; the upside is fast job startup, which suits short jobs that are started frequently and need to start quickly (a command sketch for this and the per-job mode follows after this list)
  • Flink Job Cluster: each job gets its own cluster. The job is submitted to an external system such as Yarn or K8S, which first allocates resources, starts a JobManager with them, then starts TaskManagers according to the job's needs, and finally runs the program. The upside is that jobs are isolated from each other; the downsides are longer startup time and less centralized management, which suits long-running jobs that are not sensitive to startup time
  • Flink Application Cluster: the application and its dependencies are packaged into a single executable jar that is launched directly, without first starting a cluster and then submitting a job
  • Self-contained Flink Applications: there are efforts in the community towards fully enabling Flink-as-a-Library in the future.
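
A hedged sketch of how the first two modes are commonly started on Yarn (the memory sizes and the example jar path are assumptions):

# session mode: start a long-running Yarn session, then submit jobs to it
./bin/yarn-session.sh -jm 1024m -tm 4096m
./bin/flink run ./examples/streaming/WordCount.jar

# per-job mode: let Yarn spin up a dedicated cluster for this one job
./bin/flink run -m yarn-cluster ./examples/streaming/WordCount.jar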



Layered APIs & Component Stack

Flink provides APIs at different levels of abstraction for programs to use








DataStream Example

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        // note: the environment is initialized differently from the DataSet API
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

For more information see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html



DataSet Example

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // note: the environment is initialized differently from the DataStream API
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

For more information see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/



State

A streaming program is usually stateful if it needs more than one record to produce a result: it has to keep some value or intermediate result around. For example, consider a program that monitors drivers in real time and must raise an alert as soon as someone has been driving for 3 consecutive hours. It needs to keep per-driver state, keyed by driver, storing when the current driving period started, i.e. (Driver, StartTime). When a new record arrives, if it shows the driver is not driving, StartTime is reset to the current time; if the driver is driving, the record's time is compared with StartTime, and an alert is emitted once the difference exceeds 3 hours (a hedged sketch of this example follows below). Flink manages and stores this state for us, and if the program fails and restarts it restores the state automatically, so processing can resume from where it left off instead of recomputing from historical data. Flink's state management simplifies application code and lets it focus on the business logic.
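
A minimal sketch of the driver-monitoring example above, assuming a hypothetical DriverEvent POJO with getDriverId(), isDriving(), and getTimestamp():

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DrivingAlertFunction extends RichFlatMapFunction<DriverEvent, String> {
    private static final long THREE_HOURS_MS = 3 * 60 * 60 * 1000L;
    // keyed state: start of the current driving period, one value per driver key
    private transient ValueState<Long> startTime;

    @Override
    public void open(Configuration parameters) {
        startTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("driving-start-time", Long.class));
    }

    @Override
    public void flatMap(DriverEvent event, Collector<String> out) throws Exception {
        if (!event.isDriving()) {
            // not driving: the next driving period starts (at the earliest) now
            startTime.update(event.getTimestamp());
            return;
        }
        Long start = startTime.value();
        if (start == null) {
            startTime.update(event.getTimestamp());
        } else if (event.getTimestamp() - start >= THREE_HOURS_MS) {
            out.collect("ALERT: driver " + event.getDriverId() + " has been driving for over 3 hours");
        }
    }
}

// usage, assuming a DataStream<DriverEvent> named events:
// events.keyBy(DriverEvent::getDriverId).flatMap(new DrivingAlertFunction()).print();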

Flink's state management features include:

  • Multiple State Primitives: state for multiple data types, including primitive types, lists, maps, and so on
  • Pluggable State Backends: the state backend manages the state and determines how large a state can get, whether snapshots are synchronous or asynchronous, how the state is stored, and where. The state backend can be changed for all jobs via configuration, or set per job in code. Flink provides several state backends:
    1) MemoryStateBackend (the default, for small state or testing): on checkpoints the state is stored in the JobManager's memory;
    2) FsStateBackend: on checkpoints the state is stored in a file system;
    3) RocksDBStateBackend: state lives in RocksDB on disk both at runtime and at checkpoint time; only asynchronous snapshots are supported;
    4) user-defined state backends
    For more information see https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html
  • Exactly-once state consistency: checkpoints and the recovery mechanism keep state consistent and make failure handling transparent to the program
  • Very Large State: thanks to asynchronous snapshots and incremental checkpoints, a Flink job can maintain TB-scale state (see the sketch after this list)
  • Scalable Applications: Flink supports scaling of stateful applications by redistributing the state to more or fewer workers.
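
A hedged sketch of enabling incremental checkpoints with the RocksDB state backend (requires the flink-statebackend-rocksdb dependency; the path is an assumption):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the second constructor argument enables incremental checkpointing
// (the constructor declares IOException, so wrap or declare it in real code)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints", true));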

Flink has two basic kinds of state

  • Keyed State: state scoped to a key; it can only be used in functions and operators on a KeyedStream. Each piece of Keyed State is bound to a combination of an operator and a key. Keyed State is organized into Key Groups, which are the unit Flink uses to redistribute Keyed State
  • Operator State: state bound to one parallel instance of an operator. The Kafka connector is an example of using Operator State: each parallel instance of the Kafka consumer keeps a map of topic partitions and offsets as Operator State

State exists in two forms

  • Managed: the data structure is known to Flink (for example a list) and can be fully managed by Flink
  • Raw: the data structure is user-defined and opaque to Flink, which only sees bytes; the operator has to interpret it itself

The state types Flink can manage include:

  • ValueState
  • ListState
  • ReducingState
  • AggregatingState<IN, OUT>
  • FoldingState<T, ACC>
  • MapState<UK, UV>

For more information see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html

Keyed State example

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
// the first output is (3+5)/2, the second is (7+4)/2; the remaining single element never triggers output
// all elements here share the same key; with different keys the aggregation is per key, and sum is automatically scoped to the key

Operator State example (the sink operator uses state so that buffered elements can be recovered after a failure)

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {
    private final int threshold;
    private transient ListState<Tuple2<String, Integer>> checkpointedState;
    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        // difference from keyed state: getOperatorStateStore() is called here and the initialization differs, while the state descriptor can be defined the same way
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

The state backend can be configured in flink-conf.yaml

# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

It can also be set per job

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

State can also be given a TTL, after which expired state is cleaned up; see the sketch below
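
A minimal sketch of configuring state TTL (the 60-second value is an assumption):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(60))                                           // time-to-live of each entry
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)              // TTL is refreshed on writes
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // expired values are never returned
    .build();

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("text-state", String.class);
descriptor.enableTimeToLive(ttlConfig);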



Time, Watermark, Late Data

Flink streaming programs can operate (for example with windows) on different notions of time:

  • Event Time: the time carried by the data itself
  • Ingestion Time: the system time at which Flink receives the data
  • Processing Time (default): the system time at which the operation is executed

Specifying the time characteristic

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream.keyBy( (event) -> event.getUser() )
      .timeWindow(Time.hours(1))
      .reduce( (a, b) -> a.add(b) )
      .addSink(...);

If not specified, Processing Time is the default


In window-based computations data can arrive out of order or late, i.e. data belonging to a window may still arrive after the window has ended. Flink uses Watermarks to specify how much lateness to tolerate and to control when a window's computation is triggered.

Flink lets you define which field provides the timestamp and how the watermark is generated. A window computation is triggered when:

  • the watermark time is >= the window's end time
  • the window contains data in its time range

For example, with a window size of 10 minutes, the (0, 10) window is triggered once the watermark exceeds 10

Flink provides the unified DataStream.assignTimestampsAndWatermarks() method to extract event timestamps and generate watermarks

assignTimestampsAndWatermarks() accepts the following types

  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        // MyTimestampsAndWatermarks is your own implementation of AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

AssignerWithPeriodicWatermarks generates watermarks periodically.
The default interval is 200 ms; a different interval can be set with ExecutionConfig.setAutoWatermarkInterval()

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(200);

Each time a watermark is to be emitted, Flink calls getCurrentWatermark() on the AssignerWithPeriodicWatermarks;
the timestamp itself is extracted by its extractTimestamp() method

// this custom class uses the largest timestamp seen so far, minus maxOutOfOrderness, as the watermark
// e.g. if the window size is 10 minutes and maxOutOfOrderness is 3 minutes,
// the (0, 10) window is only triggered once data with a timestamp >= 13 has arrived
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<MyEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        // called for every element to extract its timestamp, which in turn drives the watermark; MyEvent is a user-defined class
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // getCurrentWatermark() is called periodically
        // if the returned watermark is past a window's end time, that window's computation is triggered
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

Flink provides several built-in AssignerWithPeriodicWatermarks implementations

// AscendingTimestampExtractor
// if the new element's timestamp is larger it is used for the watermark directly, otherwise the violation handler is invoked
// you implement extractAscendingTimestamp() to extract the timestamp
    public abstract long extractAscendingTimestamp(T element);

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }
// BoundedOutOfOrdernessTimestampExtractor
// when a larger timestamp arrives, the watermark becomes that timestamp minus a threshold maxOutOfOrderness
// you implement extractTimestamp() to extract the timestamp from the data
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
// IngestionTimeExtractor
// generates watermarks from the system clock
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }

AssignerWithPunctuatedWatermarks generates punctuated watermarks: every incoming element is checked to decide whether a watermark should be emitted immediately

public class MyTimestampsAndWatermarks implements AssignerWithPunctuatedWatermarks<MyEvent> {
	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
                // checkAndGetNextWatermark is called immediately after extractTimestamp
		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
	}
}

There are no built-in punctuated watermark assigners

If a window has multiple input sources, each with its own watermark, the smallest of them is used


To handle data that arrives after the window has already been triggered by the watermark, there are generally two options

  • Allowed Lateness
  • Side Outputs
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(Time.seconds(30))
    .<windowed transformation>(<window function>);

allowedLateness(Time.seconds(30)) keeps the window for 30 more seconds after it fires instead of destroying it; data belonging to the window that arrives within this period can still trigger a computation

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(Time.seconds(30))
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

sideOutputLateData(lateOutputTag) diverts data that arrives after the window has closed into a side output stream, which can then be processed separately

Debugging Watermarks:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_event_time.html



Windows

Broadly speaking there are two kinds of windows

Keyed Windows

stream
       .keyBy(...)                      <-  keyed versus non-keyed windows
       .window(...)                     <-  required: "assigner"
      [.trigger(...)]                   <-  optional: "trigger" (else default trigger)
      [.evictor(...)]                   <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]           <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]        <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()   <-  required: "function"
      [.getSideOutput(...)]             <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)                  <-  required: "assigner"
      [.trigger(...)]                   <-  optional: "trigger" (else default trigger)
      [.evictor(...)]                   <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]           <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]        <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()   <-  required: "function"
      [.getSideOutput(...)]             <-  optional: "output tag"

As you can see, the main difference between the two is .keyBy(...).window(...) versus .windowAll(...)

For keyed streams the computation is performed by multiple parallel tasks, and elements with the same key go to the same task.
For non-keyed streams the whole computation is performed by a single task.


Window Assigners decide how elements are assigned to windows

Flink defines several common window types: tumbling windows, sliding windows, session windows, and global windows.
All of them except global windows are time based, using either processing time or event time.
Users can also define their own windowing by subclassing WindowAssigner.

Tumbling Windows: fixed-size windows that do not overlap

DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))   // offset used to adjust for the time zone
    .<windowed transformation>(<window function>);

Sliding Windows: fixed-size windows that can overlap, e.g. with size 20 and slide 5 you get the windows (0, 20) and (5, 25)

DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

Session Windows: no fixed window size; when no new data arrives within a gap, the elements received so far form one window

DataStream<T> input = ...;

// event-time session windows with static gap (the window is triggered when no data arrives for 10 minutes)
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap (user-defined session gap)
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap (the window is triggered when no data arrives for 10 minutes)
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap (user-defined session gap)
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

Global Windows: all elements with the same key go into a single window; you have to define a Trigger to fire it (for example after a certain number of elements)

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(size))
    .<windowed transformation>(<window function>);

Flink also provides the timeWindow and countWindow helper methods that wrap these assigners

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return window(SlidingEventTimeWindows.of(size, slide));
		}
	}

	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}

	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
		return window(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

If you have special requirements, implement your own WindowAssigner subclass


Triggers decide when a window is fired

Each Trigger has the following main methods

  • onElement: called when an element is added to the window
  • onEventTime: called when an event-time timer fires
  • onProcessingTime: called when a processing-time timer fires
  • onMerge: called when windows are merged
  • clear: called when the window is purged

The first three return a TriggerResult that tells Flink what to do with the window, mainly

  • CONTINUE: do nothing
  • FIRE: trigger the window computation
  • PURGE: clear the window contents
  • FIRE_AND_PURGE: trigger the computation and then clear the window

The built-in triggers only ever FIRE; they never PURGE

Flink's built-in Triggers

  • EventTimeTrigger: uses event time; fires when the watermark passes the window's end time
  • ProcessingTimeTrigger: uses processing time; fires when the processing time passes the window's end time
  • CountTrigger: fires when the number of elements in the window exceeds a limit
  • PurgingTrigger: wraps another trigger and turns its FIRE results into FIRE_AND_PURGE

If you have special requirements, implement your own Trigger subclass, as in the sketch below
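
A minimal custom Trigger sketch (an assumed example, not from the original post) that fires the window on every incoming element:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireOnEveryElementTrigger extends Trigger<Object, TimeWindow> {
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;          // emit the window contents for every element
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;      // no processing-time timers used
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;      // no event-time timers used
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // nothing to clean up in this sketch
    }
}

// attached on a windowed stream with .trigger(new FireOnEveryElementTrigger())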


Window Functions

  • ReduceFunction: combines two input elements into one output element
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
  • AggregateFunction: an aggregation function defined by three types: the input type, the accumulator (intermediate) type, and the output type
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  // add updates the accumulator with each input element
  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  // getResult computes the output value from the accumulator
  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  // merge combines two accumulators
  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
  • FoldFunction: updates the output result directly from each input element
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
       public String fold(String acc, Tuple2<String, Long> value) {
         // produce a new acc from the previous acc and the input element; acc is also the final output, and its initial value is the first argument of fold()
         return acc + value.f1;
       }
    });
  • ProcessWindowFunction: the most flexible option; it has access to all elements of the window, at the cost of more resources and performance
DataStream<Tuple2<String, Long>> input = ...;
input.keyBy(t -> t.f0)
     .timeWindow(Time.minutes(5))
     .process(new MyProcessWindowFunction());

public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + " count: " + count);
  }
}

A ProcessWindowFunction can also be combined with a ReduceFunction, AggregateFunction, or FoldFunction

DataStream<SensorReading> input = ...;
input.keyBy(<key selector>)
     .timeWindow(<duration>)
     .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

private static class MyReduceFunction implements ReduceFunction<SensorReading> {
  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      // after the window fires, reduce alone would emit the reduced result directly
      // combined with a ProcessWindowFunction, the reduced result is passed on to the ProcessWindowFunction for further processing
      // here the window's start time is added to the output on top of the reduce result
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

In some places the older WindowFunction can still be used, as in the sketch below
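
A hedged sketch of the legacy WindowFunction (types assumed to match the ProcessWindowFunction example above):

DataStream<Tuple2<String, Long>> input = ...;

input.keyBy(t -> t.f0)
     .timeWindow(Time.minutes(5))
     .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
         @Override
         public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) {
             long count = 0;
             for (Tuple2<String, Long> ignored : values) {
                 count++;
             }
             // unlike ProcessWindowFunction there is no Context, only the window itself
             out.collect("Window: " + window + " count: " + count);
         }
     });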


Evictors

Evictors remove elements before or after the window function runs; the Evictor interface consists of two methods

void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

Flink has three built-in Evictors

  • CountEvictor: keeps only the most recent N elements and discards the rest; see the sliding count window (countWindow) example above
  • DeltaEvictor: takes a DeltaFunction and a threshold; elements whose delta from the last element exceeds the threshold are discarded
  • TimeEvictor: finds the largest timestamp max_ts among the elements and removes the elements with timestamps smaller than max_ts - interval

The built-in evictors run before the window function by default; a usage sketch follows below.
Flink gives no guarantee about the order of elements within a window, i.e. an element that arrived earlier is not necessarily at the front of the window
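
A minimal usage sketch (assumed types and sizes) that keeps only the last 5 elements of each 10-second window:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .evictor(CountEvictor.of(5))        // evicts all but the 5 most recent elements before the window function runs
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));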


Allowed Lateness & Side Output

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(Time.seconds(30))
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

As mentioned in the watermark section: the window is kept for a while after its computation has finished, and if data belonging to it still arrives within that period a new computation is triggered; late data can also be diverted into a separate stream for its own processing



Checkpoint

Checkpoints periodically store the positions up to which the data sources have been consumed, together with the state used by the application itself (see the State section above)

Prerequisites for checkpointing

  • the data source must retain its data for some time and allow re-reading from a given position, e.g. a message queue (Kafka, RabbitMQ, Amazon Kinesis, Google PubSub, etc.) or a file system (HDFS, S3, GFS, NFS, Ceph, etc.)
  • a permanent storage system for the state, usually a distributed file system such as HDFS, S3, GFS, NFS, Ceph, etc.

Checkpointing is disabled by default and has to be enabled in code

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

For the corresponding settings in conf/flink-conf.yaml see
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/checkpointing.html#related-config-options

Enabling checkpoints for Iterative Jobs (see the Iterate Transformation section) requires an extra flag, and data may be lost

env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

Checkpoints can be taken asynchronously and incrementally, which reduces the impact on data processing

Only the latest checkpoint has to be kept; older ones can be deleted automatically

A Savepoint is like a Checkpoint, except that it is triggered manually and does not expire.
For example, one is created when a job is stopped with flink stop

Suspending job "c2cbdb0bca115d2375706b2c95d8a323" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f

A program can be resumed from a specific Savepoint

flink run -s /tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f flink-app.jar

For more information see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/checkpointing.html



DataStream Sources, Transformations, Sinks

StreamExecutionEnvironment provides several methods for ingesting data sources

readTextFile(String filePath)
readTextFile(String filePath, String charsetName)

readFile(FileInputFormat<OUT> inputFormat, String filePath)
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, 
         long interval, TypeInformation<OUT> typeInformation)
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, 
         long interval, FilePathFilter filter)

socketTextStream(String hostname, int port)
socketTextStream(String hostname, int port, String delimiter, long maxRetry)

fromCollection(Collection<OUT> data)
fromCollection(Iterator<OUT> data, Class<OUT> type)
fromElements(OUT... data)
fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type)

generateSequence(long from, long to)

Connectors (predefined or custom) can also be added with StreamExecutionEnvironment's addSource method

addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties)
addSource(new FlinkKinesisConsumer<>("kinesis_stream_name", new SimpleStringSchema(), consumerConfig))
addSource(new TwitterSource(props))
addSource(new RMQSource<String>(connectionConfig, "queueName", true, new SimpleStringSchema()))
addSource(new NiFiSource(clientConfig))

The main DataStream transformations are

map
flatMap
filter
keyBy
reduce
fold         // folds all elements into one result
sum
min
max
minBy
maxBy
keyBy(0).window
windowAll
windowedStream.apply
allWindowedStream.apply
union
dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>)
          .window(TumblingEventTimeWindows.of(Time.seconds(3)))
keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2))
          .upperBoundExclusive(true).lowerBoundExclusive(true)
dataStream.coGroup(otherStream).where(0).equalTo(1)
          .window(TumblingEventTimeWindows.of(Time.seconds(3)))
someStream.connect(otherStream)
split & select
stream.assignTimestamps        // extracts timestamps from the data, to be used together with windows
initialStream.iterate()        // covered in more detail below
dataStream.partitionCustom
dataStream.shuffle
dataStream.rebalance

Iterate Transformation

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();      // iterate() can take an argument limiting the number of iterations

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

// feed the values in stillGreaterThanZero back so they continue from iteration.map,
// until stillGreaterThanZero is empty or the maximum number of iterations is reached
iteration.closeWith(stillGreaterThanZero);

// take out the data that no longer needs to be iterated
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

env.execute();

Data is transferred through buffers; you can set a buffer timeout, and a buffer is sent once it is full or the timeout expires

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

DataStream provides several methods for writing data out

writeAsText(String path)
writeAsCsv(String path)
print()
writeUsingOutputFormat(OutputFormat<T> format)
writeToSocket(String hostName, int port, SerializationSchema<T> schema)

DataStream also provides the addSink method for adding connectors (predefined or custom)

stream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "my-topic", new SimpleStringSchema()))
stream.addSink(new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig))
stream.addSink(new BucketingSink<String>("/base/path"))     // can be used for HDFS
stream.addSink(new RMQSink<String>(connectionConfig, "queueName", new SimpleStringSchema()))

streamExecEnv.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...}));

CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .build();

stream.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));

For more information see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html



DataStream Joins

Window Join (elements of the two streams that fall into the same window are passed to the join function pairwise; e.g. with 10 elements from each stream in a window, the join function is called 100 times)

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
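
A hedged, concrete version of the skeleton above (the stream names and Tuple2 element types are made up):

DataStream<Tuple2<String, Integer>> orders = ...;
DataStream<Tuple2<String, Integer>> payments = ...;

orders.join(payments)
    .where(t -> t.f0)                       // key of the first stream
    .equalTo(t -> t.f0)                     // key of the second stream
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
            // called once for every pair of elements that share the same key and window
            return first.f0 + ": " + first.f1 + "," + second.f1;
        }
    });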

Interval Join

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

// for every element a of orangeStream, look for the elements of greenStream whose timestamps differ from a's by between -2 and 1,
// and pass each of them together with a to the join function
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html



Process Function

A low-level function for operating on individual elements, with access to state and timers (as in the example below)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


DataStream<Tuple2<String, String>> stream = ...;

DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

public class CountWithTimestamp {
    public String key;
    public long count;
    public long lastModified;
}


public class CountWithTimeoutFunction 
        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        current.count++;
        current.lastModified = ctx.timestamp();

        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        CountWithTimestamp result = state.value();

        if (timestamp == result.lastModified + 60000) {
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html



Async I/O

class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

DataStream<String> stream = ...;

DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html



DataSet Sources, Transformations, Sinks

ExecutionEnvironment provides several methods for reading data sources

readTextFile(String filePath)
readTextFileWithValue(String filePath)
readCsvFile(String filePath)
readFileOfPrimitives(String filePath, Class<X> typeClass)
readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass)

fromCollection(Collection<X> data)
fromCollection(Iterator<X> data, Class<X> type)
fromElements(X... data)
fromParallelCollection(SplittableIterator<X> iterator, Class<X> type)

generateSequence(long from, long to)

readFile(FileInputFormat<X> inputFormat, String filePath)

// createInput is a generic method: by implementing an InputFormat you can read from any kind of source
createInput(InputFormat<X, ?> inputFormat)
DataSet<Tuple2<String, Integer>> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

The main DataSet transformations are

map
flatMap
mapPartition
filter
reduce
reduceGroup
aggregate
distinct
join
leftOuterJoin
coGroup
cross
union
rebalance
partitionByHash
partitionByRange
partitionCustom
sortPartition
first
groupBy
project

The main DataSet sink methods are

writeAsText(String filePath)
writeAsFormattedText(String filePath, TextFormatter<T> formatter) 
writeAsCsv(String filePath)
print()
write(FileOutputFormat<T> outputFormat, String filePath)

// output is a generic method: by implementing an OutputFormat you can write to different sinks
output(OutputFormat<T> outputFormat)
myResult.output(
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

Iteration

// the initial value is 0, with at most 10000 iterations
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// iteration stops when the feedback data set is empty or the maximum number of iterations is reached; here the feedback always contains data, so it always runs for the maximum number of iterations
DataSet<Integer> count = initial.closeWith(iteration);

// what happens after leaving the iteration
count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();

env.execute("Iterative Pi Example");

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/



Table API & SQL

The Table API and SQL treat DataStreams and DataSets as tables that can be queried with SQL, unifying stream and batch operations.
Flink SQL is built on Apache Calcite.

The Table API and SQL come with two different planners: the old planner and the Blink planner.
For the differences between the two see:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners
https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html

The Table API and SQL support Java, Scala, and Python

To use Python, install PyFlink first; Python 3.5, 3.6, or 3.7 is required

python -m pip install apache-flink

Creating a TableEnvironment with the Flink planner and with the Blink planner

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

Basic usage of the Table API and SQL

// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create a Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");

// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
tableEnv.execute("java_job");

Creating tables

// register a table name for a Table object produced by a Table API query
Table projTable = tableEnv.from("X").select(...);
tableEnv.createTemporaryView("projectedTable", projTable);

// connect to an external system (database, file system, message queue, etc.) and register it under a table name
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

Besides the table name, a catalog and a database can also be specified when creating a table

Table table = ...;

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table);

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);

// register the view named 'View' in the catalog named 'custom_catalog' in the
// database named 'custom_database'. 'View' is a reserved keyword and must be escaped.  
tableEnv.createTemporaryView("`View`", table);

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table);

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

Querying a table with the Table API

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query

Querying a table with SQL

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// execute query

Writing to a table

// create an output Table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// register TableSink
tableEnv.registerTableSink(
  "outputTable",               // table name
  new String[]{...},           // field names
  new TypeInformation[]{...},  // field types
  sink);                       // table sink

// emit result Table via a TableSink
result.insertInto("outputTable");

Converting a DataStream or DataSet into a Table

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

// convert the DataStream into a Table with swapped fields and field names "myString" and "myLong"
Table table3 = tableEnv.fromDataStream(stream, "f1 as myString, f0 as myLong");
// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...

// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");

Converting a Table into a DataStream or DataSet

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer> 
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);

Connector: connectors let you read from and write to external systems (file systems, databases, message queues, etc.) as tables
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

tableEnvironment
  .connect(
    new Kafka()
      .version("0.10")
      .topic("test-input")
      .startFromEarliest()
      // .startFromLatest()
      // .startFromSpecificOffsets(...)
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
      // .sinkPartitionerFixed()
      // .sinkPartitionerRoundRobin()
  )
  .withFormat(
    new Avro()
      .avroSchema(
        "{" +
        "  \"namespace\": \"org.myorganization\"," +
        "  \"type\": \"record\"," +
        "  \"name\": \"UserMessage\"," +
        "    \"fields\": [" +
        "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
        "      {\"name\": \"user\", \"type\": \"long\"}," +
        "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
        "    ]" +
        "}"
      )
  )
  .withSchema(
    new Schema()
      .field("rowtime", DataTypes.TIMESTAMP(3))
        .rowtime(new Rowtime()    // optional: declares this field as a event-time attribute
                                  // also can declares this field as a processing-time attribute via .proctime()
          .timestampsFromField("timestamp")
          .watermarksPeriodicBounded(60000)   // also can be .watermarksPeriodicAscending()
                                                       // or .watermarksFromSource()
        )
      .field("user", DataTypes.BIGINT())
      .field("message", DataTypes.STRING())
  )
  .createTemporaryTable("MyUserTable");
.connect(
  new FileSystem()
    .path("file:///path/to/whatever")    // required: path to a file or directory
)
.withFormat(                             // required: file system connector requires to specify a format,
  ...                                    // currently only OldCsv format is supported.
)                                        // Please refer to old CSV format part of Table Formats section for more details.
.connect(
  new Elasticsearch()
    .version("6")                      // required: valid connector versions are "6"
    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
    .index("MyUsers")                  // required: Elasticsearch index
    .documentType("user")              // required: Elasticsearch document type

    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)

    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
    .failureHandlerIgnore()        //   or ignores failures and drops the request
    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
    .failureHandlerCustom(...)     //   or custom failure handling with a ActionRequestFailureHandler subclass

    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered actions in bytes per bulk request
                                   //   (only MB granularity is supported)
    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)

    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
    .bulkFlushBackoffExponential() //   or use an exponential backoff type
    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)

    // optional: connection properties to be used during REST communication to Elasticsearch
    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
)
.withFormat(                      // required: the Elasticsearch connector requires a format to be specified;
  ...                             // currently only the Json format is supported.
                                  // Please refer to the Table Formats section for more details.
)    
.connect(
  new HBase()
    .version("1.4.3")                      // required: currently only support "1.4.3"
    .tableName("hbase_table_name")         // required: HBase table name
    .zookeeperQuorum("localhost:2181")     // required: HBase Zookeeper quorum configuration
    .zookeeperNodeParent("/test")          // optional: the root dir in Zookeeper for HBase cluster.
                                           // The default value is "/hbase".
    .writeBufferFlushMaxSize("10mb")       // optional: writing option, determines how many size in memory of buffered
                                           // rows to insert per round trip. This can help performance on writing to JDBC
                                           // database. The default value is "2mb".
    .writeBufferFlushMaxRows(1000)         // optional: writing option, determines how many rows to insert per round trip.
                                           // This can help performance on writing to JDBC database. No default value,
                                           // i.e. the default flushing is not depends on the number of buffered rows.
    .writeBufferFlushInterval("2s")        // optional: writing option, sets a flush interval flushing buffered requesting
                                           // if the interval passes, in milliseconds. Default value is "0s", which means
                                           // no asynchronous flush thread will be scheduled.
)
// JDBC appears to be supported only via SQL DDL; there is no descriptor-API way to define it
CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'jdbc', -- required: specify this table type is jdbc
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
  'connector.table' = 'jdbc_table_name',  -- required: jdbc table name
  'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of the JDBC driver to use to connect to this URL. 
                                                -- If not set, it will automatically be derived from the URL.
  'connector.username' = 'name', -- optional: jdbc user name and password
  'connector.password' = 'password',
  
  -- scan options, optional, used when reading from table

  -- These options must all be specified if any of them is specified. In addition, partition.num must be specified. They
  -- describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric,
  -- date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide
  -- the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
  -- This option applies only to reading.
  'connector.read.partition.column' = 'column_name', -- optional, name of the column used for partitioning the input.
  'connector.read.partition.num' = '50', -- optional, the number of partitions.
  'connector.read.partition.lower-bound' = '500', -- optional, the smallest value of the first partition.
  'connector.read.partition.upper-bound' = '1000', -- optional, the largest value of the last partition.
  
  'connector.read.fetch-size' = '100', -- optional, Gives the reader a hint as to the number of rows that should be fetched
                                       -- from the database when reading per round trip. If the value specified is zero, then
                                       -- the hint is ignored. The default value is zero.

  -- lookup options, optional, used in temporary join
  'connector.lookup.cache.max-rows' = '5000', -- optional, max number of rows in the lookup cache; beyond this value the oldest rows
                                              -- will be evicted. "cache.max-rows" and "cache.ttl" must both be specified if either
                                              -- of them is specified. The cache is not enabled by default.
  'connector.lookup.cache.ttl' = '10s', -- optional, max time to live for each row in the lookup cache; rows older than this
                                        -- will be expired. "cache.max-rows" and "cache.ttl" must both be specified if either of
                                        -- them is specified. The cache is not enabled by default.
  'connector.lookup.max-retries' = '3', -- optional, max retry times if a lookup against the database fails

  -- sink options, optional, used when writing into table
  'connector.write.flush.max-rows' = '5000', -- optional, max number of buffered records (append, upsert and delete records alike)
                                             -- before flushing. The default value is "5000".
  'connector.write.flush.interval' = '2s', -- optional, flush interval; once it passes, asynchronous threads will flush the data.
                                           -- The default value is "0s", which means no asynchronous flush thread will be scheduled.
  'connector.write.max-retries' = '3' -- optional, max retry times if writing records to the database fails
)
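
In Flink 1.10 such a DDL statement is submitted through the SQL API rather than the descriptor API. A minimal sketch (ddl is an assumed String variable holding the CREATE TABLE statement above):

// register the JDBC table via DDL, then query it like any other table
tableEnv.sqlUpdate(ddl);
Table users = tableEnv.sqlQuery("SELECT * FROM MyUserTable");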
.connect(...)
  .inAppendMode()   
  // inAppendMode : a dynamic table and an external connector only exchange INSERT messages.
  // inUpsertMode : a dynamic table and an external connector exchange UPSERT and DELETE messages.
  // inRetractMode : a dynamic table and an external connector exchange ADD and RETRACT messages.

Table Format

.withFormat(
  new Csv()

.withFormat(
  new Json()

.withFormat(
  new Avro()
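
The format descriptors usually take a few extra options. A hedged sketch (option names such as fieldDelimiter, ignoreParseErrors and failOnMissingField should be verified against the Table Formats documentation for the version in use):

.withFormat(
  new Csv()
    .fieldDelimiter(';')         // use ';' instead of the default ','
    .ignoreParseErrors()         // skip rows that cannot be parsed instead of failing
)

.withFormat(
  new Json()
    .failOnMissingField(false)   // treat missing JSON fields as null instead of failing
)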

explain

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : collect elements with CollectionInputFormat

Stage 2 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 3 : Operator
		content : from: (count, word)
		ship_strategy : REBALANCE

		Stage 4 : Operator
			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
			ship_strategy : FORWARD

			Stage 5 : Operator
				content : from: (count, word)
				ship_strategy : REBALANCE

Table & SQL 裡處理 Event-Time 和 Process-Time
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
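
For example, when converting a DataStream into a Table, a time attribute can be appended through the field expressions. A minimal sketch (stream, its fields and tableEnv are assumed; for the rowtime case the stream must already have timestamps and watermarks assigned):

// processing-time attribute: append an extra .proctime field at the end of the schema
Table withProctime = tableEnv.fromDataStream(stream, "user, message, proctime.proctime");

// event-time attribute: append an extra .rowtime field derived from the record timestamps
Table withRowtime = tableEnv.fromDataStream(stream, "user, message, rowtime.rowtime");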

Joins in Continuous Queries
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html
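
For instance, a time-windowed (interval) join between two streaming tables can be written in SQL. A minimal sketch (table and column names are made up for illustration):

// join each order with the shipments that happened within 4 hours after the order
Table shippedOrders = tableEnv.sqlQuery(
    "SELECT o.id, s.shipTime " +
    "FROM Orders o, Shipments s " +
    "WHERE o.id = s.orderId " +
    "AND s.shipTime BETWEEN o.orderTime AND o.orderTime + INTERVAL '4' HOUR");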

Data Type
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/types.html

Table API & SQL
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/
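
The two APIs are largely interchangeable. A minimal sketch of the same aggregation written with the Table API and with SQL (table and column names are assumptions):

Table orders = tableEnv.from("Orders");

// Table API
Table apiResult = orders
    .groupBy("userId")
    .select("userId, amount.sum as total");

// SQL
Table sqlResult = tableEnv.sqlQuery(
    "SELECT userId, SUM(amount) AS total FROM Orders GROUP BY userId");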

SQL: Detecting Patterns in Tables

-- Partition MyTable by userid and order by proctime.
-- If three consecutive rows have name 'a', 'b', 'c', output their ids as aid, bid and cid.
SELECT T.aid, T.bid, T.cid
FROM MyTable
    MATCH_RECOGNIZE (
      PARTITION BY userid
      ORDER BY proctime
      MEASURES
        A.id AS aid,
        B.id AS bid,
        C.id AS cid
      PATTERN (A B C)
      DEFINE
        A AS name = 'a',
        B AS name = 'b',
        C AS name = 'c'
    ) AS T

This pattern-detection feature looks quite powerful
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/match_recognize.html

SQL Function
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/
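
Besides the built-in functions, user-defined functions can be registered and then used from both the Table API and SQL. A minimal sketch (the HashCode function and the table name are made up for illustration):

import org.apache.flink.table.functions.ScalarFunction;

// a user-defined scalar function: one eval() method per supported signature
public static class HashCode extends ScalarFunction {
    public int eval(String s) {
        return s == null ? 0 : s.hashCode();
    }
}

// register it under a name, then call it from SQL (or the Table API)
tableEnv.registerFunction("hashCode", new HashCode());
Table hashed = tableEnv.sqlQuery("SELECT message, hashCode(message) FROM MyUserTable");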



Library CEP

Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.

// Suppose there is an event whose id is 42,
// immediately followed (adjacent) by an event whose volume is at least 10,
// and somewhere later an event whose name is "end" (not necessarily adjacent to the second one).
// Find the event sequences that match this pattern.
DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

Specifying how many times a pattern may occur (quantifiers)

 Pattern<Event, ?> start = Pattern.<Event>begin("start").where(...)

 // expecting 4 occurrences
 start.times(4);

 // expecting 0 or 4 occurrences
 start.times(4).optional();

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore();

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional();

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();

Specifying conditions

// Iterative Conditions
    .where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }
    
            double sum = value.getPrice();
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });

// Simple Conditions
.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});

// Combining Conditions
.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

// Stop condition
.oneOrMore().until(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});

// Defines a subtype condition for the current pattern.
// An event can only match the pattern if it is of this subtype:
pattern.subtype(SubEvent.class);

Combining patterns (contiguity)

next()             // strict contiguity: matches the immediately following event; e.g. for (a,b,c) you can match (a,b) but not (a,c)
followedBy()       // relaxed contiguity: matches an event appearing anywhere later; e.g. for (a,b,c) both (a,b) and (a,c) can be matched
followedByAny()    // non-deterministic relaxed contiguity: matches every later event satisfying the condition; e.g. for (a,b,c,c),
                   // matching (a,c) returns two (a,c) combinations, while followedBy returns only one (see the sketch below)

notNext()          // the immediately following event must not match the given condition
notFollowedBy()    // no later event may match the given condition

next.within(Time.seconds(10));   // restricts the whole pattern to a time window
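
A minimal sketch of the difference between followedBy() and followedByAny(), using the Event type and SimpleCondition style from the examples above, on the input sequence (a, b, c, c):

Pattern<Event, ?> relaxed = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("a");
        }
    })
    .followedBy("second")        // matches only the first "c": one (a, c) match
    // .followedByAny("second")  // would match both "c" events: two (a, c) matches
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("c");
        }
    });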

Groups of patterns

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();

A single event may be part of several matches; an after-match skip strategy controls how this is handled
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html#after-match-skip-strategy
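
A minimal sketch (skipPastLastEvent() is just one of the documented strategies):

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;

// after a match completes, skip past its last event before searching for the next match
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();

Pattern<Event, ?> pattern = Pattern.<Event>begin("start", skipStrategy)
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getId() == 42;
        }
    });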

More CEP details:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html



Library Gelly

Gelly is a Graph API for Flink
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/gelly/
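
A minimal sketch of building a graph from an edge DataSet and printing its size (types and values are made up for illustration):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// a tiny graph with three edges; the vertices are derived from the edge endpoints
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(
    env.fromElements(
        new Edge<>(1L, 2L, NullValue.getInstance()),
        new Edge<>(2L, 3L, NullValue.getInstance()),
        new Edge<>(3L, 1L, NullValue.getInstance())),
    env);

// numberOfVertices()/numberOfEdges() trigger execution and may throw Exception
System.out.println(graph.numberOfVertices() + " vertices, " + graph.numberOfEdges() + " edges");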