1. 程式人生 > 其它 >Apache Flink 介紹

Apache Flink 介紹

Apache Flink 是一個分散式大資料處理引擎,可對有限資料流和無限資料流進行有狀態或無狀態的計算,能夠部署在各種叢集環境,對各種規模大小的資料進行快速計算。

  • Streams:流,分為有限資料流與無限資料流,unbounded stream 是有始無終的資料流,即無限資料流;而bounded stream 是限定大小的有始有終的資料集合,即有限資料流,二者的區別在於無限資料流的資料會隨時間的推演而持續增加,計算持續進行且不存在結束的狀態,相對的有限資料流資料大小固定,計算最終會完成並處於結束的狀態。
  • State,狀態是計算過程中的資料資訊,在容錯恢復和Checkpoint 中有重要的作用,流計算在本質上是增量處理,因此需要不斷查詢保持狀態。
  • Time:分為Event time、Ingestion time、Processing time,Flink 的無限資料流是一個持續的過程,時間是判斷業務狀態是否滯後,資料處理是否及時的重要依據。

基礎架構

當 Flink 叢集啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再排程任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計資訊彙報給 JobManager。TaskManager 之間以流的形式進行資料的傳輸。上述三者均為獨立的 JVM 程序。

  • Client :提交 Job 的客戶端,可以是執行在任何機器上(與 JobManager 環境連通即可)。提交 Job 後,Client 可以結束程序(Streaming的任務),也可以不結束並等待結果返回。
  • JobManager(又稱為 JobMaster):協調 Task 的分散式執行,包括排程 Task、協調創 Checkpoint 以及當 Job failover 時協調各個 Task 從 Checkpoint 恢復等。
  • TaskManager(又稱為 Worker):執行 Dataflow 中的 Tasks,包括記憶體 Buffer 的分配、Data Stream 的傳遞等。

)Task Slot 是一個 TaskManager 中的最小資源分配單位,一個 TaskManager 中有多少個 Task Slot 就意味著能支援多少併發的 Task 處理。需要注意的是,一個 Task Slot 中可以執行多個 Operator,一般這些 Operator 是能被 Chain 在一起處理的。

啟動流程

最簡單的執行 Flink 應用的方法就是以單機 Standalone 的方式執行。

啟動flink:

[root@localhost flink-1.11.3]# ./bin/start-cluster.sh

開啟 http://ip:8081/ 就能看到 Flink 的 Web 介面。嘗試提交 Word Count 任務:

[root@localhost flink-1.11.3]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname 127.0.0.1 --port 9000

[root@localhost home]# nc -l 9000
qwer
asdf
zxcv

檢視TaskManager 的 stdout ,就可以看到 輸出結果。還可以通過--input引數指定我們自己的本地檔案作為輸入。

[root@localhost flink-1.11.3]# ./bin/flink run examples/streaming/WordCount.jar --input /tmp/z.txt

停止flink:

[root@localhost flink-1.11.3]# ./bin/stop-cluster.sh
常用配置介紹

workers 用於配置 TaskManager 的部署,預設配置下只會啟動一個 TaskManager 程序,如果想增加一個 TaskManager 程序的,只需要檔案中追加一行“localhost”。

也可以直接通過./bin/taskmanager.sh start這個命令來追加一個新的 TaskManager:

[root@localhost flink-1.11.3]# ./bin/taskmanager.sh start|start-foreground|stop|stop-all

flink-conf.yaml 用於配置 jobmanager和 taskmanager 的執行引數,常用配置:

jobmanager.rpc.address: localhost

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123

# The total process memory size for the JobManager.
jobmanager.memory.process.size: 1600m

# The total process memory size for the TaskManager.
taskmanager.memory.process.size: 1728m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
日誌的檢視和配置

JobManager 和 TaskManager 的啟動日誌可以在 Flink目錄下的 log 子目錄中找到。

JobManager日誌:

  • flink-root-standalonesession-0-localhost.log:程式碼中的日誌輸出
  • flink-root-standalonesession-0-localhost.out:程序執行時的stdout輸出

TaskManager日誌:

  • flink-root-taskexecutor-0-localhost.log
  • flink-root-taskexecutor-0-localhost.out

日誌的配置檔案在 Flink binary 目錄的 conf 子目錄下,其中:

  • log4j-cli.properties:用 Flink 命令列時用的 log 配置,比如執行“ flink run”命令
  • log4j-yarn-session.properties:用 yarn-session.sh 啟動時命令列執行時用的 log 配置
  • log4j.properties:無論是 Standalone 還是 Yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties

這三個log4j.properties檔案分別有三個logback.xml檔案與之對應,如果想使用 Logback ,需要把與之對應的“log4j.*properties”檔案刪掉即可,對應關係如下:

  • log4j-cli.properties -> logback-console.xml
  • log4j-yarn-session.properties -> logback-yarn.xml
  • log4j.properties -> logback.xml