如何實現生產環境中的Flink高可用
如何實現生產環境中的Flink高可用
叢集高可用(High Availability,HA)配置是大資料領域經典的一個問題。通常HA用來描述一個系統經過專門的設計,從而減少停工時間,而保持其服務的高度可用性。JobManager扮演的角色是叢集管理者的角色,負責排程任務、協調Checkpoints、協調故障恢復、收集Job的狀態資訊,並管理Flink叢集的從節點TaskManager。
在預設情況下,每個叢集都只有一個JobManager例項,如果這個JobManager崩潰了,那麼將會導致作業執行失敗,並且無法提交新的任務。
因此,在生產環境中叢集該如何配置達到高可用的目的呢?針對不同模式進行部署的叢集,需要不同的配置。
原始碼分析
Flink中的JobManager、WebServer等元件都需要高可用保證,並且Flink還需要進行Checkpoint元資料的持久化操作。與Flink HA相關的類圖如下。
HihgAvailabilityMode中定義了三種高可用模式列舉,如下:
- NONE:非HA模式
- ZOOKEEPER:基於ZK實現HA
- FACTORY_CLASS:自定義HA工廠類,該類需要實現HighAvailabilityServiceFactory介面
具體的高可用例項物件建立則在HighAvailabilityServicesUtils類中有體現。
建立HighAvailabilityServices的例項方法如下:
public static HighAvailabilityServices createAvailableOrEmbeddedServices( Configuration config, Executor executor) throws Exception { HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); switch (highAvailabilityMode) { case NONE: return new EmbeddedHaServices(executor); case ZOOKEEPER: BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); return new ZooKeeperHaServices( ZooKeeperUtils.startCuratorFramework(config), executor, config, blobStoreService); case FACTORY_CLASS: return createCustomHAServices(config, executor); default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); } }
HighAvailabilityServices介面中定義了HA服務類應當實現的方法,實現類主要有StandaloneHaServices(非HA)、ZookeeperHaServices、YarnHighAvailabilityServices。
ZookeeperHaServices主要提供了建立LeaderRetrivalService和LeaderElectionService等方法,並給出各個服務元件使用的ZK節點名稱。
ZookeeperLeaderElectionService實現了LeaderElectionService中leader選舉和獲取leader的方法。
public interface LeaderElectionService {
/**
* Starts the leader election service. This method can only be called once.
*
* @param contender LeaderContender which applies for the leadership
* @throws Exception
*/
void start(LeaderContender contender) throws Exception;
/**
* Stops the leader election service.
* @throws Exception
*/
void stop() throws Exception;
/**
* Confirms that the {@link LeaderContender} has accepted the leadership identified by the
* given leader session id. It also publishes the leader address under which the leader is
* reachable.
*
* <p>The rational behind this method is to establish an order between setting the new leader
* session ID in the {@link LeaderContender} and publishing the new leader session ID as well
* as the leader address to the leader retrieval services.
*
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*/
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
/**
* Returns true if the {@link LeaderContender} with which the service has been started owns
* currently the leadership under the given leader session id.
*
* @param leaderSessionId identifying the current leader
*
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
Standalone叢集高可用配置
簡介
如果叢集是Standalone模式,那麼此時需要對JobManager做主備,一般推薦一個主JobManager和多個備用的JobManagers。當主JobManager傳送故障時,備用的JobManager會接管叢集,以保證任務政策執行。需要注意:主和備JobManager只是我們認為的區分,實際上並沒有區別,每個JobManager都可以當作主或者備。
Standalone模式下的HA配置,Flink依賴Zookeeper實現。Zookeeper叢集獨立於Flink叢集之外,主要被用來進行Leader選舉和輕量級狀態的一致性儲存。
檔案配置
Flink自帶一個簡單的Zookeeper叢集,並且提供了一鍵啟動的腳步。在實際生產環境中建議自己搭建Zookeeper叢集,方便進行配置管理。
假設3臺虛擬機器之間搭建Standalone叢集,並且進行高可用配置:
IP | hostname | 備註 |
---|---|---|
192.168.2.100 | master | 主節點:ZK01 |
192.168.2.101 | slave01 | 從節點:ZK02 |
192.168.2.102 | slave02 | 從節點:ZK03 |
在3臺機器上同時修改Flink配置檔案的master檔案:
master:8081
slave01:8081
slave02:8081
表示指定Zookeeper叢集的訪問地址
然後,修改conf/flink-conf.yaml檔案,與高可用的配置相關的幾個參賽,如下所示:
#========================================================
# High Availability
#=====================================================================
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs:///flink/recovery
- high-availability:高可用模式設定為zookeeper,用來開啟高可用模式;
- high-availability.zookeeper.quorum:指定一組zookeeper伺服器,提供分散式協調服務,Flink可以在指定的地址和埠訪問zookeeper;
- high-availability.zookeeper.path.root:指定zookeeper的根結點,並且在該節點下放置所有叢集節點
- high-availability.cluster-id:為每個叢集指定一個ID,用來儲存該叢集的相關資料
- high-availability.storageDir:高可用儲存目錄,JobManager的元資料儲存在檔案系統StorageDir中,一般都是HDFS的地址
對於flink-conf.yaml檔案中的配置,除了jobManager.rpc.address和jobManager.web.address 都各自配置自己機器的IP之外,其他的關於高可用的配置一模一樣。high-availability、high-availability.zookeeper.quorum、high-availability.storageDir這個三個配置是必須配置的,high-availability.zookeeper.path.root、high-availability.cluster-id這兩個配置是可選的,建議手動配置。
Yarn叢集高可用配置
與Standalone叢集不同的是,Flink on Yarn的高可用配置只需要一個JobManager。當JobManager傳送失效時,Yarn負責將其重新啟動。
修改yarn-site.xml 檔案中的配置,如下所示:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
yarn.resourcemanager.am.max-attempts表示Yarn的application master的最大重試次數。除了上述HA配置之外,還需要配置flink-conf.yaml 中的最大重試次數:
yarn.application-attempts:10
預設情況下,該配置的為2
從Flink官網中可以查到,當yarn.application-attempts設定為10的時候,如果程式啟動失敗,YARN會再重試9次(9次重試+1次啟動),如果YARN啟動10次作業還失敗,則YARN才會將該任務的狀態職位失敗。如果發生程序搶佔,節點硬體故障或重啟,NodeManager重新同步等,Yarn會繼續嘗試啟動應用。這些重啟不計入yarn.application-attempts次數中。
不同Yarn版本的容器關閉行為不同,需要我們特別注意。
- YARN 2.3.0 < YARN版本 < YARN 2.4.0 :如果application master程序失敗,則所有container都會重啟
- YARN 2.4.0 < YARN版本 < YARN 2.6.0 :TaskManager container在application master故障期間,會繼續工作,這樣的優點是啟動時間更快,且縮短了所有task manager啟動時申請資源的時間
- YARN 2.6.0 <= YARN版本 :失敗重試的間隔會被設定為Akka的超時時間,在一次時間間隔內達到最大失敗重試次數才會被置為失敗。
如果Zookeeper叢集使用kerberos安全模式執行,需要新增配置:
zookeeper.sasl.service-name
zookeeper.sasl.login-context-name
如果你不想搭建自己的 ZooKeeper 叢集或者簡單地進行本地測試,你可以使用 Flink 自帶的 ZooKeeper 叢集,但是並不推薦,我們建議讀者搭建自己的 ZooKeeper 叢集。