flink開發實戰之 flink on yarn
flink 執行模式
Flink 和spark一樣有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。
實戰開發主要使用Yarn Cluster模式,所以本文主要介紹yarn 模式下flink任務的執行和資源分配。
Yarn Cluster 模式
在圖中可以看出,Flink 與 Yarn 的關係與 MapReduce 和 Yarn 的關係是一樣的。Flink 通過 Yarn 的介面實現了自己的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用自己的 Container 來啟動 Flink 的 JobManager
flink 任務提交到yarn上的全流程
第一步:
向資源管理器(ResourceManager)請求,要執行一個程式。中獲取新的作業ID(jobId),以及程式資源儲存路徑
第二步
ResourceManager檢查作業的輸出說明,然後返回一個存放程式資源的路徑以及jobId,這個路徑在hdfs的tmp資料夾中,如果程式中沒有指定輸出目錄或指定的輸出目錄已經存在,作業就不提交,錯誤返回給flink程式
就是這個路徑存放程式資源,包括程式的jar包,job的配置檔案等。
第三步
將作業資源(包括JAR、配置和資訊)複製到HDFS。
第五步:
通過呼叫資源管理器上的submitApplication()方法提交作業。
第六步
資源管理器收到呼叫它的submitApplication()訊息後,如果容器不夠,任務會現在等待佇列中等待,之後便將請求傳遞給排程器(Schedule),排程器分配一個容器,然後資源管理器在節點管理器的管理下在容器中啟動應用程式master程序也就是MRAPPMaster。
flink作業的application master是一個Java應用程式,它的主類是MRAPPMaster他對作業進行初始化,通過建立多個薄記物件以保持對作業進度的跟蹤,因為他將接受來自任務的進度和完成報告
第七步
MRAPPMaster根據配置資訊,獲知要啟動多少個TaskManger,向ResourceManager請求容器
第八步
一旦資源管理器的排程器為任務分配了容器,MRAPPMaster(application master) 就通過與節點管理器NodeManager通訊來啟動容器向已獲得容器的TaskManger發從啟動命令,也就是主類為YarnChild程式
Flink在yarn上執行兩種的模式
第一種:
一種是讓 Yarn 直接啟動 JobManager 和 TaskManager
在yarn上執行一個flink job
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
第二種:
是在執行 Flink Workload 的時候啟動 Flink 的模組。前者相當於讓 Flink 的模組處於 Standby 的狀態。這裡,我也主要介紹下前者。
在下載和解壓 Flink 的安裝包之後,需要在環境中增加環境變數 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目錄。
export HADOOP_CONF_DIR=/etc/hadoop/conf
這是因為 Flink 實現了 Yarn 的 Client,因此需要 Yarn 的一些配置和 Jar 包。在配置好環境變數後,只需簡單的執行如下的指令碼,Yarn 就會啟動 Flink 的 JobManager 和 TaskManager。
先啟動叢集:
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -s 2
上面的意思是:向 Yarn 申請 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 2),並且向每個 TaskManager 的 Container 申請 1024 的記憶體
再提交任務
./bin/flink run com.demo.florian.WordCount ./flink-demo-1.0-SNAPSHOT.jar
啟動session的指令引數
必選
-n,--container 分配多少個yarn容器 (=taskmanager的數量)
可選
-d,--detached 獨立執行
-jm,--jobManagerMemory JobManager的記憶體 [in MB]
-nm,--name 在YARN上為一個自定義的應用設定一個名字
-q,--query 顯示yarn中可用的資源 (記憶體, cpu核數)
-qu,--queue 指定YARN佇列.
-s,--slots 每個TaskManager使用的slots數量
-tm,--taskManagerMemory 每個TaskManager的記憶體 [in MB]
-z,--zookeeperNamespace 針對HA模式在zookeeper上建立NameSpace
run 提交任務的指令引數
-c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這裡通過這個引數指定
-m,--jobmanager <host:port> 指定需要連線的jobmanager(主節點)地址
使用這個引數可以指定一個不同於配置檔案中的jobmanager
-p,--parallelism <parallelism> 指定程式的並行度。可以覆蓋配置檔案中的預設值。
slot和parallelism
1.slot是指taskmanager的併發執行能力
在hadoop 1.x 版本中也有slot的概念,有興趣的讀者可以瞭解一下
taskmanager.numberOfTaskSlots:3
每一個taskmanager中的分配3個TaskSlot,3個taskmanager一共有9個TaskSlot
2.parallelism是指taskmanager實際使用的併發能力
parallelism.default:1
執行程式預設的並行度為1,9個TaskSlot只用了1個,有8個空閒。設定合適的並行度才能提高效率。
3.parallelism是可配置、可指定的
1.可以通過修改$FLINK_HOME/conf/flink-conf.yaml檔案的方式更改並行度。
2.可以通過設定$FLINK_HOME/bin/flink 的-p引數修改並行度
3.可以通過設定executionEnvironmentk的方法修改並行度
4.可以通過設定flink的程式設計API修改過並行度
5.這些並行度設定優先順序從低到高排序,排序為api>env>p>file.
6.設定合適的並行度,能提高運算效率
7.parallelism不能多與slot個數。
4.slot和parallelism總結
1.slot是靜態的概念,是指taskmanager具有的併發執行能力
2.parallelism是動態的概念,是指程式執行時實際使用的併發能力
3.設定合適的parallelism能提高運算效率,太多了和太少了都不行
4.設定parallelism有多中方式,優先順序為api>env>p>file