1. 程式人生 > 其它 >Flink常用 API 詳解

Flink常用 API 詳解

Flink分成API

Flink 根據抽象程度分層,提供了三種不同的 API。每一種 API 在簡潔性和表達力上有著不同的側重,並且針對不同的應用場景。

  • ProcessFunction 是 Flink 所提供最底層介面。ProcessFunction 可以處理一或兩條輸入資料流中的單個事件或者歸入一個特定視窗內的多個事件。它提供了對於時間和狀態的細粒度控制。開發者可以在其中任意地修改狀態,也能夠註冊定時器用以在未來的某一時刻觸發回撥函式。因此,你可以利用 ProcessFunction 實現許多有狀態的事件驅動應用所需要的基於單個事件的複雜業務邏輯。
  • DataStream API 為許多通用的流處理操作提供了處理原語
    。這些操作包括視窗、逐條記錄的轉換操作,在處理事件時進行外部資料庫查詢等。DataStream API 支援 Java 和Scala 語言,預先定義了例如 map()、reduce()、aggregate() 等函式。你可以通過擴充套件實現預定義介面或使用 Java、Scala 的 lambda 表示式實現自定義的函式
  • SQL & Table API:Flink 支援兩種關係型的 API,Table API 和 SQL。這兩個 API 都是批處理和流處理統一的 API,這意味著在無邊界的實時資料流和有邊界的歷史記錄資料流上,關係型 API 會以相同的語義執行查詢,併產生相同的結果。Table API 和 SQL藉助了 Apache Calcite 來進行查詢的解析,校驗以及優化。它們可以與 DataStream 和DataSet API 無縫整合,並支援使用者自定義的標量函式,聚合函式以及表值函式。
  • 擴充套件庫
    • 複雜事件處理(CEP):模式檢測是事件流處理中的一個非常常見的用例。Flink 的 CEP庫提供了 API,使使用者能夠以例如正則表示式或狀態機的方式指定事件模式。CEP 庫與Flink 的 DataStream API 整合,以便在 DataStream 上評估模式。CEP 庫的應用包括網路入侵檢測,業務流程監控和欺詐檢測。
    • DataSet API:DataSet API 是 Flink 用於批處理應用程式的核心 API。DataSet API 所提供的基礎運算元包括 map、reduce、(outer) join、co-group、iterate 等。所有運算元都有相應的演算法和資料結構支援,對記憶體中的序列化資料進行操作。如果資料大小超過預留記憶體,則過量資料將儲存到磁碟。Flink 的 DataSet API 的資料處理演算法借鑑了傳統資料庫演算法的實現,例如混合雜湊連線(hybrid hash-join)和外部歸併排序(external merge-sort)。
    • Gelly:Gelly 是一個可擴充套件的圖形處理和分析庫。Gelly 是在 DataSet API 之上實現的,並與 DataSet API 整合。因此,它能夠受益於其可擴充套件且健壯的操作符。Gelly 提供了內建演算法,如 label propagation、triangle enumeration 和 page rank 演算法,也提供了一個簡化自定義圖演算法實現的 Graph API。

Flink的程式設計模型

DataStream 的程式設計模型包括四個部分:Environment,DataSource,Transformation,Sink。

Flink的開發步驟

Flink使用 DataSet 和 DataStream 代表資料集。DateSet 用於批處理,代表資料是有限的;而 DataStream 用於流資料,代表資料是無界的。資料集中的資料是不可以變的,也就是說不能對其中的元素增加或刪除。我們通過資料來源建立 DataSet 或者 DataStream ,通過 map,filter 等轉換(transform)操作對資料集進行操作產生新的資料集。

編寫 Flink 程式一般經過一下幾個步驟:

  • step1:準備環境-env
  • step2:準備資料-source
  • step3:處理資料-transformation
  • step4:輸出結果-sink
  • step5:觸發執行-execute

準備execution環境

Flink 提供了以下三種方式:

getExecutionEnvironment() //推薦使用
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)

我們以第一個為例建立 execution 環境,程式碼如下: