1. 程式人生 > >Flink 部署文檔

Flink 部署文檔

download pla 任務 wordcount 需要 dynamic lin clas nload

  • Flink 部署文檔
    • 1 先決條件
    • 2 下載 Flink 二進制文件
    • 3 配置 Flink
      • 3.1 flink-conf.yaml
      • 3.2 slaves
    • 4 將配置好的 Flink 分發到其他節點
    • 5 以 Standalone 模式啟動 Flink
    • 6 以 Flink on YARN 模式啟動
      • 6.1 Flink YARN Session
      • 6.2 Single Flink job on YARN
    • 7 參考

本文檔中的集群包含 192.168.105.10/11/12 三臺機器。三臺機器的 hostname 分別設為 ivic10/ivic11/ivic12,其中第一臺機器作為 master,後兩臺作為 slaves。

1 先決條件

在部署 Flink 之前,請確認集群的每個節點都符合以下條件:

  1. 已安裝 Java 1.8.x 或以上版本(推薦 1.8 版本)

  2. 節點兩兩之間可以 SSH 免密碼登陸

  3. 已部署 Hadoop(如果只是部署 Standalone Cluster 則不需要 Hadoop)

如果你已經按照 Hadoop 部署文檔成功建立了 Hadoop 集群,那麽以上條件均已滿足。

在 Flink 的下載頁面中有多個版本可以選擇,因為之前選擇了 Hadoop 2.7.7 版本,所以這裏選擇與之對應的 Apache Flink 1.7.2 with Hadoop 2.7

版本,Scala 版本選擇最新的 2.12。

cd ~/bigdata
#Apache網站上的鏡像太慢,從清華鏡像下載
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.12.tgz
#解壓到當前文件夾
tar -xzvf flink-1.7.2-bin-hadoop27-scala_2.12.tgz -C .

註意:如果只需要部署 Flink on YARN,那麽可以跳過這一小節,因為 YARN 會幫你打理好一切。

#切換到flink配置路徑
cd ./flink-1.7.2/conf

jobmanager.rpc.address 指向 master 節點,其他配置項可以按照機器實際硬件情況酌情填寫,此處使用默認值。

# The host/IP of JobManager
jobmanager.rpc.address: ivic10
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1

3.2 slaves

向 slaves 文件寫入 slave 節點的 host/IP 地址

ivic11
ivic12
#需要先在11/12節點上建立 /homne/ivic/bigdata/ 文件夾
scp -r /home/ivic/bigdata/flink-1.7.2 192.168.105.11:/home/ivic/bigdata/flink-1.7.2
scp -r /home/ivic/bigdata/flink-1.7.2 192.168.105.12:/home/ivic/bigdata/flink-1.7.2
cd $FLINK_HOME
./bin/start-cluster.sh

然後可以在 ivic10:8081 查看 Flink 集群的運行情況。

技術分享圖片

./examples 路徑下有許多打包好的示例程序,可以用於驗證 Flink 集群是否正常運行。

./bin/flink run ./examples/batch/WordCount.jar

上面的命令會向 Flink 集群提交一個 wordcount 任務,這個示例程序可以指定輸入和輸出路徑,這裏沒有指定,因此輸入文件為程序自帶的一小段文本,結果直接輸出在屏幕上。
如果 Flink 集群工作正常,應該會在屏幕上輸出以下結果:

# 省略前面的輸出
(wrong,1)
(you,1)
Program execution finished
Job with JobID d7df697505c1f68d4eda2828b6eb18e2 has finished.
Job Runtime: 3158 ms
Accumulator Results:
- 47b31488879a3449d67aca67f5b75188 (java.util.ArrayList) [170 elements]

把 Flink 運行在 YARN 上有兩種方式,第一種方式是建立一個長期運行的 Flink YARN Session,然後向這個 Session 提交 Flink Job,多個任務同時運行時會共享資源。第二種方式是為單個任務啟動一個 Flink 集群,這個任務會獨占 Flink 集群的所有資源,任務結束即代表集群被回收。

另外,Flink on YARN 模式需要系統中設置了 YARN_CONF_DIR 或 HADOOP_CONF_DIR 環境變量,如果未設置,請在 ~/.profile 中加入以下內容,然後使用 source ~/.profile 命令使修改立即生效。

#在這條命令前定義HADOOP_HOME環境變量
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

使用下列命令來啟動一個擁有 2 個 TaskManager 的 Flink 集群,每個 TaskManager 有 2 GB 內存,2 個 slot。

./bin/YARN-session.sh -n 2 -tm 2048 -s 2

完整的參數列表如下

Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name                      Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode

啟動 YARN Session 以後會輸出 JobManager 的 Web Interface 地址,打開以後是這樣的:

技術分享圖片

仔細一看,Task Managers,Task Slots 怎麽都是 0 呢?難道是哪裏出了問題?其實並沒有問題,從某個版本開始 Flink 允許動態分配資源,在沒有任務的時候不分配 TaskManager。接下來我們就提交一個任務試試。

因為啟動 YARN Session 以後 Flink Client 會一直在前臺運行,所以先用 Ctrl + Z 快捷鍵把 Client 轉到後臺,然後再提交任務。

./bin/flink run ./examples/batch/WordCount.jar

在任務運行期間觀察 Web Interface,會發現 Task Managers 變為 1,Task Slots 變為 2 ,與啟動集群時指定的參數不符,這是因為 YARN 集群中只有兩個 NodeManager,ivic11 和 ivic12,其中一個作為 JobManager,因此只剩一個節點可以作為 TaskManager。

任務的運行結果和 Standalone 模式下完全一樣。

下面這條命令會為 wordcount 任務啟動一個獨占的 Flink 集群,任務結束集群即被回收。其中 -m 選項指定 Flink 集群的啟動模式,-yn 選項指定 TaskManager 的數目。

./bin/flink run -m YARN-cluster -yn 2 ./examples/batch/WordCount.jar

任務的運行結果和 Standalone 模式下完全一樣。

7 參考

  1. Standalone Cluster Deployment
  2. YARN Setup
  3. Flink TaskManagers do not start in YARN Cluster

Flink 部署文檔