Flink HDFS Sink 如何保證 exactly-once 語義
本文將從原始碼層面來分析在Flink中是如何保證sink資料到HDFS的exactly-once語義的。
Flink中sink資料到HDFS是由BucketingSink
來完成。BucketingSink
產生的HDFS檔案路徑格式如下,
/{base/path}/{bucket/path}/{part_prefix}-{parallel_task_index}-{count}{part_suffix}
其中,
{base/path}
,構造BucketingSink
時指定的base路徑;{bucket/path}
,分桶路徑,BucketingSink
可以對資料進行分桶(也可以理解為分割槽),可以根據系統時間進行分桶,也可以根據資料進行分桶,通過實現Bucketer
可以自定義分桶規則;{part_prefix}
與{part_suffix}
,可自定義的字串,{part_prefix}
預設為part
,{part_suffix}
預設為空;{parallel_task_index}
,因為存在並行的sink task,因此寫出去的檔案需要用task index來區分;{count}
,一個sink task在一個分桶下不可能只允許寫入一個檔案,BucketingSink
會對檔案進行滾動操作,有兩種rolling策略,檔案大小以及時間(與日誌檔案rolling策略類似)。通過BucketingSink#setBatchSize
及BucketingSink#setBatchRolloverInterval
384MB
,時間為Long.MAX_VALUE
。{count}
從0開始,根據檔案rolling遞增;
為了實現exactly-once語義,BucketingSink
產生的檔案有3種不同的狀態:
- in-progress,分桶下正在被寫入的檔案,一個分桶只會有一個。檔名格式為
{in_progress_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{in_progress_suffix}
; - pending,
in-progress
狀態的檔案關閉後進入pending
狀態,檔案重新命名,等待Checkpoint。檔名格式為{pending_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{pending_suffix}
- finished,Checkpoint成功之後,
pending
狀態的檔案即可置為finished
狀態,檔案重新命名,該狀態的檔案即為最終產生的可用檔案,檔名格式之前已經描述過了;
另外,in-progress
狀態檔案關閉,進入pending
狀態,由兩種機制觸發,一個是上文已經提到過的rolling策略,另一個則是分桶inactive觸發的檔案關閉。BucketingSink
會定期掃描所有分桶,當某個分桶超過一定時間沒有寫入,則會關閉該分桶下in-progress
狀態的檔案,進入pending
狀態。通過BucketingSink#setInactiveBucketCheckInterval
及BucketingSink#setInactiveBucketThreshold
即可分別設定掃描週期以及分桶inactive時間閾值,預設都是60秒。
接下來開始進入原始碼分析,主要是這幾個方法:
initializeState
open
invoke
snapshotState
notifyCheckpointComplete
先說明下為什麼是這幾個方法。每個Flink程式都會被轉換成JobGraph
,在執行時最終部署成一個個的StreamTask
,每個StreamTask
執行自己負責的OperatorChain
。而所有的SinkFunction
都是由StreamSink
這個operator來執行。
/**
* A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}.
*/
@Internal
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
implements OneInputStreamOperator<IN, Object> { ... }
StreamTask
的執行是通過其invoke
方法。invoke
方法做的事情大致如下,
* -- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
* +----> close-operators()
* +----> dispose-operators()
* +----> common cleanup
* +----> task specific cleanup()
其中,initialize operator states,open operators以及run是這裡主要關注的三個操作,對應的程式碼塊如下,
synchronized (lock) {
// both the following operations are protected by the lock
// so that we avoid race conditions in the case that initializeState()
// registers a timer, that fires before the open() is called.
initializeState();
openAllOperators();
}
// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}
// let the task do its work
isRunning = true;
run();
initializeState
方法會呼叫到BucketingSink#initializeState
;openAllOperators
方法會呼叫到BucketingSink#open
;run
方法會呼叫到BucketingSink#invoke
。
另外兩個方法,snapshotState
以及notifyCheckpointComplete
是跟Checkpoint相關的,StreamTask
在執行Checkpoint時,會呼叫operator的snapshotState
方法,最終會呼叫到BucketingSink#snapshotState
。整個Job Checkpoint成功後會傳送通知,BucketingSink#notifyCheckpointComplete
會被呼叫。
下面就來看下這些方法的具體實現。
initializeState
initializeState
主要做了兩件事:
- 呼叫
initFileSystem
初始化Hadoop的FileSystem
; - 呼叫
handleRestoredBucketState
從Checkpoint/Savepoint中恢復狀態資訊;
我們往下先看看其他方法再回過頭來看狀態恢復的實現,即handleRestoredBucketState
方法。
open
open
方法也比較簡單,主要是利用ProcessingTimeService
註冊定時器,定時檢查上文提到的inactive的分桶。
invoke
invoke
方法主要做了四件事,如下注釋,
@Override
public void invoke(T value) throws Exception {
/******* 第一步,使用Bucketer獲取當前資料所屬的分桶 *******/
Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
/******* 第二步,獲取分桶狀態,若分桶不存在,則生成分桶狀態資訊 *******/
BucketState<T> bucketState = state.getBucketState(bucketPath);
if (bucketState == null) {
bucketState = new BucketState<>(currentProcessingTime);
state.addBucketState(bucketPath, bucketState);
}
/******* 第三步,對分桶當前正在寫入的檔案執行rolling策略 *******/
if (shouldRoll(bucketState, currentProcessingTime)) {
openNewPartFile(bucketPath, bucketState);
}
/******* 第四步,資料寫入檔案 *******/
bucketState.writer.write(value);
bucketState.lastWrittenToTime = currentProcessingTime;
}
currentFile
,該分桶當前正在被寫入,即in-progress
狀態的檔案;currentFileValidLength
,該檔案的有效長度;creationTime
,該檔案的建立時間;lastWrittenToTime
,該分桶最後一次寫入的時間;partCounter
,上文提到的檔名稱格式中的{count}
值;pendingFiles
,該分桶下處於pending
狀態的檔案;pendingFilesPerCheckpoint
,等待Checkpoint成功通知的檔案;
來看下檔案新建跟寫入的實現。由openNewPartFile
新建檔案,該方法主要做兩件事:
- 呼叫
closeCurrentPartFile
方法,如果當前分桶有處於in-progress
狀態的檔案,則呼叫Writer#close
方法關閉該檔案,並且將該檔案重新命名,置為pending
狀態,並修改分桶狀態的pendingFiles
資訊; - 構造新的
in-progress
狀態的檔名,呼叫Writer#open
開啟檔案,修改分桶狀態的currentFile
,creationTime
等資訊;
檔案的寫入則是呼叫Writer#write
來完成。可以看到這裡分別使用了Writer
的open
,write
,close
方法來實現檔案的建立,寫入以及關閉。Writer
的預設實現為StringWriter
,下面來看下該實現。
StringWriter
open
,呼叫由BucketingSink
初始化的FileSystem
的create
方法來建立檔案,得到FSDataOutputStream
;write
,呼叫FSDataOutputStream#write
方法寫入資料;close
,首先根據配置(syncOnFlush
)呼叫FSDataOutputStream
的hsync
或者hflush
來flush資料(二者區別請參考API文件),然後呼叫FSDataOutputStream#close
方法關閉檔案;
snapshotState
snapshotState
主要是以下三步,
- 對所有分桶呼叫
Writer#flush
將快取的資料寫出去,並記錄檔案長度,更新currentFileValidLength
資訊; - 修改所有分桶的
pendingFilesPerCheckpoint
資訊,記錄本次Checkpoint該分桶對應的pendingFiles
,並將pendingFiles
置空;
notifyCheckpointComplete
notifyCheckpointComplete
主要做的事情就是根據snapshotState
中記錄的pendingFilesPerCheckpoint
資訊,將所有等待Checkpoint成功的pending
狀態的檔案重新命名,置為最終的,也是可用的finished
狀態。
handleRestoredBucketState
現在可以回過頭來看下,狀態恢復是如何實現,從而保證了exactly-once語義的。狀態恢復實現,上文我們已經提到,是在handleRestoredBucketState
方法。
private void handleRestoredBucketState(State<T> restoredState) {
Preconditions.checkNotNull(restoredState);
for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
/******* Checkpoint成功時`pendingFiles`應該是空的 *******/
// we can clean all the pending files since they were renamed to
// final files after this checkpoint was successful
// (we re-start from the last **successful** checkpoint)
bucketState.pendingFiles.clear();
/******* 處理上一次Checkpoint成功時處於`in-progress`狀態的檔案 *******/
handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
// Now that we've restored the bucket to a valid state, reset the current file info
bucketState.currentFile = null;
bucketState.currentFileValidLength = -1;
bucketState.isWriterOpen = false;
/******* 處理分桶狀態的`pendingFilesPerCheckpoint`資訊 *******/
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
bucketState.pendingFilesPerCheckpoint.clear();
}
}
可以看到主要是根據Checkpoint成功後持久化下來的分桶狀態資訊進行處理,in-progress
狀態的檔案以及pendingFilesPerCheckpoint
資訊。下面來看下對應的handlePendingInProgressFile
以及handlePendingFilesForPreviousCheckpoints
這兩個方法。
handlePendingInProgressFile
處於in-progress
狀態的檔案,在該次Checkpoint成功之後,故障發生之前(這裡假設狀態恢復是由於發生故障引起),有以下幾種可能,
- 仍然處於
in-progress
狀態,沒有繼續寫入,檔案有效長度仍然是Checkpoint時由snapshotState
方法記錄下的currentFileValidLength
; - 仍然處於
in-progress
狀態,發生了寫入,檔案有效長度大於記錄下的currentFileValidLength
; - 已經關閉並處於
pending
狀態; - 已經處於
finished
狀態;
無論處於以上哪種狀態,現在需要做的,就是將檔案的有效長度恢復到Checkpoint成功時記錄的currentFileValidLength
。因為恢復後的in-progress
狀態的檔案不再繼續寫入(產生新檔案來寫入後續資料),因此先將該檔案重新命名置為finished
狀態(已經是finished
狀態則省略這一步),然後有兩種做法,
- 如果檔案系統支援
truncate
,則直接將檔案truncate到currentFileValidLength
這個有效長度即可; - 不支援truncate的話,則新建一個
{valid_length_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{valid_length_suffix}
檔案,將currentFileValidLength
寫入該檔案,後續在讀取資料檔案時需要先讀取這個記錄了該檔案有效長度的檔案以確保資料的有效性,否則可能讀取到重複的資料,這樣就不能保證exactly-once語義了;
handlePendingFilesForPreviousCheckpoints
這個方法主要是處理分桶狀態的pendingFilesPerCheckpoint
資訊,當Checkpoint成功(snapshotState
方法)持久化下來的pendingFilesPerCheckpoint
,儲存的是等待Checkpoint成功通知的處於pending
狀態的檔案,這些檔案在Checkpoint成功之後,故障發生之前,有下面兩種可能:
- Checkpoint成功,但是Checkpoint成功通知之前發生了故障。那麼此時這些檔案應該仍處於
pending
狀態,因為從pengding
轉為finished
狀態是在Checkpoint成功通知到才會發生(也是就成功執行了notifyCheckpointComplete
方法)。針對這種情況,現在需要將這些檔案置為finished
狀態; - Checkpoint成功,也成功通知到之後才發生了故障。此時這些檔案已經處於
finished
狀態,無需額外操作;
這裡需要額外說明,Checkpoint是否成功,在傳送通知,即呼叫notifyCheckpointComplete
之前就已經確定了。notifyCheckpointComplete
即使發生了異常也不會導致Checkpoint失敗,參考CheckpointListener
的API文件,
This method is called as a notification once a distributed checkpoint has been completed. Note that any exception during this method will not cause the checkpoint to fail any more.
exactly-once語義
經過原始碼層面的分析可以看到,對exactly-once語義的保證,是通過引入中間狀態(in-progress
及pending
)和最終可用狀態(finished
)來實現的,是一種兩階段提交(2PC)的方案。當故障發生時,對處於中間狀態的資料進行回滾或者提交(initializeState
方法)以保證資料的有效性。資料只有流轉(Checkpoint成功)到最終狀態才是可用的。
結語
本文通過對原始碼的分析來了解BucketingSink
對exactly-once語義的保證。另外,這裡記錄下一個潛在的問題,就是資料寫入的效能問題,當前的設計,每個分桶下只有一個正在寫入,即in-progress
狀態的檔案,並且是在invoke
方法同步寫入,在資料量大的情況下,資料寫入的效能一定是不容樂觀的。後面可能會有這方面相關的優化,例如非同步化,stay tuned _