1. 程式人生 > 其它 >flink叢集安裝部署

flink叢集安裝部署

 flink叢集安裝部署

yarn叢集模式 

  • 快速開始
  1. 在yarn上啟動一個一直執行的flink叢集
  2. 在yarn上執行一個flink job
  • flink yarn session
  1. 啟動flink session
  2. 提交任務到flink
  • 在yarn上執行一個獨立的flink job
  1. 使用者依賴jar包和classpath
  • flink on yarn的故障恢復
  • 除錯一個失敗的yarn session
  1. 日誌檔案
  2. yarn client控制檯和web介面
  • 針對指定的hadoop版本構建yarn client
  • 在yarn上執行flink使用防火牆
  • flink on yarn 內部實現

快速開始

在yarn上啟動一個一直執行的flink叢集

啟動一個yarn session使用4個taskmanager(每個節點4GB記憶體)
注意:如果自己的虛擬機器沒有這麼大記憶體的話,可以吧-n設定小一點,對應的後面的記憶體-jm -tm也設定小一點,否則,如果記憶體不夠用,會導致啟動失敗。

tar xvzf flink-1.4.2-bin-hadoop2.tgz
cd flink-1.4.2/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

通過-s引數指定每一個taskmanager分配多少個slots(處理程序)。我們建議設定為每個機器的CPU核數。

一旦session建立成功,你可以使用./bin/flink工具向叢集提交任務。

在yarn上執行一個flink job

tar xvzf flink-1.4.2-bin-hadoop2.tgz
cd flink-1.4.2/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

flink yarn session

yarn是一個叢集資源管理框架。它執行在叢集之上執行各種分散式應用程式。flink像其他程式一樣,也可以在yarn上執行。使用者不需要設定或者安裝任何東西,如果已經有一個安裝配置好的yarn。

必須的依賴

  • 至少是hadoop2.2
  • hdfs(或者是其它支援hadoop的分散式檔案系統)

如果你在使用flink yarn client的時候有什麼問題,可以到這裡查詢答案

啟動flink session

按照下面的步驟來學習如何在yarn叢集上啟動一個flink session

一個session將會包含所有必須的flink 服務(jobmanager和taskmanager),這樣你就可以向這個叢集提交程式了。注意:每個session會話你可以執行多個程式。

下載flink

下載hadoop2對應的flink安裝包,點此下載。它包含了必須的檔案。

使用下面命令解壓:

tar xvzf flink-1.4.2-bin-hadoop2.tgz
cd flink-1.4.2/

啟動一個session

使用下面命令啟動一個session

./bin/yarn-session.sh

這個命令將會輸出下面內容:

用法:
   必選
     -n,--container <arg>   分配多少個yarn容器 (=taskmanager的數量)
   可選
     -D <arg>                        動態屬性
     -d,--detached                   獨立執行
     -jm,--jobManagerMemory <arg>    JobManager的記憶體 [in MB]
     -nm,--name                     在YARN上為一個自定義的應用設定一個名字
     -q,--query                      顯示yarn中可用的資源 (記憶體, cpu核數)
     -qu,--queue <arg>               指定YARN佇列.
     -s,--slots <arg>                每個TaskManager使用的slots數量
     -tm,--taskManagerMemory <arg>   每個TaskManager的記憶體 [in MB]
     -z,--zookeeperNamespace <arg>   針對HA模式在zookeeper上建立NameSpace

請注意:client必須要設定YARN_CONF_DIR或者HADOOP_CONF_DIR環境變數,通過這個環境變數來讀取YARN和HDFS的配置資訊,否則啟動會失敗。

經試驗發現,其實如果配置的有HADOOP_HOME環境變數的話也是可以的。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一個即可。

例子:下面的命令會申請10個taskmanager,每個8G記憶體和32個solt

./bin/yarn-session.sh -n 10 -tm 8192 -s 32

該系統預設會使用這個配置檔案:conf/flink-conf.yaml。如果你想修改一些引數,請檢視我們的配置指南

flink on yarn模式將會覆蓋一些配置檔案 jobmanager.rpc.address(因為jobmanager總是分配在不同的機器),taskmanager.tmp.dirs(我們使用yarn提供的臨時目錄)和parallelism.default 如果solts的數量已經被指定。

如果你不想修改配置檔案去改變引數,有一個選擇是通過動態的引數-D 來指定。所以你可以傳遞引數:-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624

上面的例子將會啟動11個容器(即使僅請求10個容器),因為有一個額外的容器來啟動ApplicationMaster 和 job manager

一旦flink在你的yarn叢集上部署,它將會顯示job manager的連線詳細資訊。

停止yarn session通過停止unix程序(使用CTRL+C)或者在client中輸入stop。

Flink on yarn只會啟動請求的資源,如果叢集資源充足。大多數yarn排程器請求容器的記憶體,一些也會請求cpu。預設,cpu的核數等於slots的數量,通過-s引數指定。這個引數yarn.containers.vcores的值允許使用一個自定義值來進行覆蓋。

後臺 yarn session

如果你不希望flink yarn client一直執行,也可以啟動一個後臺執行的yarn session。使用這個引數:-d 或者 --detached

在這種情況下,flink yarn client將會只提交任務到叢集然後關閉自己。注意:在這種情況下,無法使用flink停止yarn session。

使用yarn 工具 來停止yarn session

yarn application -kill <applicationId>

附著到一個已存在的session

使用下面命令啟動一個session

./bin/yarn-session.sh

執行這個命令將會顯示下面內容:

用法:
   必須
     -id,--applicationId <yarnAppId>        YARN叢集上的任務id

正如前面提到的,YARN_CONF_DIR或者HADOOP_CONF_DIR環境變數必須是可以讀取到YARN和HDFS配置的。

例如:發出下面命令可以附著到一個執行中的flink yarn session

./bin/yarn-session.sh -id application_1463870264508_0029

附著到一個執行的session使用yarn resourcemanager來確定job Manager 的RPC埠。

停止yarn session通過停止unix程序(使用CTRL+C)或者在client中輸入stop 

提交任務到flink

使用下面的命令提交一個flink程式到yarn叢集

./bin/flink

請參考客戶端命令列操作文件

這個命令將會向你展示一個這樣一個幫助選單

"run" 引數可以編譯和執行一個程式

用法: run [OPTIONS] <jar-file> <arguments>
  "run" 操作引數:
     -c,--class <classname>           如果沒有在jar包中指定入口類,則需要在這裡通過這個引數指定
 
     -m,--jobmanager <host:port>      指定需要連線的jobmanager(主節點)地址
                                      使用這個引數可以指定一個不同於配置檔案中的jobmanager
 
     -p,--parallelism <parallelism>   指定程式的並行度。可以覆蓋配置檔案中的預設值。

使用run 命令向yarn叢集提交一個job。客戶端可以確定jobmanager的地址。當然,你也可以通過-m引數指定jobmanager。jobmanager的地址在yarn控制檯上可以看到。

例子:【注意:下面的命令官網文件提供的有問題,執行失敗】

 

wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
【注意:下面的命令官網文件提供的有問題,執行失敗】
./bin/flink run ./examples/batch/WordCount.jar \
        hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
檢視flink原始碼發現,wordCount.jar可以不提供引數,或者提供引數,提供引數的時候需要使用input和output引數指定:
上面的命令需要修改為如下格式才能正常執行[傳遞的兩個引數需要使用-input和 -output來指定]
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/LICENSE-2.0.txt -output hdfs://hostname:port/wordcount-result.txt
 

如果有以下錯誤,確保所有taskmanager是否已經啟動:

Exception in thread "main" org.apache.flink.compiler.CompilerException:
    Available instances could not be determined from job manager: Connection timed out.

你可以在jobmanager的web介面上檢查taskmanager的數量。這個web介面的地址會列印在yarn session的控制檯上。

如果沒有發現taskmanager,你應該通過日誌檔案來檢查問題。

 

在yarn上執行一個獨立的flink job

這個文件描述瞭如何在一個hadoop yarn環境中啟動flink叢集。也可以在yarn中啟動只執行單個任務的flink。

請注意:client期望設定-yn 引數(taskmanager的數量)

例子:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

yarn session命令列的選項也可以使用./bin/flink 工具獲得。它們都有一個y或者yarn的字首

注意:通過為每個任務設定不同的環境變數 FLINK_CONF_DIR,可以為每個任務使用不同的配置目錄。從 Flink 分發包中複製 conf 目錄,然後修改配置,例如,每個任務不同的日誌設定

使用者依賴jar包和classpath

預設情況下,當執行一個獨立的job的時候,這個flink job將包含使用者依賴的jar包。可以通過引數yarn.per-job-cluster.include-user-jar來控制。

當設定為 DISABLED ,flink將會包含使用者classpath下面的jar包。

使用者jar包在類路徑中的位置可以通過下面引數來控制:

* ORDER: (預設) 新增jar包到系統類路徑下面,按照字典順序.
* FIRST: 將jar新增到類路徑的前面.
* LAST: 將jar新增到類路徑的最後.

flink on yarn的故障恢復

flink 的 yarn 客戶端通過下面的配置引數來控制容器的故障恢復。這些引數可以通過conf/flink-conf.yaml 或者在啟動yarn session的時候通過-D引數來指定。

  • yarn.reallocate-failed:這個引數控制了flink是否應該重新分配失敗的taskmanager容器。預設是true。
  • yarn.maximum-failed-containers:applicationMaster可以接受的容器最大失敗次數,達到這個引數,就會認為yarn session失敗。預設這個次數和初始化請求的taskmanager數量相等(-n 引數指定的)。
  • yarn.application-attempts:applicationMaster重試的次數。如果這個值被設定為1(預設就是1),當application master失敗的時候,yarn session也會失敗。設定一個比較大的值的話,yarn會嘗試重啟applicationMaster。 

除錯一個失敗的yarn session

一個flink yarn session部署失敗可能會有很多原因。一個錯誤的hadoop配置(hdfs 許可權,yarn配置),版本不相容(使用cdh中的hadoop執行flink),或者其他的錯誤。

日誌檔案

在某種情況下,flink yarn session 部署失敗是由於它自身的原因,使用者必須依賴於yarn的日誌來進行分析。最有用的就是yarn log aggregation 。啟動它,使用者必須在yarn-site.xml檔案中設定yarn.log-aggregation-enable 屬性為true。一旦啟用了,使用者可以通過下面的命令來檢視一個失敗的yarn session的所有詳細日誌。

yarn logs -applicationId <application ID>

yarn client控制檯和web介面

flink yarn client也會列印一些錯誤資訊在控制檯上,如果錯誤發生在執行時(例如如果一個taskmanager停止工作了一段時間)

除此之外,yarn resource manager的web介面(預設埠是8088)。resource manager的埠是通過yarn.resourcemanager.webapp.address引數來配置的。

它執行在yarn 程式執行的時候檢視日誌和程式失敗的時候檢視日誌使用者查詢問題。

針對指定的hadoop版本構建yarn client

使用者可以使用hadoop發行版。例如,hortonworks,CDH或者MapR等版本去構建 flink。請參考構建指南獲取詳細資訊

在yarn上執行flink使用防火牆

一些yarn 叢集使用防火牆來控制叢集的網路和其他網路的通訊。在這種設定下,flink只能通過叢集的網路來提交任務到yarn session。針對生產環境下使用是不可行的,flink允許配置所有相關服務的埠範圍,通過這些埠範圍的配置,使用者也可以透過防火牆來提交flink job。

目前,兩個服務都需要提交任務:

  • jobmanager(yarn中的applicationMaster)
  • jobmanager內部執行的blobserver

當向flink提交一個任務的時候,blobserver將會把使用者的程式碼分發到所有工作節點(taskManagers)。jobmanager接收任務本身,並觸發執行。

以下兩個配置引數可以指定埠:

  • yarn.application-master.port
  • blob.server.port

這兩個配置選項接收單一的埠(例如:"50010"),區間("50000-50025"),或者同時指定多個("50010,50011,50020-50025,50050-50075")。

(hadoop也使用的是類似的機制,例如:yarn.app.mapreduce.am.job.client.port-range)

flink on yarn 內部實現

本節主要描述flink和yarn是如何互動的

YARN 客戶端需要訪問 Hadoop 配置,從而連線 YARN 資源管理器和 HDFS。可以使用下面的策略來決定 Hadoop 配置:

  • 測試 YARN_CONF_DIR, HADOOP_CONF_DIR 或 HADOOP_CONF_PATH 環境變數是否設定了(按該順序測試)。如果它們中有一個被設定了,那麼它們就會用來讀取配置。
  • 如果上面的策略失敗了(如果正確安裝了 YARN 的話,這不應該會發生),客戶端會使用 HADOOP_HOME 環境變數。如果該變數設定了,客戶端會嘗試訪問 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf(Hadoop 1)。

當啟動一個新的 Flink YARN Client會話,客戶端首先會檢查所請求的資源(容器和記憶體)是否可用。之後,它會上傳包含了 Flink 配置和 jar檔案到 HDFS(步驟 1)。

客戶端的下一步是請求(步驟 2)一個 YARN 容器啟動 ApplicationMaster (步驟 3)。因為客戶端將配置和jar 檔案作為容器的資源註冊了,所以執行在特定機器上的 YARN 的 NodeManager 會負責準備容器(例如,下載檔案)。一旦這些完成了,ApplicationMaster (AM) 就啟動了。

JobManager 和 AM 執行在同一個容器中。一旦它們成功地啟動了,AM 知道 JobManager 的地址(它自己)。它會為 TaskManager 生成一個新的 Flink 配置檔案(這樣它們才能連上 JobManager)。該檔案也同樣會上傳到 HDFS。另外,AM 容器同時提供了 Flink 的 Web 介面服務。Flink 用來提供服務的埠是由使用者 + 應用程式 id 作為偏移配置的。這使得使用者能夠並行執行多個 Flink YARN 會話。

之後,AM 開始為 Flink 的 TaskManager 分配容器,這會從 HDFS 下載 jar 檔案和修改過的配置檔案。一旦這些步驟完成了,Flink 就安裝完成並準備接受任務了。