1. 程式人生 > 實用技巧 >如何實現生產環境中的Flink高可用

如何實現生產環境中的Flink高可用

如何實現生產環境中的Flink高可用

叢集高可用(High Availability,HA)配置是大資料領域經典的一個問題。通常HA用來描述一個系統經過專門的設計,從而減少停工時間,而保持其服務的高度可用性。JobManager扮演的角色是叢集管理者的角色,負責排程任務、協調Checkpoints、協調故障恢復、收集Job的狀態資訊,並管理Flink叢集的從節點TaskManager。
在預設情況下,每個叢集都只有一個JobManager例項,如果這個JobManager崩潰了,那麼將會導致作業執行失敗,並且無法提交新的任務。
因此,在生產環境中叢集該如何配置達到高可用的目的呢?針對不同模式進行部署的叢集,需要不同的配置。

原始碼分析

Flink中的JobManager、WebServer等元件都需要高可用保證,並且Flink還需要進行Checkpoint元資料的持久化操作。與Flink HA相關的類圖如下。

HihgAvailabilityMode中定義了三種高可用模式列舉,如下:

  • NONE:非HA模式
  • ZOOKEEPER:基於ZK實現HA
  • FACTORY_CLASS:自定義HA工廠類,該類需要實現HighAvailabilityServiceFactory介面

具體的高可用例項物件建立則在HighAvailabilityServicesUtils類中有體現。

建立HighAvailabilityServices的例項方法如下:

public static HighAvailabilityServices createAvailableOrEmbeddedServices(
		Configuration config,
		Executor executor) throws Exception {
		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);

		switch (highAvailabilityMode) {
			case NONE:
				return new EmbeddedHaServices(executor);

			case ZOOKEEPER:
				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

				return new ZooKeeperHaServices(
					ZooKeeperUtils.startCuratorFramework(config),
					executor,
					config,
					blobStoreService);

			case FACTORY_CLASS:
				return createCustomHAServices(config, executor);

			default:
				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
		}
	}

HighAvailabilityServices介面中定義了HA服務類應當實現的方法,實現類主要有StandaloneHaServices(非HA)、ZookeeperHaServices、YarnHighAvailabilityServices。
ZookeeperHaServices主要提供了建立LeaderRetrivalService和LeaderElectionService等方法,並給出各個服務元件使用的ZK節點名稱。
ZookeeperLeaderElectionService實現了LeaderElectionService中leader選舉和獲取leader的方法。

public interface LeaderElectionService {

	/**
	 * Starts the leader election service. This method can only be called once.
	 *
	 * @param contender LeaderContender which applies for the leadership
	 * @throws Exception
	 */
	void start(LeaderContender contender) throws Exception;

	/**
	 * Stops the leader election service.
	 * @throws Exception
	 */
	void stop() throws Exception;

	/**
	 * Confirms that the {@link LeaderContender} has accepted the leadership identified by the
	 * given leader session id. It also publishes the leader address under which the leader is
	 * reachable.
	 *
	 * <p>The rational behind this method is to establish an order between setting the new leader
	 * session ID in the {@link LeaderContender} and publishing the new leader session ID as well
	 * as the leader address to the leader retrieval services.
	 *
	 * @param leaderSessionID The new leader session ID
	 * @param leaderAddress The address of the new leader
	 */
	void confirmLeadership(UUID leaderSessionID, String leaderAddress);

	/**
	 * Returns true if the {@link LeaderContender} with which the service has been started owns
	 * currently the leadership under the given leader session id.
	 *
	 * @param leaderSessionId identifying the current leader
	 *
	 * @return true if the associated {@link LeaderContender} is the leader, otherwise false
	 */
	boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

Standalone叢集高可用配置

簡介

如果叢集是Standalone模式,那麼此時需要對JobManager做主備,一般推薦一個主JobManager和多個備用的JobManagers。當主JobManager傳送故障時,備用的JobManager會接管叢集,以保證任務政策執行。需要注意:主和備JobManager只是我們認為的區分,實際上並沒有區別,每個JobManager都可以當作主或者備。

Standalone模式下的HA配置,Flink依賴Zookeeper實現。Zookeeper叢集獨立於Flink叢集之外,主要被用來進行Leader選舉和輕量級狀態的一致性儲存。

檔案配置

Flink自帶一個簡單的Zookeeper叢集,並且提供了一鍵啟動的腳步。在實際生產環境中建議自己搭建Zookeeper叢集,方便進行配置管理。

假設3臺虛擬機器之間搭建Standalone叢集,並且進行高可用配置:

IP hostname 備註
192.168.2.100 master 主節點:ZK01
192.168.2.101 slave01 從節點:ZK02
192.168.2.102 slave02 從節點:ZK03

在3臺機器上同時修改Flink配置檔案的master檔案:

master:8081
slave01:8081
slave02:8081

表示指定Zookeeper叢集的訪問地址
然後,修改conf/flink-conf.yaml檔案,與高可用的配置相關的幾個參賽,如下所示:

#========================================================

# High Availability

#=====================================================================

high-availability: zookeeper

high-availability.zookeeper.quorum: localhost:2181

high-availability.zookeeper.path.root: /flink

high-availability.cluster-id: /cluster_one

high-availability.storageDir: hdfs:///flink/recovery
  • high-availability:高可用模式設定為zookeeper,用來開啟高可用模式;
  • high-availability.zookeeper.quorum:指定一組zookeeper伺服器,提供分散式協調服務,Flink可以在指定的地址和埠訪問zookeeper;
  • high-availability.zookeeper.path.root:指定zookeeper的根結點,並且在該節點下放置所有叢集節點
  • high-availability.cluster-id:為每個叢集指定一個ID,用來儲存該叢集的相關資料
  • high-availability.storageDir:高可用儲存目錄,JobManager的元資料儲存在檔案系統StorageDir中,一般都是HDFS的地址

對於flink-conf.yaml檔案中的配置,除了jobManager.rpc.address和jobManager.web.address 都各自配置自己機器的IP之外,其他的關於高可用的配置一模一樣。high-availability、high-availability.zookeeper.quorum、high-availability.storageDir這個三個配置是必須配置的,high-availability.zookeeper.path.root、high-availability.cluster-id這兩個配置是可選的,建議手動配置。

Yarn叢集高可用配置

與Standalone叢集不同的是,Flink on Yarn的高可用配置只需要一個JobManager。當JobManager傳送失效時,Yarn負責將其重新啟動。

修改yarn-site.xml 檔案中的配置,如下所示:

<property>

  <name>yarn.resourcemanager.am.max-attempts</name>

  <value>4</value>

  <description>

    The maximum number of application master execution attempts.

  </description>

</property>

yarn.resourcemanager.am.max-attempts表示Yarn的application master的最大重試次數。除了上述HA配置之外,還需要配置flink-conf.yaml 中的最大重試次數:

yarn.application-attempts:10

預設情況下,該配置的為2

從Flink官網中可以查到,當yarn.application-attempts設定為10的時候,如果程式啟動失敗,YARN會再重試9次(9次重試+1次啟動),如果YARN啟動10次作業還失敗,則YARN才會將該任務的狀態職位失敗。如果發生程序搶佔,節點硬體故障或重啟,NodeManager重新同步等,Yarn會繼續嘗試啟動應用。這些重啟不計入yarn.application-attempts次數中。

不同Yarn版本的容器關閉行為不同,需要我們特別注意。

  • YARN 2.3.0 < YARN版本 < YARN 2.4.0 :如果application master程序失敗,則所有container都會重啟
  • YARN 2.4.0 < YARN版本 < YARN 2.6.0 :TaskManager container在application master故障期間,會繼續工作,這樣的優點是啟動時間更快,且縮短了所有task manager啟動時申請資源的時間
  • YARN 2.6.0 <= YARN版本 :失敗重試的間隔會被設定為Akka的超時時間,在一次時間間隔內達到最大失敗重試次數才會被置為失敗。

如果Zookeeper叢集使用kerberos安全模式執行,需要新增配置:

zookeeper.sasl.service-name

zookeeper.sasl.login-context-name

如果你不想搭建自己的 ZooKeeper 叢集或者簡單地進行本地測試,你可以使用 Flink 自帶的 ZooKeeper 叢集,但是並不推薦,我們建議讀者搭建自己的 ZooKeeper 叢集。