Flink 系列(八)—— Flink Standalone 叢集部署
一、部署模式
Flink 支援使用多種部署模式來滿足不同規模應用的需求,常見的有單機模式,Standalone Cluster 模式,同時 Flink 也支援部署在其他第三方平臺上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介紹其單機模式和 Standalone Cluster 模式的部署。
二、單機模式
單機模式是一種開箱即用的模式,可以在單臺伺服器上執行,適用於日常的開發和除錯。具體操作步驟如下:
2.1 安裝部署
1. 前置條件
Flink 的執行依賴 JAVA 環境,故需要預先安裝好 JDK,具體步驟可以參考:Linux 環境下 JDK 安裝
2. 下載 & 解壓 & 執行
Flink 所有版本的安裝包可以直接從其官網進行下載,這裡我下載的 Flink 的版本為 1.9.1
,要求的 JDK 版本為 1.8.x +
。 下載後解壓到指定目錄:
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C /usr/app
複製程式碼
不需要進行任何配置,直接使用以下命令就可以啟動單機版本的 Flink:
bin/start-cluster.sh
複製程式碼
3. WEB UI 介面
Flink 提供了 WEB 介面用於直觀的管理 Flink 叢集,訪問埠為 8081
:
Flink 的 WEB UI 介面支援大多數常用功能,如提交作業,取消作業,檢視各個節點執行情況,檢視作業執行情況等,大家可以在部署完成後,進入該頁面進行詳細的瀏覽。
2.2 作業提交
啟動後可以執行安裝包中自帶的詞頻統計案例,具體步驟如下:
1. 開啟埠
nc -lk 9999
複製程式碼
2. 提交作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
複製程式碼
該 JAR 包的原始碼可以在 Flink 官方的 GitHub 倉庫中找到,地址為 :SocketWindowWordCount ,可選傳參有 hostname, port,對應的詞頻資料需要使用空格進行分割。
3. 輸入測試資料
a a b b c c c a e
複製程式碼
4. 檢視控制檯輸出
可以通過 WEB UI 的控制檯檢視作業統執行情況:
也可以通過 WEB 控制檯檢視到統計結果:
2.3 停止作業
可以直接在 WEB 介面上點選對應作業的 Cancel Job
按鈕進行取消,也可以使用命令列進行取消。使用命令列進行取消時,需要先獲取到作業的 JobId,可以使用 flink list
命令檢視,輸出如下:
[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
複製程式碼
獲取到 JobId 後,就可以使用 flink cancel
命令取消作業:
bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc
複製程式碼
2.4 停止 Flink
命令如下:
bin/stop-cluster.sh
複製程式碼
三、Standalone Cluster
Standalone Cluster 模式是 Flink 自帶的一種叢集模式,具體配置步驟如下:
3.1 前置條件
使用該模式前,需要確保所有伺服器間都已經配置好 SSH 免密登入服務。這裡我以三臺伺服器為例,主機名分別為 hadoop001,hadoop002,hadoop003,其中 hadoop001 為 master 節點,其餘兩臺為 slave 節點,搭建步驟如下:
3.2 搭建步驟
修改 conf/flink-conf.yaml
中 jobmanager 節點的通訊地址為 hadoop001:
jobmanager.rpc.address: hadoop001
複製程式碼
修改 conf/slaves
配置檔案,將 hadoop002 和 hadoop003 配置為 slave 節點:
hadoop002
hadoop003
複製程式碼
將配置好的 Flink 安裝包分發到其他兩臺伺服器上:
scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app
複製程式碼
在 hadoop001 上使用和單機模式相同的命令來啟動叢集:
bin/start-cluster.sh
複製程式碼
此時控制檯輸出如下:
啟動完成後可以使用 Jps
命令或者通過 WEB 介面來檢視是否啟動成功。
3.3 可選配置
除了上面介紹的 jobmanager.rpc.address 是必選配置外,Flink h還支援使用其他可選引數來優化叢集效能,主要如下:
- jobmanager.heap.size:JobManager 的 JVM 堆記憶體大小,預設為 1024m 。
- taskmanager.heap.size:Taskmanager 的 JVM 堆記憶體大小,預設為 1024m 。
- taskmanager.numberOfTaskSlots:Taskmanager 上 slots 的數量,通常設定為 CPU 核心的數量,或其一半。
- parallelism.default:任務預設的並行度。
-
io.tmp.dirs:儲存臨時檔案的路徑,如果沒有配置,則預設採用伺服器的臨時目錄,如 LInux 的
/tmp
目錄。
更多配置可以參考 Flink 的官方手冊:Configuration
四、Standalone Cluster HA
上面我們配置的 Standalone 叢集實際上只有一個 JobManager,此時是存在單點故障的,所以官方提供了 Standalone Cluster HA 模式來實現叢集高可用。
4.1 前置條件
在 Standalone Cluster HA 模式下,叢集可以由多個 JobManager,但只有一個處於 active 狀態,其餘的則處於備用狀態,Flink 使用 ZooKeeper 來選舉出 Active JobManager,並依賴其來提供一致性協調服務,所以需要預先安裝 ZooKeeper 。
另外在高可用模式下,還需要使用分散式檔案系統來持久化儲存 JobManager 的元資料,最常用的就是 HDFS,所以 Hadoop 也需要預先安裝。關於 Hadoop 叢集和 ZooKeeper 叢集的搭建可以參考:
4.2 搭建步驟
修改 conf/flink-conf.yaml
檔案,增加如下配置:
# 配置使用zookeeper來開啟高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,採用zookeeper叢集時,可以使用逗號來分隔多個節點地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上儲存flink叢集元資訊的路徑
high-availability.zookeeper.path.root: /flink
# 叢集id
high-availability.cluster-id: /standalone_cluster_one
# 持久化儲存JobManager元資料的地址,zookeeper上儲存的只是指向該元資料的指標資訊
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery
複製程式碼
修改 conf/masters
檔案,將 hadoop001 和 hadoop002 都配置為 master 節點:
hadoop001:8081
hadoop002:8081
複製程式碼
確保 Hadoop 和 ZooKeeper 已經啟動後,使用以下命令來啟動叢集:
bin/start-cluster.sh
複製程式碼
此時輸出如下:
可以看到叢集已經以 HA 的模式啟動,此時還需要在各個節點上使用 jps
命令來檢視程式是否啟動成功,正常情況如下:
只有 hadoop001 和 hadoop002 的 JobManager 程式,hadoop002 和 hadoop003 上的 TaskManager 程式都已經完全啟動,才表示 Standalone Cluster HA 模式搭建成功。
4.3 常見異常
如果程式沒有啟動,可以通過檢視 log
目錄下的日誌來定位錯誤,常見的一個錯誤如下:
2019-11-05 09:18:35,877 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in
the classpath/dependencies.
......
複製程式碼
可以看到是因為在 classpath 目錄下找不到 Hadoop 的相關依賴,此時需要檢查是否在環境變數中配置了 Hadoop 的安裝路徑,如果路徑已經配置但仍然存在上面的問題,可以從 Flink 官網下載對應版本的 Hadoop 元件包:
下載完成後,將該 JAR 包上傳至所有 Flink 安裝目錄的 lib
目錄即可。
參考資料
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南