1. 程式人生 > 程式設計 >Flink 系列(八)—— Flink Standalone 叢集部署

Flink 系列(八)—— Flink Standalone 叢集部署

一、部署模式

Flink 支援使用多種部署模式來滿足不同規模應用的需求,常見的有單機模式,Standalone Cluster 模式,同時 Flink 也支援部署在其他第三方平臺上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介紹其單機模式和 Standalone Cluster 模式的部署。

二、單機模式

單機模式是一種開箱即用的模式,可以在單臺伺服器上執行,適用於日常的開發和除錯。具體操作步驟如下:

2.1 安裝部署

1. 前置條件

Flink 的執行依賴 JAVA 環境,故需要預先安裝好 JDK,具體步驟可以參考:Linux 環境下 JDK 安裝

2. 下載 & 解壓 & 執行

Flink 所有版本的安裝包可以直接從其官網進行下載,這裡我下載的 Flink 的版本為 1.9.1 ,要求的 JDK 版本為 1.8.x +。 下載後解壓到指定目錄:

tar -zxvf flink-1.9.1-bin-scala_2.12.tgz  -C /usr/app
複製程式碼

不需要進行任何配置,直接使用以下命令就可以啟動單機版本的 Flink:

bin/start-cluster.sh
複製程式碼

3. WEB UI 介面

Flink 提供了 WEB 介面用於直觀的管理 Flink 叢集,訪問埠為 8081

https://github.com/heibaiying

Flink 的 WEB UI 介面支援大多數常用功能,如提交作業,取消作業,檢視各個節點執行情況,檢視作業執行情況等,大家可以在部署完成後,進入該頁面進行詳細的瀏覽。

2.2 作業提交

啟動後可以執行安裝包中自帶的詞頻統計案例,具體步驟如下:

1. 開啟埠

nc -lk 9999
複製程式碼

2. 提交作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
複製程式碼

該 JAR 包的原始碼可以在 Flink 官方的 GitHub 倉庫中找到,地址為 :SocketWindowWordCount ,可選傳參有 hostname, port,對應的詞頻資料需要使用空格進行分割。

3. 輸入測試資料

a a b b c c c a e
複製程式碼

4. 檢視控制檯輸出

可以通過 WEB UI 的控制檯檢視作業統執行情況:

https://github.com/heibaiying

也可以通過 WEB 控制檯檢視到統計結果:

https://github.com/heibaiying

2.3 停止作業

可以直接在 WEB 介面上點選對應作業的 Cancel Job 按鈕進行取消,也可以使用命令列進行取消。使用命令列進行取消時,需要先獲取到作業的 JobId,可以使用 flink list 命令檢視,輸出如下:

[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
複製程式碼

獲取到 JobId 後,就可以使用 flink cancel 命令取消作業:

bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc
複製程式碼

2.4 停止 Flink

命令如下:

bin/stop-cluster.sh
複製程式碼

三、Standalone Cluster

Standalone Cluster 模式是 Flink 自帶的一種叢集模式,具體配置步驟如下:

3.1 前置條件

使用該模式前,需要確保所有伺服器間都已經配置好 SSH 免密登入服務。這裡我以三臺伺服器為例,主機名分別為 hadoop001,hadoop002,hadoop003,其中 hadoop001 為 master 節點,其餘兩臺為 slave 節點,搭建步驟如下:

3.2 搭建步驟

修改 conf/flink-conf.yaml 中 jobmanager 節點的通訊地址為 hadoop001:

jobmanager.rpc.address: hadoop001
複製程式碼

修改 conf/slaves 配置檔案,將 hadoop002 和 hadoop003 配置為 slave 節點:

hadoop002
hadoop003
複製程式碼

將配置好的 Flink 安裝包分發到其他兩臺伺服器上:

 scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
 scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app
複製程式碼

在 hadoop001 上使用和單機模式相同的命令來啟動叢集:

bin/start-cluster.sh
複製程式碼

此時控制檯輸出如下:

https://github.com/heibaiying

啟動完成後可以使用 Jps 命令或者通過 WEB 介面來檢視是否啟動成功。

3.3 可選配置

除了上面介紹的 jobmanager.rpc.address 是必選配置外,Flink h還支援使用其他可選引數來優化叢集效能,主要如下:

  • jobmanager.heap.size:JobManager 的 JVM 堆記憶體大小,預設為 1024m 。
  • taskmanager.heap.size:Taskmanager 的 JVM 堆記憶體大小,預設為 1024m 。
  • taskmanager.numberOfTaskSlots:Taskmanager 上 slots 的數量,通常設定為 CPU 核心的數量,或其一半。
  • parallelism.default:任務預設的並行度。
  • io.tmp.dirs:儲存臨時檔案的路徑,如果沒有配置,則預設採用伺服器的臨時目錄,如 LInux 的 /tmp 目錄。

更多配置可以參考 Flink 的官方手冊:Configuration

四、Standalone Cluster HA

上面我們配置的 Standalone 叢集實際上只有一個 JobManager,此時是存在單點故障的,所以官方提供了 Standalone Cluster HA 模式來實現叢集高可用。

4.1 前置條件

在 Standalone Cluster HA 模式下,叢集可以由多個 JobManager,但只有一個處於 active 狀態,其餘的則處於備用狀態,Flink 使用 ZooKeeper 來選舉出 Active JobManager,並依賴其來提供一致性協調服務,所以需要預先安裝 ZooKeeper 。

另外在高可用模式下,還需要使用分散式檔案系統來持久化儲存 JobManager 的元資料,最常用的就是 HDFS,所以 Hadoop 也需要預先安裝。關於 Hadoop 叢集和 ZooKeeper 叢集的搭建可以參考:

4.2 搭建步驟

修改 conf/flink-conf.yaml 檔案,增加如下配置:

# 配置使用zookeeper來開啟高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,採用zookeeper叢集時,可以使用逗號來分隔多個節點地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上儲存flink叢集元資訊的路徑
high-availability.zookeeper.path.root: /flink
# 叢集id
high-availability.cluster-id: /standalone_cluster_one
# 持久化儲存JobManager元資料的地址,zookeeper上儲存的只是指向該元資料的指標資訊
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery
複製程式碼

修改 conf/masters 檔案,將 hadoop001 和 hadoop002 都配置為 master 節點:

hadoop001:8081
hadoop002:8081
複製程式碼

確保 Hadoop 和 ZooKeeper 已經啟動後,使用以下命令來啟動叢集:

bin/start-cluster.sh
複製程式碼

此時輸出如下:

https://github.com/heibaiying

可以看到叢集已經以 HA 的模式啟動,此時還需要在各個節點上使用 jps 命令來檢視程式是否啟動成功,正常情況如下:

https://github.com/heibaiying

只有 hadoop001 和 hadoop002 的 JobManager 程式,hadoop002 和 hadoop003 上的 TaskManager 程式都已經完全啟動,才表示 Standalone Cluster HA 模式搭建成功。

4.3 常見異常

如果程式沒有啟動,可以通過檢視 log 目錄下的日誌來定位錯誤,常見的一個錯誤如下:

2019-11-05 09:18:35,877 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file 
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no 
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in 
the classpath/dependencies.
......
複製程式碼

可以看到是因為在 classpath 目錄下找不到 Hadoop 的相關依賴,此時需要檢查是否在環境變數中配置了 Hadoop 的安裝路徑,如果路徑已經配置但仍然存在上面的問題,可以從 Flink 官網下載對應版本的 Hadoop 元件包:

https://github.com/heibaiying

下載完成後,將該 JAR 包上傳至所有 Flink 安裝目錄的 lib 目錄即可。

參考資料

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南