解讀Flink中輕量級的非同步快照機制--Flink 1.2 原始碼
上一篇文章中,對於ABS演算法,其實現主要通過checkpoint的barrier的阻塞與釋放來實現。
本片重點關注ABS在Flink 1.2中原始碼的實現。
1、CheckpointBarrierHandler
此介面位於org.apache.flink.streaming.runtime.io中,管理從input channel獲取的barrier的資訊。它提供瞭如下幾種方法:
public interface CheckpointBarrierHandler {
BufferOrEvent getNextNonBlocked() throws Exception;
void registerCheckpointEventHandler(StatefulTask task);
void cleanup() throws IOException;
boolean isEmpty();
long getAlignmentDurationNanos();
}
其中關於barrier的阻塞與釋放,主要在getNextNonBlocked() 中實現。
根據CheckpointingMode的不同,Flink提供了2種不同的檢查點模式:
1、Exactly once
2、At least once
其中預設的模式是EXACTLY_ONCE。
對應這兩種不同的模式,Flink提供了2種不同的實現類:
1、BarrierBuffer類(對應於Exactly Once)
2、BarrierTracker類(對應於At Least Once)
由於論文中重點強調input channel的阻塞,即對於Exactly Once的實現,因此我們這裡也重點關注程式碼中BarrierBuffer類的實現。
2、BarrierBuffer類
我們先回顧一下上一篇論文中關於此演算法的偽碼:
其核心就是一個input channel收到barrier,立刻阻塞,然後判斷是否收到所有input channel的barrier,如果全部收到,則廣播出barrier,觸發此task的檢查點,並對阻塞的channel釋放鎖。
實際上,為了防止輸入流的背壓(back-pressuring),BarrierBuffer並不是真正的阻塞這個流,而是將此channel中,barrier之後資料通過一個BufferSpiller來buffer起來,當channel的鎖釋放後,再從buffer讀回這些資料,繼續處理。
下面我們看看這個類的具體實現:
public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
/** The gate that the buffer draws its input from */
private final InputGate inputGate; //一個task對應一個InputGate,代表input的資料集合(可能來自不同的input channel)
/** Flags that indicate whether a channel is currently blocked/buffered */
private final boolean[] blockedChannels; // 標記每個input channel是否被阻塞(或者叫被buffer)
/** The total number of channels that this buffer handles data from */
private final int totalNumberOfInputChannels; // input channel的數量,可通過InputGate獲得
/** To utility to write blocked data to a file channel */
private final BufferSpiller bufferSpiller; // 將被阻塞的input channel的資料寫到buffer
/** The pending blocked buffer/event sequences. Must be consumed before requesting
* further data from the input gate. */
private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered; // barrier到達時,此operator中在之前buffered的資料要消費掉
/** The maximum number of bytes that may be buffered before an alignment is broken. -1 means unlimited */
private final long maxBufferedBytes; // 最多允許buffer的位元組數,-1代表無限制
/** The sequence of buffers/events that has been unblocked and must now be consumed
* before requesting further data from the input gate */
private BufferSpiller.SpilledBufferOrEventSequence currentBuffered; // 已經buffer的資料
/** Handler that receives the checkpoint notifications */
private StatefulTask toNotifyOnCheckpoint; // 通知檢查點進行
/** The ID of the checkpoint for which we expect barriers */
private long currentCheckpointId = -1L; // 當前檢查點ID
/** The number of received barriers (= number of blocked/buffered channels)
* IMPORTANT: A canceled checkpoint must always have 0 barriers */
private int numBarriersReceived; // 接收到的barrier的數量,這個值最終要等於buffered channel的數量。當一個檢查點被cancel時,此值為0
/** The number of already closed channels */
private int numClosedChannels; // 已經關閉的channel的數量
/** The number of bytes in the queued spilled sequences */
private long numQueuedBytes; // spill到佇列中的資料的位元組數
/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
private long startOfAlignmentTimestamp; // 上一次對齊開始時的時間戳
/** The time (in nanoseconds) that the latest alignment took */
private long latestAlignmentDurationNanos; // 最近一次對齊持續的時間
/** Flag to indicate whether we have drawn all available input */
private boolean endOfStream; // 標記是否流結束(所有的input已經收到barrier,標記檢查點完成)
/**
* Creates a new checkpoint stream aligner.
*
* <p>There is no limit to how much data may be buffered during an alignment.
*
* @param inputGate The input gate to draw the buffers and events from.
* @param ioManager The I/O manager that gives access to the temp directories.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
*/
public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
this (inputGate, ioManager, -1);
}
/**
* Creates a new checkpoint stream aligner.
*
* <p>The aligner will allow only alignments that buffer up to the given number of bytes.
* When that number is exceeded, it will stop the alignment and notify the task that the
* checkpoint has been cancelled.
*
* @param inputGate The input gate to draw the buffers and events from.
* @param ioManager The I/O manager that gives access to the temp directories.
* @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
*/
public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
this.inputGate = inputGate;
this.maxBufferedBytes = maxBufferedBytes;
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
}
其構造方法中傳入InputGate引數,每個task都會對應有一個InputGate,目的是專門處理流入到此task中的所有的輸入資訊,這些輸入可能來自多個partition。
我們再看看BarrierBuffer中最重要的方法:getNextNonBlocked。
getNextNonBlocked
// ------------------------------------------------------------------------
// Buffer and barrier handling
// ------------------------------------------------------------------------
@Override
public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
BufferOrEvent next; // buffer代表資料,event代表事件,例如barrier就是個事件
if (currentBuffered == null) {
next = inputGate.getNextBufferOrEvent();// 如果已經buffer的資料為空,則直接從inputGate中獲取下一個BufferOrEvent
}
else {
next = currentBuffered.getNext(); // 否則,從currentBuffered的佇列中拿到下一個BufferOrEvent
if (next == null) { // 如果next為空,說明已經buffer的資料被處理完了
completeBufferedSequence(); // 清空currentBuffered,然後繼續處理queuedBuffered中的資料
return getNextNonBlocked(); // 遞迴呼叫,此時currentBuffered如果為null,則queuedBuffered也為null;否則如果currentBuffered不為null,說明還要繼續處理queuedBuffere中的資料
}
}
if (next != null) {
if (isBlocked(next.getChannelIndex())) { //如果這個channel還是被阻塞,則繼續把這條record新增到buffer中
// if the channel is blocked we, we just store the BufferOrEvent
bufferSpiller.add(next);
checkSizeLimit();
}
else if (next.isBuffer()) {//否則如果這個channel不再被阻塞,且下一條記錄是資料,則返回此資料
return next;
}
else if (next.getEvent().getClass() == CheckpointBarrier.class) { // 如果下一個是Barrier,且流沒有結束,則說明這個channel收到了barrier了
if (!endOfStream) {
// process barriers only if there is a chance of the checkpoint completing
processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); // 此時,進行processBarrier處理
}
}
else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { // 如果下一個是帶有cancel標記的barrier,則進行processCancellationBarrier處理
processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
}
else {
if (next.getEvent().getClass() == EndOfPartitionEvent.class) { // 如果此partition的資料全部消費完
processEndOfPartition(); // 增加numClosedChannels的值,且將此channel解鎖
}
return next;
}
}
else if (!endOfStream) { // 如果next為null且不是stream的終點,則置為終點,且釋放所有channel的鎖,重置初始值
// end of input stream. stream continues with the buffered data
endOfStream = true;
releaseBlocksAndResetBarriers();
return getNextNonBlocked();
}
else {
// final end of both input and buffered data
return null;
}
}
}
這個方法中,當收到barrier後,立刻進行processBarrier()的處理,這也是其核心所在。
processBarrier
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
if (totalNumberOfInputChannels == 1) { // 如果總共的channel數量只有1,此時說明這個operator只有一個input
if (barrierId > currentCheckpointId) { //如果這個barrierId大於當前的檢查點ID,則說明這個barrier是一個新的barrier
// new checkpoint
currentCheckpointId = barrierId;//將這個barrierId賦給當前的檢查點ID
notifyCheckpoint(receivedBarrier); //觸發檢查點
}
return;
}
// -- general code path for multiple input channels --
if (numBarriersReceived > 0) { //如果已經收到過barrier
// this is only true if some alignment is already progress and was not canceled
if (barrierId == currentCheckpointId) { // 判斷此barrierId與當前的檢查點ID是否一致
// regular case
onBarrier(channelIndex); // 如果一直,則阻塞此channel
}
else if (barrierId > currentCheckpointId) { // 如果barrierId大於當前的檢查點ID,則說明當前的檢查點過期了,跳過當前的檢查點
// we did not complete the current checkpoint, another started before
LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
// let the task know we are not completing this
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));// 通知task終止當前的檢查點
// abort the current checkpoint
releaseBlocksAndResetBarriers();// 釋放所有channel的鎖
// begin a the new checkpoint
beginNewAlignment(barrierId, channelIndex);// 根據barrierId,開始新的檢查點
}
else {
// ignore trailing barrier from an earlier checkpoint (obsolete now)
return;
}
}
else if (barrierId > currentCheckpointId) { // 如果第一次收到的barrierID大於當前的檢查點ID,說明是一個新的barrier
// first barrier of a new checkpoint
beginNewAlignment(barrierId, channelIndex);// 根據barrierId,開始新的檢查點
}
else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
return;
}
// check if we have all barriers - since canceled checkpoints always have zero barriers
// this can only happen on a non canceled checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { //如果收到所有channel的barrier,說明走到了
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("Received all barriers, triggering checkpoint {} at {}",
receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers(); // 釋放所有channel的鎖
notifyCheckpoint(receivedBarrier);// 觸發檢查點
}
}
Flink 1.2中有個變化就是判斷當前的operator是否只有一個input channel且收到了最新的barrier,如果是,則開通一個綠色通道,直接進行檢查點:notifyCheckpoint。
否則如果有多個input channel(totalNumberOfInputChannels是通過InputGate獲得),則只有當收到所有input channel的最新的barrier後,才開始進行檢查點:notifyCheckpoint,否則就要先阻塞該input channel,實際上是buffer起來後續的資料。
notifyCheckpoint
private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
if (toNotifyOnCheckpoint != null) {
CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;
checkpointMetaData
.setBytesBufferedInAlignment(bytesBuffered)
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
}
}
toNotifyOnCheckpoint是個StatefulTask介面,管理每個task接收檢查點的通知,其triggerCheckpoint方法是真正的實現。
3、Flink 1.2中webUI對checkpoint的改進
webUI中對checkpoint的部分增加了很多的元資料資訊,包括檢查點的詳細資訊:
包括每個checkpoint中state的大小,檢查點的狀態,完成的時間以及持續的時間。並且對每一個檢查點,可以額看到每一個subtask的詳細資訊。這點對於檢查點的管理、監控以及對state的調整都起到了積極的作用。
4、總結
ABS在Flink中預設是Exactly Once,需要對齊,對齊的演算法就是阻塞+解除。阻塞和解除阻塞都有各自的判斷依據。
相關推薦
解讀Flink中輕量級的非同步快照機制--Flink 1.2 原始碼
上一篇文章中,對於ABS演算法,其實現主要通過checkpoint的barrier的阻塞與釋放來實現。 本片重點關注ABS在Flink 1.2中原始碼的實現。 1、CheckpointBarrierHandler 此介面位於org.apache.fli
Flink中Periodic水印和Punctuated水印實現原理(原始碼分析)
在使用者程式碼中,我們設定生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的過載 我們傳入的物件分為兩種 AssignerWithPunctuatedWatermarks(可以理解為每條資料都會產生水印,如果不想產生水印,返回一個null的水印) Assig
Atitit 持久化 Persistence概念的藝術 目錄 1. 持久化是將程式資料在持久狀態和瞬時狀態間轉換的機制。 1 2. DBC就是一種持久化機制。檔案IO也是一種持久化機制。 2 3.
Atitit 持久化 Persistence概念的藝術 目錄 1. 持久化是將程式資料在持久狀態和瞬時狀態間轉換的機制。 1 2. DBC就是一種持久化機制。檔案IO也是一種持久化機制。 2 3. 日常持久化的方法 2 4. 理解與分類 3 4.1
Flink中接收端反壓以及Credit機制 (原始碼分析)
先上一張圖整體瞭解Flink中的反壓 可以看到每個task都會有自己對應的IG(inputgate)對接上游傳送過來的資料和RS(resultPatation)對接往下游傳送資料, 整個反壓機制通過inputgat
Flink中傳送端反壓以及Credit機制(原始碼分析)
上一篇《Flink接收端反壓機制》說到因為Flink每個Task的接收端和傳送端是共享一個bufferPool的,形成了天然的反壓機制,當Task接收資料的時候,接收端會根據積壓的資料量以及可用的buffer數量(可用的memorySegment數)來決定是否向上遊傳送Credit(簡而言之就是當我還有空間的
[Flink基礎]--Apache Flink中的廣播狀態實用指南
感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink Apache Flink中的廣播狀態實用指南 從版本1.5.0開始,Apache FlinkⓇ具
Flink中scala提示錯誤——could not find implicit value for evidence parameter of type org.apa
Flink第一個簡單的demo ,wordCount 該問題參考引用如下: https://blog.csdn.net/dax1n/article/details/70211035 自身程式碼中問題: package cetc.flink import org.apa
Flink狀態管理和容錯機制介紹
本文主要內容如下: 有狀態的流資料處理; Flink中的狀態介面; 狀態管理和容錯機制實現; 一.有狀態的流資料處理 1.1.什麼是有狀態的計算 計算任務的結果不僅僅依賴於輸入,還依賴於它的當前狀態,其實大多數的計算都是有狀態的計算。 比如wordc
Flink中的序列化失敗問題 和transent宣告
最近在Flink的的map運算元中使用了自義定類(實現richMapFunction)來序列化中存在的問題? 一、背景介紹 在編寫Spark程式中,由於在map等運算元內部使用了外部定義的變數和函式,從而引發Task未序列化問題。然而,Spark運算元在計算過程中使用外部變數在許多情
Flink中的資料傳輸與背壓
一圖道盡心酸: 大的原理,上游的task產生資料後,會寫在本地的快取中,然後通知JM自己的資料已經好了,JM通知下游的Task去拉取資料,下游的Task然後去上游的Task拉取資料,形成鏈條。 但是在何時通知JM?這裡有一個設定,比如pipeline還是blocking,pipeline意味著上游哪怕
Flink原始碼系列——Flink中一個簡單的資料處理功能的實現過程
在Flink中,實現從指定主機名和埠接收字串訊息,對接收到的字串中出現的各個單詞,每隔1秒鐘就輸出最近5秒內出現的各個單詞的統計次數。 程式碼實現如下: public class SocketWindowWordCount { public static void
將Flink中的批處理的WordCount轉化為流處理的WordCount
將Flink中的批處理的WordCount轉化為流處理的WordCount 目的:將Flink中批處理的WordCount轉化為流處理的WordCount 作用:感覺毫無用處 如何實現:將批的environmentBatch中的各個運算元,在流的environmentStream中
Akka在Flink中的使用剖析
Akka與Actor 模型 Akka是一個用來開發支援併發、容錯、擴充套件性的應用程式框架。它是actor model的實現,因此跟Erlang的併發模型很像。在actor模型的上下文中,所有的活動實體都被認為是互不依賴的actor。actor之間的互相通訊是
Android中的非同步訊息處理機制
這也是Android中老生常談的一個話題了,它本身並不是很複雜,可是面試官比較喜歡問。本文就從原始碼再簡單的理一下這個機制。也可以說是理一下Handler、Looper、MessageQueue之間的關係。 單執行緒中的訊息處理機制的實現 首先我們以Looper.java原始碼中給出的一個例子來
Apache Flink中的廣播狀態實用指南
感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink 不過,原文最近好像不能訪問了。應該是https://www.da-platform.com/網站移除了blog板塊了。
Flink中slot的一點理解
slot在flink裡面可以認為是資源組,Flink是通過將任務分成子任務並且將這些子任務分配到slot來並行執行程式。 每個Flink TaskManager在叢集中提供處理槽。 插槽的數量通常與每個TaskManager的可用CPU核心數成比例。一般情況下你的slot數
《從0到1學習Flink》—— Flink 中幾種 Time 詳解
前言Flink 在流程式中支援不同的 Time 概念,就比如有 Processing Time、Event Time 和 Ingestion Time。 下面我們一起來看看這幾個 Time: Processing TimeProcessing Time 是指事件被處理時機器的系統時間。 當流程式在 Pr
Flink中的多source+event watermark測試
這次需要做一個監控專案,全網日誌的指標計算,上線的話,計算量應該是百億/天 單個source對應的sql如下 最原始的sql select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl from ( select pro,thro
深入理解Flink中的狀態
本文是整理自幾個月前的內部flink state分享,flink狀態所包含的東西很多,在下面列舉了一些,還有一些在本文沒有體現,後續會單獨的挑出來再進行講解 state的層次結構 keyedState => windowState OperatorState => kaf
深入理解python3.4中Asyncio庫與Node.js的非同步IO機制
譯者前言 如何用yield以及多路複用機制實現一個基於協程的非同步事件框架? 現有的元件中yield from是如何工作的,值又是如何被傳入yield from表示式的? 在這個yield from之上,是如何在一個執行緒內實現一個排程機制去排程協程的? 協程中呼叫協程的呼叫