1. 程式人生 > >Flink HDFS Sink 如何保證 exactly-once 語義

Flink HDFS Sink 如何保證 exactly-once 語義

本文將從原始碼層面來分析在Flink中是如何保證sink資料到HDFS的exactly-once語義的。

Flink中sink資料到HDFS是由BucketingSink來完成。BucketingSink產生的HDFS檔案路徑格式如下,

/{base/path}/{bucket/path}/{part_prefix}-{parallel_task_index}-{count}{part_suffix}

其中,

  • {base/path},構造BucketingSink時指定的base路徑;
  • {bucket/path},分桶路徑,BucketingSink可以對資料進行分桶(也可以理解為分割槽),可以根據系統時間進行分桶,也可以根據資料進行分桶,通過實現
    Bucketer
    可以自定義分桶規則;
  • {part_prefix}{part_suffix},可自定義的字串,{part_prefix}預設為part{part_suffix}預設為空;
  • {parallel_task_index},因為存在並行的sink task,因此寫出去的檔案需要用task index來區分;
  • {count},一個sink task在一個分桶下不可能只允許寫入一個檔案,BucketingSink會對檔案進行滾動操作,有兩種rolling策略,檔案大小以及時間(與日誌檔案rolling策略類似)。通過BucketingSink#setBatchSizeBucketingSink#setBatchRolloverInterval
    可以分別設定該大小及時間。預設大小為384MB,時間為Long.MAX_VALUE{count}從0開始,根據檔案rolling遞增;

為了實現exactly-once語義BucketingSink產生的檔案有3種不同的狀態:

  • in-progress,分桶下正在被寫入的檔案,一個分桶只會有一個。檔名格式為{in_progress_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{in_progress_suffix}
  • pendingin-progress狀態的檔案關閉後進入pending狀態,檔案重新命名,等待Checkpoint。檔名格式為{pending_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{pending_suffix}
  • finished,Checkpoint成功之後,pending狀態的檔案即可置為finished狀態,檔案重新命名,該狀態的檔案即為最終產生的可用檔案,檔名格式之前已經描述過了;

另外,in-progress狀態檔案關閉,進入pending狀態,由兩種機制觸發,一個是上文已經提到過的rolling策略,另一個則是分桶inactive觸發的檔案關閉。BucketingSink會定期掃描所有分桶,當某個分桶超過一定時間沒有寫入,則會關閉該分桶下in-progress狀態的檔案,進入pending狀態。通過BucketingSink#setInactiveBucketCheckIntervalBucketingSink#setInactiveBucketThreshold即可分別設定掃描週期以及分桶inactive時間閾值,預設都是60秒。

接下來開始進入原始碼分析,主要是這幾個方法:

  • initializeState
  • open
  • invoke
  • snapshotState
  • notifyCheckpointComplete

先說明下為什麼是這幾個方法。每個Flink程式都會被轉換成JobGraph,在執行時最終部署成一個個的StreamTask,每個StreamTask執行自己負責的OperatorChain。而所有的SinkFunction都是由StreamSink這個operator來執行。

/**
 * A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}.
 */
@Internal
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>> 
		implements OneInputStreamOperator<IN, Object> { ... }

StreamTask的執行是通過其invoke方法。invoke方法做的事情大致如下,

 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()

其中,initialize operator states,open operators以及run是這裡主要關注的三個操作,對應的程式碼塊如下,

			synchronized (lock) {

				// both the following operations are protected by the lock
				// so that we avoid race conditions in the case that initializeState()
				// registers a timer, that fires before the open() is called.

				initializeState();
				openAllOperators();
			}

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			run();

initializeState方法會呼叫到BucketingSink#initializeStateopenAllOperators方法會呼叫到BucketingSink#openrun方法會呼叫到BucketingSink#invoke

另外兩個方法,snapshotState以及notifyCheckpointComplete是跟Checkpoint相關的,StreamTask執行Checkpoint時,會呼叫operator的snapshotState方法,最終會呼叫到BucketingSink#snapshotState。整個Job Checkpoint成功後會傳送通知BucketingSink#notifyCheckpointComplete會被呼叫。

下面就來看下這些方法的具體實現。

initializeState

initializeState主要做了兩件事:

  1. 呼叫initFileSystem初始化Hadoop的FileSystem
  2. 呼叫handleRestoredBucketState從Checkpoint/Savepoint中恢復狀態資訊;

我們往下先看看其他方法再回過頭來看狀態恢復的實現,即handleRestoredBucketState方法。

open

open方法也比較簡單,主要是利用ProcessingTimeService註冊定時器,定時檢查上文提到的inactive的分桶。

invoke

invoke方法主要做了四件事,如下注釋,

	@Override
	public void invoke(T value) throws Exception {
	    /******* 第一步,使用Bucketer獲取當前資料所屬的分桶 *******/
		Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);

		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();

	    /******* 第二步,獲取分桶狀態,若分桶不存在,則生成分桶狀態資訊 *******/
		BucketState<T> bucketState = state.getBucketState(bucketPath);
		if (bucketState == null) {
			bucketState = new BucketState<>(currentProcessingTime);
			state.addBucketState(bucketPath, bucketState);
		}

	    /******* 第三步,對分桶當前正在寫入的檔案執行rolling策略 *******/
		if (shouldRoll(bucketState, currentProcessingTime)) {
			openNewPartFile(bucketPath, bucketState);
		}

	    /******* 第四步,資料寫入檔案 *******/
		bucketState.writer.write(value);
		bucketState.lastWrittenToTime = currentProcessingTime;
	}
  • currentFile,該分桶當前正在被寫入,即in-progress狀態的檔案;
  • currentFileValidLength,該檔案的有效長度;
  • creationTime,該檔案的建立時間;
  • lastWrittenToTime,該分桶最後一次寫入的時間;
  • partCounter,上文提到的檔名稱格式中的{count}值;
  • pendingFiles,該分桶下處於pending狀態的檔案;
  • pendingFilesPerCheckpoint,等待Checkpoint成功通知的檔案;

來看下檔案新建跟寫入的實現。由openNewPartFile新建檔案,該方法主要做兩件事:

  1. 呼叫closeCurrentPartFile方法,如果當前分桶有處於in-progress狀態的檔案,則呼叫Writer#close方法關閉該檔案,並且將該檔案重新命名,置為pending狀態,並修改分桶狀態的pendingFiles資訊;
  2. 構造新的in-progress狀態的檔名,呼叫Writer#open開啟檔案,修改分桶狀態的currentFilecreationTime等資訊;

檔案的寫入則是呼叫Writer#write來完成。可以看到這裡分別使用了Writeropenwriteclose方法來實現檔案的建立,寫入以及關閉。Writer的預設實現為StringWriter,下面來看下該實現。

StringWriter

  • open,呼叫由BucketingSink初始化的FileSystemcreate方法來建立檔案,得到FSDataOutputStream
  • write,呼叫FSDataOutputStream#write方法寫入資料;
  • close,首先根據配置(syncOnFlush)呼叫FSDataOutputStreamhsync或者hflush來flush資料(二者區別請參考API文件),然後呼叫FSDataOutputStream#close方法關閉檔案;

snapshotState

snapshotState主要是以下三步,

  1. 對所有分桶呼叫Writer#flush將快取的資料寫出去,並記錄檔案長度,更新currentFileValidLength資訊;
  2. 修改所有分桶的pendingFilesPerCheckpoint資訊,記錄本次Checkpoint該分桶對應的pendingFiles,並將pendingFiles置空;

notifyCheckpointComplete

notifyCheckpointComplete主要做的事情就是根據snapshotState中記錄的pendingFilesPerCheckpoint資訊,將所有等待Checkpoint成功的pending狀態的檔案重新命名,置為最終的,也是可用的finished狀態。

handleRestoredBucketState

現在可以回過頭來看下,狀態恢復是如何實現,從而保證了exactly-once語義的。狀態恢復實現,上文我們已經提到,是在handleRestoredBucketState方法。

	private void handleRestoredBucketState(State<T> restoredState) {
		Preconditions.checkNotNull(restoredState);

		for (BucketState<T> bucketState : restoredState.bucketStates.values()) {

			/******* Checkpoint成功時`pendingFiles`應該是空的 *******/
			// we can clean all the pending files since they were renamed to
			// final files after this checkpoint was successful
			// (we re-start from the last **successful** checkpoint)
			bucketState.pendingFiles.clear();

			/******* 處理上一次Checkpoint成功時處於`in-progress`狀態的檔案 *******/
			handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);

			// Now that we've restored the bucket to a valid state, reset the current file info
			bucketState.currentFile = null;
			bucketState.currentFileValidLength = -1;
			bucketState.isWriterOpen = false;

			/******* 處理分桶狀態的`pendingFilesPerCheckpoint`資訊 *******/
			handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);

			bucketState.pendingFilesPerCheckpoint.clear();
		}
	}

可以看到主要是根據Checkpoint成功後持久化下來的分桶狀態資訊進行處理,in-progress狀態的檔案以及pendingFilesPerCheckpoint資訊。下面來看下對應的handlePendingInProgressFile以及handlePendingFilesForPreviousCheckpoints這兩個方法。

handlePendingInProgressFile

處於in-progress狀態的檔案,在該次Checkpoint成功之後,故障發生之前(這裡假設狀態恢復是由於發生故障引起),有以下幾種可能,

  • 仍然處於in-progress狀態,沒有繼續寫入,檔案有效長度仍然是Checkpoint時由snapshotState方法記錄下的currentFileValidLength
  • 仍然處於in-progress狀態,發生了寫入,檔案有效長度大於記錄下的currentFileValidLength
  • 已經關閉並處於pending狀態;
  • 已經處於finished狀態;

無論處於以上哪種狀態,現在需要做的,就是將檔案的有效長度恢復到Checkpoint成功時記錄的currentFileValidLength。因為恢復後的in-progress狀態的檔案不再繼續寫入(產生新檔案來寫入後續資料),因此先將該檔案重新命名置為finished狀態(已經是finished狀態則省略這一步),然後有兩種做法,

  1. 如果檔案系統支援truncate,則直接將檔案truncate到currentFileValidLength這個有效長度即可;
  2. 不支援truncate的話,則新建一個{valid_length_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{valid_length_suffix}檔案,將currentFileValidLength寫入該檔案,後續在讀取資料檔案時需要先讀取這個記錄了該檔案有效長度的檔案以確保資料的有效性,否則可能讀取到重複的資料,這樣就不能保證exactly-once語義了;

handlePendingFilesForPreviousCheckpoints

這個方法主要是處理分桶狀態的pendingFilesPerCheckpoint資訊,當Checkpoint成功(snapshotState方法)持久化下來的pendingFilesPerCheckpoint,儲存的是等待Checkpoint成功通知的處於pending狀態的檔案,這些檔案在Checkpoint成功之後,故障發生之前,有下面兩種可能:

  1. Checkpoint成功,但是Checkpoint成功通知之前發生了故障。那麼此時這些檔案應該仍處於pending狀態,因為從pengding轉為finished狀態是在Checkpoint成功通知到才會發生(也是就成功執行了notifyCheckpointComplete方法)。針對這種情況,現在需要將這些檔案置為finished狀態;
  2. Checkpoint成功,也成功通知到之後才發生了故障。此時這些檔案已經處於finished狀態,無需額外操作;

這裡需要額外說明,Checkpoint是否成功,在傳送通知,即呼叫notifyCheckpointComplete之前就已經確定了。notifyCheckpointComplete即使發生了異常也不會導致Checkpoint失敗,參考CheckpointListener的API文件,

This method is called as a notification once a distributed checkpoint has been completed. Note that any exception during this method will not cause the checkpoint to fail any more.

exactly-once語義

經過原始碼層面的分析可以看到,對exactly-once語義的保證,是通過引入中間狀態(in-progresspending)和最終可用狀態(finished)來實現的,是一種兩階段提交(2PC)的方案。當故障發生時,對處於中間狀態的資料進行回滾或者提交(initializeState方法)以保證資料的有效性。資料只有流轉(Checkpoint成功)到最終狀態才是可用的。

結語

本文通過對原始碼的分析來了解BucketingSink對exactly-once語義的保證。另外,這裡記錄下一個潛在的問題,就是資料寫入的效能問題,當前的設計,每個分桶下只有一個正在寫入,即in-progress狀態的檔案,並且是在invoke方法同步寫入,在資料量大的情況下,資料寫入的效能一定是不容樂觀的。後面可能會有這方面相關的優化,例如非同步化,stay tuned _