Flink on Yarn部署
環境資訊:
Hadoop版本:2.6.0
Flink版本:1.1.2
快速部署Flink on Yarn環境:
比如啟動一個有4個TaskManager(每個節點都有4GB堆記憶體)的Yarn會話:
1. 下載Flink的軟體包,如flink-1.1.2-bin-hadoop26-scala_2.11.tgz(因為我的Hadoop叢集版本為2.6.0,Flink的Binary包是包含Yarn客戶端的)
2. 解壓縮
su - hadoop
tar -zxvf flink-1.1.2-bin-hadoop26-scala_2.11.tgz
ln -s flink-1.1.2 flink
3. 啟動Flink Yarn Session
cd flink
bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -s 32
對引數說明:
-n,--container 指YARN container分配的個數(即TaskManagers的個數)
-jm,--jobManagerMemory 指JobManager Containe的記憶體大小,單位為MB
-tm,--taskManagerMemory 指每個TaskManagerContainer的記憶體大小,單位為MB
-s 指每個TaskManager的slot個數。
執行上面命令來分配 4個 TaskManager,每個都擁有 4GB 的記憶體和 32 個 slot,同時會請求啟動 5 個容器,因為對於 ApplicationMaster 和 JobManager 還需要一個額外的容器。
注:
A. Flink的JobManager和TaskManager的記憶體大小不要小於YARNContainer的最小值(yarn.scheduler.minimum-allocation-mb,預設值為1024MB)。
B. 請注意客戶端需要提前設定環境變數 YARN_CONF_DIR 或 HADOOP_CONF_DIR,用來讀取 YARN 和 HDFS 配置。
啟動的日誌中會連線Yarn的ResourceManager,如下:
org.apache.flink.yarn.YarnClusterDescriptor- Using values:
org.apache.flink.yarn.YarnClusterDescriptor- TaskManager count = 4
org.apache.flink.yarn.YarnClusterDescriptor- JobManager memory = 1024
org.apache.flink.yarn.YarnClusterDescriptor- TaskManager memory = 4096
org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /192.168.1.128:9080
這裡省略將Flink的配置檔案和Jar等上傳到
hdfs://gpmaster:9000/user/hadoop/.flink/application_1474521395841_0004/目錄下的過程
org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1474521395841_0004
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl -Submitted application application_1474521395841_0004
org.apache.flink.yarn.YarnClusterDescriptor- Waiting for the cluster to be allocated
org.apache.flink.yarn.YarnClusterDescriptor- Deploying cluster, current state ACCEPTED
org.apache.flink.yarn.YarnClusterDescriptor - YARN application has been deployed successfully.
Flink JobManageris now running on 192.168.1.128:17642
JobManager Web Interface: http://gpmaster:8088/proxy/application_1474521395841_0004/
org.apache.flink.yarn.YarnClusterClient- Starting client actor system.
akka.event.slf4j.Slf4jLogger- Slf4jLogger started
Remoting - Starting remoting
Remoting - Remoting started; listening on addresses :[akka.tcp://[email protected]:18282]
org.apache.flink.yarn.YarnClusterClient - Start application client.
org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://[email protected]:17642/user/jobmanagerwith session ID null.
org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://[email protected]:17642/user/jobmanager with sessionID null.
org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://[email protected]:17642/user/jobmanager.
org.apache.flink.yarn.ApplicationClient - Successfully registered at the ResourceManager using JobManagerActor[akka.tcp://[email protected]:17642/user/jobmanager#-1966299512]
Number of connected TaskManagers changed to 1. Slots available: 32
Number of connected TaskManagers changed to 2. Slots available: 64
Number of connected TaskManagers changed to 4. Slots available: 128
此時的Flink YARN客戶端會一直執行,不會退出。如果你希望放到後臺執行,那麼可以使用-d或--detached引數,即:
bin/yarn-session.sh -d -n 4 -jm 1024 -tm 4096
在這種情況下,Flink YARN 客戶端只會提交 Flink 到叢集中然後關閉自己。注意在這種情況下,不能像上面這樣停止 YARN 會話了,必須手動停止,如下面日誌中提示的內容:
yarn application -kill application_1474521395841_0008
Flink YARN客戶端以detached模式啟動,我們也可以從這種啟動方式的日誌中檢視到如下內容:
org.apache.flink.yarn.cli.FlinkYarnSessionCli - The Flink YARN client has been started in detached mode.In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1474521395841_0008
Please also note that the temporary files of the YARN session in hdfs://gpmaster:9000/user/hadoop/.flink/application_1474521395841_0008 will not be removed.
既然Flink on Yarn模式啟動了,下面我們檢視一下相關的程序。
在Hadoop的ResourceManager節點檢視程序:
[[email protected]~]$ jps
9062 YarnTaskManager
8872 FlinkYarnSessionCli
8985 YarnApplicationMasterRunner
9196 Jps
5325 ResourceManager
4990 NameNode
5086 DataNode
5423 NodeManager
在Hadoop的NodeManager節點檢視程序:
[[email protected]~]$ jps
6208 YarnTaskManager
4336 DataNode
6327 Jps
4441 NodeManager
4. 執行Flink的example例項
上面我們已經基於Yarn啟動了Flink,我們來測試一個例子:
[[email protected] flink]$ bin/flink run ./examples/batch/WordCount.jar
返回的結果為:
(action,1)
(after,1)
(against,1)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
……
使用 run 操作來提交一個任務到 YARN。客戶端自己就能確定 JobManager 的地址。在遇到罕見的問題時,你可以使用 -m 引數傳入 JobManager 的地址。JobManager 的地址可以在 YARN 控制檯找到。
5. 在YARN上直接執行單個Flink任務
上面介紹了在 Hadoop YARN 環境中啟動一個 Flink 叢集。另外,也可以在 YARN 中啟動只執行單個任務的 Flink。請注意該客戶端需要提供 -yn 引數值(TaskManager 的數量)。
這裡不需要先啟動Flink服務後,再執行Flink程式。
[[email protected] flink]$ bin/flink run -m yarn-cluster -yn2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
輸出結果為:
(action,1)
(after,1)
(against,1)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
……
相關引數說明:
-m,--jobmanager <host:port> 指需要連線的JobManager地址
-yn,--yarncontainer 指分配的YARN container個數(等於TaskManagers個數)
-yjm,--yarnjobManagerMemory 指JobManager Container的記憶體大小,單位為MB
-ytm,--yarntaskManagerMemory 指每個TaskManagerContainer的記憶體大小,單位為MB
可以看到對於yarn-cluster模式來說,這些選項都帶了一個 y 或 yarn (長引數選項)的字首。
注:通過為每個任務設定不同的環境變數 FLINK_CONF_DIR,可以為每個任務使用不同的配置目錄。從 Flink 分發包中複製 conf 目錄,然後修改配置,例如,每個任務不同的日誌設定。
6. Flink YARN會話
YARN 是一個叢集資源管理框架。它允許在一個叢集之上執行多種分散式應用。Flink 與其他應用程式一同執行在 YARN 之上。如果使用者已經安裝 YARN,就不需要安裝其他東西了。
上面的操作中,我們在YARN叢集中啟動了一個Flink會話,此會話會啟動所有需要的 Flink 服務(JobManager 和 TaskManager),因此可以提交程式到叢集中。注意每個會話都可以執行多個程式。
Flink YARN會話啟動時,系統會使用 conf/flink-config.yaml 中的配置。
Flink on YARN 會覆蓋這些配置引數 jobmanager.rpc.address(因為JobManager 一直被分配在不同的機器上),taskmanager.tmp.dirs(我們使用 YARN 提供了的 tmp 目錄)和 parallelism.default(如果指定了 slot 數目)。
如果你不想通過修改配置檔案的方法來設定配置引數,你可以通過 -D 標記傳入動態屬性。所以你可以這樣傳遞引數,如下:
-Dfs.overwrite-files=true-Dtaskmanager.network.numberOfBuffers=16368
一旦 Flink 在 YARN 叢集中部署了,它會顯示 JobManager 連線的詳細資訊。
要停止 YARN 會話,可以通過結束 unix 程序(使用 CTRL+C)或者通過在客戶端中輸入'stop'。
7. Flink on YARN的恢復機制
Flink的YARN客戶端可以通過引數來控制容器出現故障情況下的行為,這些引數定義在conf/flink-conf.yaml來設定,當然也可以通過啟動YARN會話時通過-D引數進行設定。
yarn.reallocate-failed:該引數控制了 Flink 是否該重新分配失敗的 TaskManager容器。預設:true。
yarn.maximum-failed-containers:ApplicationMaster能接受最多的失敗容器的數量,直到 YARN 會話失敗。預設:初始請求的 TaskManager 個數(-n)。
yarn.application-attempts:ApplicationMaster(以及它的TaskManager 容器)的嘗試次數。預設值為 1,當 ApplicationMaster 失敗了,整個 YARN 會話也會失敗。可以通過設定更大的值來更改 YARN 重啟ApplicationMaster 的次數。
8. Flink on YARN內部實現
YARN 客戶端需要訪問 Hadoop 配置,從而連線 YARN 資源管理器和 HDFS。可以使用下面的策略來決定 Hadoop 配置:
- 測試 YARN_CONF_DIR, HADOOP_CONF_DIR 或 HADOOP_CONF_PATH 環境變數是否設定了(按該順序測試)。如果它們中有一個被設定了,那麼它們就會用來讀取配置。
- 如果上面的策略失敗了(如果正確安裝了 YARN 的話,這不應該會發生),客戶端會使用 HADOOP_HOME 環境變數。如果該變數設定了,客戶端會嘗試訪問 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf(Hadoop 1)。
當啟動一個新的 Flink YARN Client會話,客戶端首先會檢查所請求的資源(容器和記憶體)是否可用。之後,它會上傳包含了 Flink 配置和 jar檔案到 HDFS(步驟 1)。
客戶端的下一步是請求(步驟 2)一個 YARN 容器啟動 ApplicationMaster (步驟 3)。因為客戶端將配置和jar 檔案作為容器的資源註冊了,所以執行在特定機器上的 YARN 的 NodeManager 會負責準備容器(例如,下載檔案)。一旦這些完成了,ApplicationMaster (AM) 就啟動了。
JobManager 和 AM 執行在同一個容器中。一旦它們成功地啟動了,AM 知道 JobManager 的地址(它自己)。它會為 TaskManager 生成一個新的 Flink 配置檔案(這樣它們才能連上 JobManager)。該檔案也同樣會上傳到 HDFS。另外,AM 容器同時提供了 Flink 的 Web 介面服務。Flink 用來提供服務的埠是由使用者 + 應用程式 id 作為偏移配置的。這使得使用者能夠並行執行多個 Flink YARN 會話。
之後,AM 開始為 Flink 的 TaskManager 分配容器,這會從 HDFS 下載 jar 檔案和修改過的配置檔案。一旦這些步驟完成了,Flink 就安裝完成並準備接受任務了。
相關推薦
Flink on Yarn部署
環境資訊:Hadoop版本:2.6.0Flink版本:1.1.2快速部署Flink on Yarn環境:比如啟動一個有4個TaskManager(每個節點都有4GB堆記憶體)的Yarn會話:1. 下載Flink的軟體包,如flink-1.1.2-bin-hadoop
flink部署操作-flink on yarn叢集安裝部署
flink叢集安裝部署 yarn叢集模式 Flink入門及實戰-上: Flink入門及實戰-下: 快速開始 在yarn上啟動一個一直執行的flink叢集 在yarn上執行一個flink job flink yarn session 啟動flink s
Flink on Yarn三部曲之二:部署和設定
### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; 本文是《
Flink1.6系列之—Flink on yarn流程詳解
端口 準備 -a 根據 images mas info 使用 臨時 本篇我們介紹下,Flink在YARN上運行流程: 當開始一個新的Flink yarn 會話時,客戶端首先檢查所請求的資源(containers和內存)是否可用。如果資源夠用,之後,上傳
Flink on Yarn模式啟動流程分析
cin XML images ont list action -i 多個 信息 此文已由作者嶽猛授權網易雲社區發布。歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。Flink On Yarn 架構Paste_Image.png前提條件首先需要配置YARN_CONF_DIR
Flink on Yarn模式啟動流程原始碼分析
此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 Flink on yarn的啟動流程可以參見前面的文章 Flink on Yarn啟動流程,下面主要是從原始碼角度看下這個實現,可能有的地方理解有誤,請給予指正,多謝。 --> 1.命令列啟動yarn sessi
Flink on Yarn模式啟動流程源代碼分析
www and *** err wap `` dem 註冊 contex 此文已由作者嶽猛授權網易雲社區發布。歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。Flink on yarn的啟動流程可以參見前面的文章 Flink on Yarn啟動流程,下面主要是從源碼角度看
flink on yarn部分原始碼解析 (FLIP-6 new mode)
我們在https://www.cnblogs.com/dongxiao-yang/p/9403427.html文章裡分析了flink提交single job到yarn叢集上的程式碼,flink在1.5版本後對整個框架的deploy方式重構了全新的流程(參考https://cwiki.apache.org/co
flink開發實戰之 flink on yarn
flink 執行模式 Flink 和spark一樣有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。 實戰開發主要使用Yarn Cluster模式,所以本文主要介紹yarn 模式下flink任務的執行和資源分配。 Ya
Flink on yarn的問題:Invalid AMRMToken
目前採用的Flink的版本是1.4.2,執行在yarn上,總是時不時的報錯“Invalid AMRMToken from appattempt”,導致AM掛掉。 簡而言之,就是AM和RM溝通的過程中,突然AM提供的Token不被認可,導致拒絕連線,進而AM掛掉。 後來發現早
Flink-on-yarn
解壓 stat master swd run rec 地址 abi man 介紹 官網下載 https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-hadoop28-scala_
flink on yarn叢集搭建
前面一篇部落格中已經搭建了flink Standalone的叢集,需要的可以進去看一下,今天主要來說一下flink on yarn 叢集的搭建以及怎麼提交任務.這篇部落格寫的比較詳細,內容較多,希望大家耐心看完,都是乾貨. 版本資訊: flink-1.6.0 zooke
Flink之三 flink on yarn
Flink的執行模式 flink的執行模式有local模式,cluster,yarn等模式;flink叢集層次結構 這一節我們主要一起了解flink on yarn 模式,flink on yarn 有兩種模式: 一:long-running F
Flink on YARN快速入門指南
Apache Flink是一個高效、分散式、基於Java和Scala(主要是由Java實現)實現的通用大資料分析引擎,它具有分散式 MapReduce一類平臺的高效性、靈活性和擴充套件性以及並行資料庫查詢優化方案,它支援批量和基於流的資料分析,且提供了基於Ja
flink on yarn模式
flink on yarn模式的相關知識點(重要):https://blog.csdn.net/xu470438000/article/details/79576989 在flink on yarn模式中,flink yarn-session的兩種提交方式 兩種提交方式 1
Flink on yarn
一: 配置: 1.配置yarn-site.xml <property><name>yarn.resourcemanager.am.max-attempts</name><value>4</value></property> 2.配置
Hadoop 分散式配置及Spark on yarn部署
配置Hadoop Hadoop的叢集部署模式需要修改Hadoop資料夾中/etc/hadoop/中的配置檔案,更多設定項可見官方說明,這裡只設置了常見的設定項:hadoop-env.sh,yarn-env.sh、core-site.xml、hdfs-site.xml、mapred-site.
最近寫Flink on Yarn程式遇到的一些問題
1.UDF造成的compile 編譯失敗 class GetDay() extends ScalarFunction{ // 這個變數千萬不能定義在這裡,否則除錯沒問題,on yarn執行會編譯出錯
Flink On Yarn 異常排除過程以及根據位元組碼名字獲取jar檔名字
最初學習Flink,寫了一個簡單的wordcount執行一下,發現報錯,異常資訊如下: The program finished with the following exception: java.lang.RuntimeException: Error deployin
storm on yarn 部署
1. 環境介紹 1.1 節點與服務對映關係 ip host 服務 192.168.40.132 master Namenode、NodeManager、DataNode、zookeeper 192.168.40.133 slave1 ResurceMa