Flink 部署文檔
- Flink 部署文檔
- 1 先決條件
- 2 下載 Flink 二進制文件
- 3 配置 Flink
- 3.1 flink-conf.yaml
- 3.2 slaves
- 4 將配置好的 Flink 分發到其他節點
- 5 以 Standalone 模式啟動 Flink
- 6 以 Flink on YARN 模式啟動
- 6.1 Flink YARN Session
- 6.2 Single Flink job on YARN
- 7 參考
本文檔中的集群包含 192.168.105.10/11/12 三臺機器。三臺機器的 hostname 分別設為 ivic10/ivic11/ivic12,其中第一臺機器作為 master,後兩臺作為 slaves。
1 先決條件
在部署 Flink 之前,請確認集群的每個節點都符合以下條件:
已安裝 Java 1.8.x 或以上版本(推薦 1.8 版本)
節點兩兩之間可以 SSH 免密碼登陸
已部署 Hadoop(如果只是部署 Standalone Cluster 則不需要 Hadoop)
如果你已經按照 Hadoop 部署文檔成功建立了 Hadoop 集群,那麽以上條件均已滿足。
2 下載 Flink 二進制文件
在 Flink 的下載頁面中有多個版本可以選擇,因為之前選擇了 Hadoop 2.7.7 版本,所以這裏選擇與之對應的 Apache Flink 1.7.2 with Hadoop 2.7
cd ~/bigdata
#Apache網站上的鏡像太慢,從清華鏡像下載
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.12.tgz
#解壓到當前文件夾
tar -xzvf flink-1.7.2-bin-hadoop27-scala_2.12.tgz -C .
3 配置 Flink
註意:如果只需要部署 Flink on YARN,那麽可以跳過這一小節,因為 YARN 會幫你打理好一切。
#切換到flink配置路徑 cd ./flink-1.7.2/conf
3.1 flink-conf.yaml
將 jobmanager.rpc.address
指向 master 節點,其他配置項可以按照機器實際硬件情況酌情填寫,此處使用默認值。
# The host/IP of JobManager
jobmanager.rpc.address: ivic10
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
3.2 slaves
向 slaves 文件寫入 slave 節點的 host/IP 地址
ivic11
ivic12
4 將配置好的 Flink 分發到其他節點
#需要先在11/12節點上建立 /homne/ivic/bigdata/ 文件夾
scp -r /home/ivic/bigdata/flink-1.7.2 192.168.105.11:/home/ivic/bigdata/flink-1.7.2
scp -r /home/ivic/bigdata/flink-1.7.2 192.168.105.12:/home/ivic/bigdata/flink-1.7.2
5 以 Standalone 模式啟動 Flink
cd $FLINK_HOME
./bin/start-cluster.sh
然後可以在 ivic10:8081 查看 Flink 集群的運行情況。
./examples
路徑下有許多打包好的示例程序,可以用於驗證 Flink 集群是否正常運行。
./bin/flink run ./examples/batch/WordCount.jar
上面的命令會向 Flink 集群提交一個 wordcount 任務,這個示例程序可以指定輸入和輸出路徑,這裏沒有指定,因此輸入文件為程序自帶的一小段文本,結果直接輸出在屏幕上。
如果 Flink 集群工作正常,應該會在屏幕上輸出以下結果:
# 省略前面的輸出
(wrong,1)
(you,1)
Program execution finished
Job with JobID d7df697505c1f68d4eda2828b6eb18e2 has finished.
Job Runtime: 3158 ms
Accumulator Results:
- 47b31488879a3449d67aca67f5b75188 (java.util.ArrayList) [170 elements]
6 以 Flink on YARN 模式啟動
把 Flink 運行在 YARN 上有兩種方式,第一種方式是建立一個長期運行的 Flink YARN Session,然後向這個 Session 提交 Flink Job,多個任務同時運行時會共享資源。第二種方式是為單個任務啟動一個 Flink 集群,這個任務會獨占 Flink 集群的所有資源,任務結束即代表集群被回收。
另外,Flink on YARN 模式需要系統中設置了 YARN_CONF_DIR 或 HADOOP_CONF_DIR 環境變量,如果未設置,請在 ~/.profile
中加入以下內容,然後使用 source ~/.profile
命令使修改立即生效。
#在這條命令前定義HADOOP_HOME環境變量
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
6.1 Flink YARN Session
使用下列命令來啟動一個擁有 2 個 TaskManager 的 Flink 集群,每個 TaskManager 有 2 GB 內存,2 個 slot。
./bin/YARN-session.sh -n 2 -tm 2048 -s 2
完整的參數列表如下
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode
啟動 YARN Session 以後會輸出 JobManager 的 Web Interface 地址,打開以後是這樣的:
仔細一看,Task Managers,Task Slots 怎麽都是 0 呢?難道是哪裏出了問題?其實並沒有問題,從某個版本開始 Flink 允許動態分配資源,在沒有任務的時候不分配 TaskManager。接下來我們就提交一個任務試試。
因為啟動 YARN Session 以後 Flink Client 會一直在前臺運行,所以先用 Ctrl + Z
快捷鍵把 Client 轉到後臺,然後再提交任務。
./bin/flink run ./examples/batch/WordCount.jar
在任務運行期間觀察 Web Interface,會發現 Task Managers 變為 1,Task Slots 變為 2 ,與啟動集群時指定的參數不符,這是因為 YARN 集群中只有兩個 NodeManager,ivic11 和 ivic12,其中一個作為 JobManager,因此只剩一個節點可以作為 TaskManager。
任務的運行結果和 Standalone 模式下完全一樣。
6.2 Single Flink job on YARN
下面這條命令會為 wordcount 任務啟動一個獨占的 Flink 集群,任務結束集群即被回收。其中 -m 選項指定 Flink 集群的啟動模式,-yn 選項指定 TaskManager 的數目。
./bin/flink run -m YARN-cluster -yn 2 ./examples/batch/WordCount.jar
任務的運行結果和 Standalone 模式下完全一樣。
7 參考
- Standalone Cluster Deployment
- YARN Setup
- Flink TaskManagers do not start in YARN Cluster
Flink 部署文檔