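A Look at Flink's CheckpointScheduler
Preface
This article walks through how Flink's CheckpointScheduler works, from the JobStatus listener on the JobMaster side down to the operator state snapshot on the task side.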
CheckpointCoordinatorDeActivator
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
- CheckpointCoordinatorDeActivator implements the JobStatusListener interface. When jobStatusChanges is called, it invokes coordinator.startCheckpointScheduler if the new status is RUNNING, and coordinator.stopCheckpointScheduler for any other status.
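To make the start/stop toggle concrete, here is a minimal sketch of the same branch outside of Flink. The `Status` enum and `RecordingCoordinator` class are hypothetical stand-ins invented for this example (they are not Flink types); the coordinator only records which method was called.

```java
import java.util.ArrayList;
import java.util.List;

class DeActivatorSketch {
    // Hypothetical stand-in for Flink's JobStatus; only a few states are modelled.
    enum Status { CREATED, RUNNING, FAILING, FINISHED }

    // Hypothetical stand-in for the coordinator: it only records the calls it receives.
    static final class RecordingCoordinator {
        final List<String> calls = new ArrayList<>();
        void startCheckpointScheduler() { calls.add("start"); }
        void stopCheckpointScheduler() { calls.add("stop"); }
    }

    // Same branch as jobStatusChanges: RUNNING starts the scheduler, anything else stops it.
    static void onStatusChange(RecordingCoordinator coordinator, Status newStatus) {
        if (newStatus == Status.RUNNING) {
            coordinator.startCheckpointScheduler();
        } else {
            coordinator.stopCheckpointScheduler();
        }
    }

    // Replays a sequence of status changes and returns the recorded calls in order.
    static List<String> replay(Status... statuses) {
        RecordingCoordinator coordinator = new RecordingCoordinator();
        for (Status status : statuses) {
            onStatusChange(coordinator, status);
        }
        return coordinator.calls;
    }

    public static void main(String[] args) {
        System.out.println(replay(Status.CREATED, Status.RUNNING, Status.FAILING));
    }
}
```

Note that every non-RUNNING transition issues a stop, so stopping has to be safe to call repeatedly.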
CheckpointCoordinator.ScheduledTrigger
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 */
public class CheckpointCoordinator {

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** The number of consecutive failed trigger attempts */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    //......

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void stopCheckpointScheduler() {
        synchronized (lock) {
            triggerRequestQueued = false;
            periodicScheduling = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }

            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                p.abortError(new Exception("Checkpoint Coordinator is suspending."));
            }

            pendingCheckpoints.clear();
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }

    private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            } catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", job, e);
            }
        }
    }

    //......
}
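- CheckpointCoordinator's startCheckpointScheduler method first calls stopCheckpointScheduler to cancel any earlier timer and abort the pending checkpoints, then uses timer.scheduleAtFixedRate to schedule a new ScheduledTrigger. The initial delay is a random value between the minimum pause (converted to milliseconds) and baseInterval, inclusive.
- stopCheckpointScheduler calls PendingCheckpoint.abortError on every entry in pendingCheckpoints, then clears pendingCheckpoints (a Map<Long, PendingCheckpoint>) and resets numUnsuccessfulCheckpointsTriggers (an AtomicInteger) to 0.
- ScheduledTrigger implements the Runnable interface; its run method calls triggerCheckpoint with the isPeriodic argument set to true.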
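The start/stop pattern described above can be sketched with a plain ScheduledExecutorService. This is an illustration only, not Flink's implementation: the class name `PeriodicSchedulerSketch`, its fields, and the daemon timer thread are assumptions made for the example, and it assumes the minimum pause is not larger than the base interval.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class PeriodicSchedulerSketch {
    private final Object lock = new Object();
    // Daemon timer thread (an assumption of this sketch) so a forgotten scheduler
    // does not keep the JVM alive.
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "checkpoint-timer-sketch");
            t.setDaemon(true);
            return t;
        });
    private final long baseIntervalMillis;
    private final long minPauseMillis;
    private final Runnable trigger;
    private ScheduledFuture<?> currentPeriodicTrigger;

    PeriodicSchedulerSketch(long baseIntervalMillis, long minPauseMillis, Runnable trigger) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.minPauseMillis = minPauseMillis;
        this.trigger = trigger;
    }

    // Random first delay in [minPause, baseInterval]; requires minPause <= baseInterval.
    static long initialDelayMillis(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    void start() {
        synchronized (lock) {
            stop(); // make sure any earlier timer is cancelled first
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                trigger,
                initialDelayMillis(minPauseMillis, baseIntervalMillis),
                baseIntervalMillis,
                TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // do not interrupt a trigger that is running
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduled() {
        synchronized (lock) {
            return currentPeriodicTrigger != null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicSchedulerSketch scheduler =
            new PeriodicSchedulerSketch(200L, 50L, () -> System.out.println("trigger"));
        scheduler.start();
        Thread.sleep(700L);
        scheduler.stop();
    }
}
```

The randomized first delay presumably spreads out the first trigger after a restart; calling stop() inside start() makes a repeated start idempotent.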
CheckpointCoordinator.triggerCheckpoint
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
/**
* The checkpoint coordinator coordinates the distributed snapshots of operators and state.
* It triggers the checkpoint by sending the messages to the relevant tasks and collects the
* checkpoint acknowledgements. It also collects and maintains the overview of the state handles
* reported by the tasks that acknowledge the checkpoint.
*/
public class CheckpointCoordinator {
/** Tasks who need to be sent a message when a checkpoint is started */
private final ExecutionVertex[] tasksToTrigger;
/** Tasks who need to acknowledge a checkpoint before it succeeds */
private final ExecutionVertex[] tasksToWaitFor;
/** Map from checkpoint ID to the pending checkpoint */
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
/** The maximum number of checkpoints that may be in progress at the same time */
private final int maxConcurrentCheckpointAttempts;
/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
private final long minPauseBetweenCheckpointsNanos;
/**
* Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
* timestamp.
*
* @param timestamp The timestamp for the checkpoint.
* @param isPeriodic Flag indicating whether this triggered checkpoint is
* periodic. If this flag is true, but the periodic scheduler is disabled,
* the checkpoint will be declined.
* @return <code>true</code> if triggering the checkpoint succeeded.
*/
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
}
@VisibleForTesting
public CheckpointTriggerResult triggerCheckpoint(
long timestamp,
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic) {
// make some eager pre-checks
synchronized (lock) {
// abort if the coordinator has been shutdown in the meantime
if (shutdown) {
return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
}
// Don't allow periodic checkpoint if scheduling has been disabled
if (isPeriodic && !periodicScheduling) {
return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
}
// validate whether the checkpoint can be triggered, with respect to the limit of
// concurrent checkpoints, and the minimum time between checkpoints.
// these checks are not relevant for savepoints
if (!props.forceCheckpoint()) {
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
// if too many checkpoints are currently in progress, we need to mark that a request is queued
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
// make sure the minimum interval between checkpoints has passed
final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
// Reassign the new trigger to the currentPeriodicTrigger
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(),
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
}
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee == null) {
LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
} else if (ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
// next, check if all tasks that need to acknowledge the checkpoint are running.
// if not, abort the checkpoint
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
ev.getTaskNameWithSubtaskIndex(),
job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
// we will actually trigger this checkpoint!
// we lock with a special lock to make sure that trigger requests do not overtake each other.
// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
// may issue blocking operations. Using a different lock than the coordinator-wide lock,
// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
synchronized (triggerLock) {
final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
// this must happen outside the coordinator-wide lock, because it communicates
// with external services (in HA mode) and may block for a while.
checkpointID = checkpointIdCounter.getAndIncrement();
checkpointStorageLocation = props.isSavepoint() ?
checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
job,
numUnsuccessful,
t);
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
final PendingCheckpoint checkpoint = new PendingCheckpoint(
job,
checkpointID,
timestamp,
ackTasks,
props,
checkpointStorageLocation,
executor);
if (statsTracker != null) {
PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
checkpointID,
timestamp,
props);
checkpoint.setStatsCallback(callback);
}
// schedule the timer that will clean up the expired checkpoints
final Runnable canceller = () -> {
synchronized (lock) {
// only do the work if the checkpoint is not discarded anyways
// note that checkpoint completion discards the pending checkpoint object
if (!checkpoint.isDiscarded()) {
LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
checkpoint.abortExpired();
pendingCheckpoints.remove(checkpointID);
rememberRecentCheckpointId(checkpointID);
triggerQueuedRequests();
}
}
};
try {
// re-acquire the coordinator-wide lock
synchronized (lock) {
// since we released the lock in the meantime, we need to re-check
// that the conditions still hold.
if (shutdown) {
return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
}
else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
// make sure the minimum interval between checkpoints has passed
final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
// Reassign the new trigger to the currentPeriodicTrigger
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(),
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
pendingCheckpoints.put(checkpointID, checkpoint);
ScheduledFuture<?> cancellerHandle = timer.schedule(
canceller,
checkpointTimeout, TimeUnit.MILLISECONDS);
if (!checkpoint.setCancellerHandle(cancellerHandle)) {
// checkpoint is already disposed!
cancellerHandle.cancel(false);
}
// trigger the master hooks for the checkpoint
final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
for (MasterState s : masterStates) {
checkpoint.addMasterState(s);
}
}
// end of lock scope
final CheckpointOptions checkpointOptions = new CheckpointOptions(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference());
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
numUnsuccessfulCheckpointsTriggers.set(0);
return new CheckpointTriggerResult(checkpoint);
}
catch (Throwable t) {
// guard the map against concurrent modifications
synchronized (lock) {
pendingCheckpoints.remove(checkpointID);
}
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
checkpointID, job, numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
}
try {
checkpointStorageLocation.disposeOnFailure();
}
catch (Throwable t2) {
LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
}
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
} // end trigger lock
}
//......
}
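- Under the coordinator-wide lock, triggerCheckpoint first declines if the coordinator is shut down (COORDINATOR_SHUTDOWN) or if the request is periodic while periodic scheduling is disabled (PERIODIC_SCHEDULER_SHUTDOWN). If props.forceCheckpoint() is false, it then declines with ALREADY_QUEUED when a trigger request is already queued, with TOO_MANY_CONCURRENT_CHECKPOINTS when pendingCheckpoints.size() has reached maxConcurrentCheckpointAttempts, and with MINIMUM_TIME_BETWEEN_CHECKPOINTS when less than minPauseBetweenCheckpointsNanos has elapsed since lastCheckpointCompletionNanos. In the last case the periodic trigger is rescheduled to fire once the remaining pause has passed, which keeps checkpoints from being triggered too frequently.
- Next it checks that every task in tasksToTrigger (the tasks that must be notified when a checkpoint starts) is in the RUNNING state; if not, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).
- It then checks that every task in tasksToWaitFor (the tasks that must acknowledge the checkpoint before it succeeds) has a current execution attempt. Note that this check only tests for null, not for the RUNNING state. If an attempt is missing, it also returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).
- Only after these checks pass does the actual trigger begin, under triggerLock. It allocates a checkpointID and initializes checkpointStorageLocation; if that throws, it returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION). It then creates the PendingCheckpoint and prepares the canceller, which aborts the checkpoint if it has not completed within checkpointTimeout. After re-acquiring the coordinator-wide lock, it repeats the shutdown check and, for non-forced checkpoints, the ALREADY_QUEUED, TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS checks.
- Finally, it calls triggerCheckpoint on each Execution. On success it returns CheckpointTriggerResult(checkpoint); on an exception it removes the pending checkpoint and returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION).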
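The order of the eager pre-checks is easy to lose in the long method, so here is a side-effect-free sketch of just that decision. The `State` holder and `Decision` enum are hypothetical names for this example; the real method additionally sets triggerRequestQueued and reschedules the timer, which is left out here.

```java
class TriggerPreCheckSketch {
    // Outcomes modelled on the decline reasons quoted above, plus TRIGGER for "go ahead".
    enum Decision {
        TRIGGER,
        COORDINATOR_SHUTDOWN,
        PERIODIC_SCHEDULER_SHUTDOWN,
        ALREADY_QUEUED,
        TOO_MANY_CONCURRENT_CHECKPOINTS,
        MINIMUM_TIME_BETWEEN_CHECKPOINTS
    }

    // Hypothetical snapshot of the coordinator fields that the pre-checks read.
    static final class State {
        boolean shutdown;
        boolean periodicScheduling = true;
        boolean triggerRequestQueued;
        int pendingCheckpoints;
        int maxConcurrentCheckpointAttempts = 1;
        long lastCheckpointCompletionNanos;
        long minPauseBetweenCheckpointsNanos;
    }

    // Evaluates the checks in the same order as the quoted method, without side effects.
    static Decision preCheck(State s, boolean isPeriodic, boolean forceCheckpoint, long nowNanos) {
        if (s.shutdown) {
            return Decision.COORDINATOR_SHUTDOWN;
        }
        if (isPeriodic && !s.periodicScheduling) {
            return Decision.PERIODIC_SCHEDULER_SHUTDOWN;
        }
        if (!forceCheckpoint) {
            if (s.triggerRequestQueued) {
                return Decision.ALREADY_QUEUED;
            }
            if (s.pendingCheckpoints >= s.maxConcurrentCheckpointAttempts) {
                return Decision.TOO_MANY_CONCURRENT_CHECKPOINTS;
            }
            long earliestNext = s.lastCheckpointCompletionNanos + s.minPauseBetweenCheckpointsNanos;
            // Integer division: a remainder of less than one millisecond counts as "no wait".
            long durationTillNextMillis = (earliestNext - nowNanos) / 1_000_000;
            if (durationTillNextMillis > 0) {
                return Decision.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
            }
        }
        return Decision.TRIGGER;
    }

    public static void main(String[] args) {
        State s = new State();
        s.minPauseBetweenCheckpointsNanos = 5_000_000L; // 5 ms pause
        System.out.println(preCheck(s, true, false, 3_000_000L));
        System.out.println(preCheck(s, true, false, 4_500_000L));
    }
}
```

With a 5 ms pause and the last completion at time 0, a request at 3 ms still has 2 ms to wait and is declined, while a request at 4.5 ms has less than one full millisecond remaining and goes through because of the integer division.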
Execution.triggerCheckpoint
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java
public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
/**
* Trigger a new checkpoint on the task of this execution.
*
* @param checkpointId of th checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
*/
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
}
}
//......
}
- Execution.triggerCheckpoint mainly delegates to taskManagerGateway.triggerCheckpoint, provided the execution still has a slot assigned; here the taskManagerGateway is an RpcTaskManagerGateway.
RpcTaskManagerGateway
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
/**
* Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
*/
public class RpcTaskManagerGateway implements TaskManagerGateway {
private final TaskExecutorGateway taskExecutorGateway;
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
taskExecutorGateway.triggerCheckpoint(
executionAttemptID,
checkpointId,
timestamp,
checkpointOptions);
}
//......
}
- RpcTaskManagerGateway's triggerCheckpoint method calls taskExecutorGateway.triggerCheckpoint. Here the taskExecutorGateway is a proxy backed by AkkaInvocationHandler, which notifies the TaskExecutor via RPC.
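The idea of a gateway interface backed by an invocation handler can be illustrated with the JDK's dynamic proxies. The sketch below is an assumption-laden simplification: `TaskGateway`, `connect` and the `wire` list are hypothetical names, and the handler simply forwards to a local object, whereas the real handler would turn the call into a message for a remote endpoint.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GatewayProxySketch {
    // Hypothetical gateway interface; the caller only ever sees this type.
    interface TaskGateway {
        String triggerCheckpoint(String attemptId, long checkpointId, long timestamp);
    }

    // Builds a proxy whose handler records each call as a "message" on the wire list
    // and then forwards it to the endpoint (a local object in this sketch).
    static TaskGateway connect(TaskGateway endpoint, List<String> wire) {
        InvocationHandler handler = (proxy, method, args) -> {
            wire.add(method.getName() + Arrays.toString(args));
            return method.invoke(endpoint, args);
        };
        return (TaskGateway) Proxy.newProxyInstance(
            TaskGateway.class.getClassLoader(),
            new Class<?>[] {TaskGateway.class},
            handler);
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        TaskGateway gateway = connect((attempt, id, ts) -> "ack:" + id, wire);
        System.out.println(gateway.triggerCheckpoint("attempt-1", 7L, 1000L));
        System.out.println(wire);
    }
}
```

The caller invokes an ordinary interface method; the handler decides how the call reaches the other side.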
TaskExecutor.triggerCheckpoint
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
/**
* TaskExecutor implementation. The task executor is responsible for the execution of multiple
* {@link Task}.
*/
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
log.debug(message);
return FutureUtils.completedExceptionally(new CheckpointException(message));
}
}
//......
}
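- TaskExecutor's triggerCheckpoint method looks up the Task for the given executionAttemptID in taskSlotTable. If the task exists, it calls task.triggerCheckpointBarrier and returns a completed Acknowledge future; otherwise it returns a future completed exceptionally with a CheckpointException.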
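A minimal sketch of this lookup-then-acknowledge pattern with CompletableFuture follows. The class `TaskLookupSketch`, the string "ACK" and the use of IllegalStateException are placeholders chosen for this example rather than Flink types.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

class TaskLookupSketch {
    // Hypothetical stand-in for the task slot table: attempt id -> checkpoint callback.
    private final Map<String, LongConsumer> tasks = new ConcurrentHashMap<>();

    void register(String attemptId, LongConsumer onCheckpoint) {
        tasks.put(attemptId, onCheckpoint);
    }

    // Known task: run the callback and acknowledge. Unknown task: fail the future.
    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        LongConsumer task = tasks.get(attemptId);
        if (task != null) {
            task.accept(checkpointId);
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "Received a checkpoint request for unknown task " + attemptId + '.'));
        return failed;
    }

    public static void main(String[] args) {
        TaskLookupSketch executor = new TaskLookupSketch();
        executor.register("attempt-1", id -> System.out.println("barrier for checkpoint " + id));
        System.out.println(executor.triggerCheckpoint("attempt-1", 5L).join());
        System.out.println(executor.triggerCheckpoint("missing", 5L).isCompletedExceptionally());
    }
}
```

The acknowledgement only confirms that the request reached a known task; the checkpoint result itself is reported back separately.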
Task.triggerCheckpointBarrier
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
/** The invokable of this task, if initialized. All accesses must copy the reference and
* check for null, as this field is cleared as part of the disposal logic. */
@Nullable
private volatile AbstractInvokable invokable;
/**
* Calls the invokable to trigger a checkpoint.
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
* @param checkpointOptions Options for performing this checkpoint.
*/
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
// build a local closure
final String taskName = taskNameWithSubtask;
final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
Runnable runnable = new Runnable() {
@Override
public void run() {
// set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
try {
boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
new CheckpointDeclineTaskNotReadyException(taskName));
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
} finally {
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
}
};
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
}
}
//......
}
- Task's triggerCheckpointBarrier method first checks that executionState is RUNNING and that invokable is not null; if either condition fails, it calls checkpointResponder.declineCheckpoint.
- If both conditions hold, it calls executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)).
- The runnable calls invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions) and declines the checkpoint if that call returns false. For a source task, the invokable here is a SourceStreamTask.
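The branch structure above (decline immediately, or run asynchronously and decline if the invokable reports failure) can be sketched as follows. Everything here is hypothetical scaffolding: `BarrierTriggerSketch`, the `declined` and `failures` lists, and the use of a BooleanSupplier for the invokable are assumptions for illustration, and the failure branch only records a message instead of failing the task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

class BarrierTriggerSketch {
    // Checkpoint ids that were declined, in order (stand-in for the checkpoint responder).
    final List<Long> declined = Collections.synchronizedList(new ArrayList<>());
    // Error messages from failed trigger attempts (stand-in for failing the task).
    final List<String> failures = Collections.synchronizedList(new ArrayList<>());

    void triggerCheckpointBarrier(
            boolean running, BooleanSupplier invokable, long checkpointId, Executor asyncCalls) {
        if (running && invokable != null) {
            // The actual trigger runs on another thread so the caller is not blocked.
            asyncCalls.execute(() -> {
                try {
                    if (!invokable.getAsBoolean()) {
                        declined.add(checkpointId);
                    }
                } catch (RuntimeException e) {
                    failures.add("Error while triggering checkpoint " + checkpointId + ": " + e.getMessage());
                }
            });
        } else {
            // Not running (or no invokable yet): tell the coordinator right away.
            declined.add(checkpointId);
        }
    }

    public static void main(String[] args) {
        BarrierTriggerSketch task = new BarrierTriggerSketch();
        Executor direct = Runnable::run; // runs inline; a real task would use its own thread
        task.triggerCheckpointBarrier(true, () -> true, 1L, direct);
        task.triggerCheckpointBarrier(false, () -> true, 2L, direct);
        System.out.println(task.declined);
    }
}
```

Passing the Executor in makes the asynchronous hand-off explicit and lets a test substitute an inline executor.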
SourceStreamTask.triggerCheckpoint
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
extends StreamTask<OUT, OP> {
private volatile boolean externallyInducedCheckpoints;
@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
if (!externallyInducedCheckpoints) {
return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
}
else {
// we do not trigger checkpoints here, we simply state whether we can trigger them
synchronized (getCheckpointLock()) {
return isRunning();
}
}
}
//......
}
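- SourceStreamTask's triggerCheckpoint first checks externallyInducedCheckpoints. If it is false, it calls triggerCheckpoint on the parent class StreamTask; otherwise it does not trigger anything and only reports whether the task is running.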
StreamTask.triggerCheckpoint
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);
return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + getName() + '.', e);
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
return false;
}
}
}
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
synchronized (lock) {
if (isRunning) {
// we can do a checkpoint
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
else {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
Exception exception = null;
for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
try {
streamRecordWriter.broadcastEvent(message);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
exception);
}
}
if (exception != null) {
throw exception;
}
return false;
}
}
}
private void checkpointState(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
checkpointMetaData.getCheckpointId(),
checkpointOptions.getTargetLocation());
CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
this,
checkpointMetaData,
checkpointOptions,
storage,
checkpointMetrics);
checkpointingOperation.executeCheckpointing();
}
//......
}
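- The main logic of StreamTask's triggerCheckpoint method lives in performCheckpoint, which handles the task differently depending on isRunning.
- When isRunning is true, it proceeds in three steps: first operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, and finally checkpointState. Inside checkpointState, a CheckpointingOperation is created and checkpointingOperation.executeCheckpointing() is called.
- When isRunning is false, it calls streamRecordWriter.broadcastEvent(message) on every record writer, where message is a CancelCheckpointMarker, so that downstream tasks do not wait for a barrier from this task.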
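The ordering of the three steps matters: the barrier leaves before the state snapshot starts. The sketch below only records the order of events; the method signature and the string event names are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PerformCheckpointSketch {
    // Returns the sequence of actions taken for one checkpoint, in order.
    static List<String> performCheckpoint(
            boolean isRunning, long checkpointId, List<String> operators, List<String> outputs) {
        List<String> events = new ArrayList<>();
        if (isRunning) {
            // Step 1: let operators do their (ideally minimal) pre-barrier work.
            for (String op : operators) {
                events.add("prepare " + op);
            }
            // Step 2: send the barrier downstream as early as possible.
            for (String out : outputs) {
                events.add("barrier " + checkpointId + " -> " + out);
            }
            // Step 3: take the state snapshot.
            for (String op : operators) {
                events.add("snapshot " + op);
            }
        } else {
            // Not running: tell downstream tasks not to wait for this checkpoint.
            for (String out : outputs) {
                events.add("cancel " + checkpointId + " -> " + out);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(performCheckpoint(true, 3L, Arrays.asList("source"), Arrays.asList("out-0", "out-1")));
        System.out.println(performCheckpoint(false, 3L, Arrays.asList("source"), Arrays.asList("out-0", "out-1")));
    }
}
```

Emitting the barrier before snapshotting follows the comment in the quoted source: it keeps downstream alignment from waiting on this task's snapshot work.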
OperatorChain.prepareSnapshotPreBarrier
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// go forward through the operator chain and tell each operator
// to prepare the checkpoint
final StreamOperator<?>[] operators = this.allOperators;
for (int i = operators.length - 1; i >= 0; --i) {
final StreamOperator<?> op = operators[i];
if (op != null) {
op.prepareSnapshotPreBarrier(checkpointId);
}
}
}
//......
}
- OperatorChain's prepareSnapshotPreBarrier iterates over allOperators, from the last index down to 0, and calls prepareSnapshotPreBarrier on each non-null StreamOperator.
OperatorChain.broadcastCheckpointBarrier
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
//......
}
- OperatorChain's broadcastCheckpointBarrier method creates a CheckpointBarrier and then iterates over streamOutputs, calling broadcastEvent on each one.
CheckpointingOperation.executeCheckpointing
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
private static final class CheckpointingOperation {
private final StreamTask<?, ?> owner;
private final CheckpointMetaData checkpointMetaData;
private final CheckpointOptions checkpointOptions;
private final CheckpointMetrics checkpointMetrics;
private final CheckpointStreamFactory storageLocation;
private final StreamOperator<?>[] allOperators;
private long startSyncPartNano;
private long startAsyncPartNano;
// ------------------------
private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
public CheckpointingOperation(
StreamTask<?, ?> owner,
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory checkpointStorageLocation,
CheckpointMetrics checkpointMetrics) {
this.owner = Preconditions.checkNotNull(owner);
this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
this.allOperators = owner.operatorChain.getAllOperators();
this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
}
public void executeCheckpointing() throws Exception {
startSyncPartNano = System.nanoTime();
try {
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
}
startAsyncPartNano = System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
} catch (Exception ex) {
// Cleanup to release resources
for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
if (null != operatorSnapshotResult) {
try {
operatorSnapshotResult.cancel();
} catch (Exception e) {
LOG.warn("Could not properly cancel an operator snapshot result.", e);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
}
}
@SuppressWarnings("deprecation")
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions,
storageLocation);
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
}
}
private enum AsyncCheckpointState {
RUNNING,
DISCARDED,
COMPLETED
}
}
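- CheckpointingOperation is defined inside the StreamTask class. Its executeCheckpointing method first runs checkpointStreamOperator on every StreamOperator, which calls the operator's snapshotState method and stores the returned OperatorSnapshotFutures in operatorSnapshotsInProgress. It then creates an AsyncCheckpointRunnable and submits it to asyncOperationsThreadPool for asynchronous execution. If the synchronous part throws, the snapshot futures collected so far are cancelled.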
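The split into a synchronous part (collect one future per operator) and an asynchronous part (run those futures on a separate thread) can be sketched with RunnableFuture. This is a simplified illustration under assumptions: the `Operator` interface, the string "handles" and the single-thread pool are invented for the example, and unlike the real code it blocks until the asynchronous part finishes so that the result can be returned.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class TwoPhaseSnapshotSketch {
    // Hypothetical operator: snapshotState returns a future that has not run yet.
    interface Operator {
        String id();
        RunnableFuture<String> snapshotState(long checkpointId);
    }

    static Operator operator(String id) {
        return new Operator() {
            @Override
            public String id() {
                return id;
            }

            @Override
            public RunnableFuture<String> snapshotState(long checkpointId) {
                // The FutureTask body stands for the expensive, deferred state write.
                return new FutureTask<>(() -> id + "@" + checkpointId);
            }
        };
    }

    static Map<String, String> checkpoint(long checkpointId, Operator... operators) {
        // Synchronous part: ask every operator for its snapshot future.
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (Operator op : operators) {
            inProgress.put(op.id(), op.snapshotState(checkpointId));
        }
        // Asynchronous part: a pool thread runs the futures and collects the results.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Map<String, String>> done = pool.submit(() -> {
                Map<String, String> handles = new LinkedHashMap<>();
                for (Map.Entry<String, RunnableFuture<String>> entry : inProgress.entrySet()) {
                    entry.getValue().run();
                    handles.put(entry.getKey(), entry.getValue().get());
                }
                return handles;
            });
            return done.get(); // the sketch waits here; a real task thread would not
        } catch (Exception e) {
            // Cleanup: cancel whatever has not completed, then report the failure.
            for (RunnableFuture<String> future : inProgress.values()) {
                future.cancel(true);
            }
            throw new IllegalStateException("Snapshot " + checkpointId + " failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(checkpoint(3L, operator("map"), operator("sink")));
    }
}
```

Keeping the synchronous part short matters because it runs while the task holds its checkpoint lock; the deferred futures carry the heavier work off the processing thread.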
AbstractStreamOperator.snapshotState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>, Serializable {
@Override
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
getContainingTask().getCancelables())) {
snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
snapshotInProgress.cancel();
} catch (Exception e) {
snapshotException.addSuppressed(e);
}
String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + ".";
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
throw new Exception(snapshotFailMessage, snapshotException);
}
return snapshotInProgress;
}
/**
* Stream operators with state, which want to participate in a snapshot need to override this hook method.
*
* @param context context that provides information and means required for taking a snapshot
*/
public void snapshotState(StateSnapshotContext context) throws Exception {
final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
KeyedStateCheckpointOutputStream out;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
getOperatorName() + '.', exception);
}
try {
KeyGroupsList allKeyGroups = out.getKeyGroupList();
for (int keyGroupIdx : allKeyGroups) {
out.startNewKeyGroup(keyGroupIdx);
timeServiceManager.snapshotStateForKeyGroup(
new DataOutputViewStreamWrapper(out), keyGroupIdx);
}
} catch (Exception exception) {
throw new Exception("Could not write timer service of " + getOperatorName() +
" to checkpoint state stream.", exception);
} finally {
try {
out.close();
} catch (Exception closeException) {
LOG.warn("Could not close raw keyed operator state stream for {}. This " +
"might have prevented deleting some state data.", getOperatorName(), closeException);
}
}
}
}
//......
}
- The snapshotState(StateSnapshotContext) hook of AbstractStreamOperator only does any work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots returns true; in that case it calls timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx) for each key group. Subclasses may override this snapshotState method, for example AbstractUdfStreamOperator
AbstractUdfStreamOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }
    //......
}
- AbstractUdfStreamOperator overrides the snapshotState method of its parent class AbstractStreamOperator: after calling super.snapshotState(context), it additionally calls StreamingFunctionUtils.snapshotFunctionState
StreamingFunctionUtils.snapshotFunctionState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@Internal
public final class StreamingFunctionUtils {
    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);
        while (true) {
            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }
            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }
    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);
            return true;
        }
        if (userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
            ListState<Serializable> listState = backend.
                getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
            listState.clear();
            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();
                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }
            return true;
        }
        return false;
    }
    //......
}
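- snapshotFunctionState calls trySnapshotFunctionState, which dispatches on the type of userFunction: if it implements the CheckpointedFunction interface, CheckpointedFunction.snapshotState is called; if it implements the ListCheckpointed interface, ListCheckpointed.snapshotState is called. Note that in the ListCheckpointed case the ListState is cleared first, and the elements of the returned List are then added to it one by one via ListState.add. If neither interface matches and userFunction is a WrappingFunction, it is unwrapped and the check is repeated on the wrapped function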
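The dispatch-and-unwrap loop can be reproduced without any Flink dependency. The sketch below is a simplified illustration: SelfSnapshotting, ListSnapshotting and Wrapper are hypothetical interfaces standing in for CheckpointedFunction, ListCheckpointed and WrappingFunction, a plain java.util.List stands in for ListState, and the method returns a boolean for easier inspection, whereas the method in the excerpt returns void.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SnapshotDispatchSketch {

    /** Hypothetical marker type for user functions; not a Flink type. */
    interface Fn {}

    /** Plays the role of CheckpointedFunction: the function writes its own state. */
    interface SelfSnapshotting extends Fn {
        void snapshot(long checkpointId);
    }

    /** Plays the role of ListCheckpointed: the function returns its state as a list. */
    interface ListSnapshotting extends Fn {
        List<String> snapshot(long checkpointId);
    }

    /** Plays the role of WrappingFunction. */
    interface Wrapper extends Fn {
        Fn wrapped();
    }

    /** Tries the function itself, then each wrapped function, until one is snapshotted. */
    static boolean snapshotFunctionState(long checkpointId, List<String> listState, Fn fn) {
        while (true) {
            if (trySnapshot(checkpointId, listState, fn)) {
                return true;
            }
            if (fn instanceof Wrapper) {
                fn = ((Wrapper) fn).wrapped(); // unwrap and try the inner function
            } else {
                return false; // nothing to snapshot
            }
        }
    }

    private static boolean trySnapshot(long checkpointId, List<String> listState, Fn fn) {
        if (fn instanceof SelfSnapshotting) {
            ((SelfSnapshotting) fn).snapshot(checkpointId);
            return true;
        }
        if (fn instanceof ListSnapshotting) {
            List<String> partitions = ((ListSnapshotting) fn).snapshot(checkpointId);
            listState.clear(); // drop the previous checkpoint's entries first
            if (partitions != null) {
                listState.addAll(partitions);
            }
            return true;
        }
        return false;
    }

    static List<String> runDemo() {
        List<String> listState = new ArrayList<>(Arrays.asList("stale"));
        ListSnapshotting inner = checkpointId -> Arrays.asList("offset-1", "offset-2");
        Wrapper outer = () -> inner; // the wrapper itself has no state to snapshot
        snapshotFunctionState(1L, listState, outer);
        return listState;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [offset-1, offset-2]
    }
}
```

In the demo the outer wrapper matches neither snapshot interface, so the loop unwraps it once and snapshots the inner function; the "stale" entry is gone because the list is cleared before the new elements are added.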
Summary
- Flink's CheckpointCoordinatorDeActivator calls CheckpointCoordinator's startCheckpointScheduler when the job status becomes RUNNING, and calls CheckpointCoordinator's stopCheckpointScheduler for any other status
- CheckpointCoordinator's startCheckpointScheduler mainly registers a ScheduledTrigger task whose run method calls triggerCheckpoint. Before actually triggering a checkpoint, triggerCheckpoint performs a series of checks and fails fast if any of them is not satisfied; possible reasons include CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS, CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS and NOT_ALL_REQUIRED_TASKS_RUNNING. If the checks pass, it iterates over the executions and calls Execution.triggerCheckpoint on each one, which uses taskManagerGateway.triggerCheckpoint to invoke TaskExecutor's triggerCheckpoint method over RPC
- TaskExecutor's triggerCheckpoint mainly calls Task's triggerCheckpointBarrier method, which asynchronously executes a runnable whose run method calls invokable.triggerCheckpoint. Here the invokable is a SourceStreamTask, which mainly delegates to the triggerCheckpoint method of its parent class StreamTask; the core logic of that method is in performCheckpoint. When isRunning is true, performCheckpoint proceeds in three steps: first operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, and finally checkpointState, which creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing()
- CheckpointingOperation's executeCheckpointing method runs checkpointStreamOperator on every StreamOperator, and checkpointStreamOperator calls the StreamOperator's snapshotState method. The snapshotState(StateSnapshotContext) hook of AbstractStreamOperator only does any work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots returns true
- AbstractUdfStreamOperator overrides the snapshotState method of its parent class AbstractStreamOperator and adds a call to StreamingFunctionUtils.snapshotFunctionState, which calls the appropriate method depending on the type of userFunction: CheckpointedFunction.snapshotState if it implements the CheckpointedFunction interface, or ListCheckpointed.snapshotState if it implements the ListCheckpointed interface
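The activate/deactivate behaviour from the first two summary points can be sketched with a plain ScheduledExecutorService. This is a simplified illustration under stated assumptions: PeriodicTriggerSketch and its Status enum are hypothetical, the initial delay is fixed rather than randomized, and pending-checkpoint cleanup is omitted.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class PeriodicTriggerSketch {

    /** Hypothetical, reduced set of job states; Flink's JobStatus has more. */
    enum Status { CREATED, RUNNING, FAILING, FINISHED }

    private final Object lock = new Object();
    private final ScheduledExecutorService timer;
    private final Runnable trigger;
    private final long intervalMillis;
    private ScheduledFuture<?> currentPeriodicTrigger; // guarded by lock

    PeriodicTriggerSketch(ScheduledExecutorService timer, Runnable trigger, long intervalMillis) {
        this.timer = timer;
        this.trigger = trigger;
        this.intervalMillis = intervalMillis;
    }

    /** Listener callback: only RUNNING keeps the periodic trigger alive. */
    void jobStatusChanges(Status newStatus) {
        if (newStatus == Status.RUNNING) {
            start();
        } else {
            stop();
        }
    }

    void start() {
        synchronized (lock) {
            stop(); // make sure any prior timer is cancelled (the lock is reentrant)
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    trigger, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduled() {
        synchronized (lock) {
            return currentPeriodicTrigger != null;
        }
    }

    /** Returns {scheduled after RUNNING, scheduled after FAILING}. */
    static boolean[] runDemo() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            PeriodicTriggerSketch scheduler = new PeriodicTriggerSketch(timer, () -> { }, 1000L);
            scheduler.jobStatusChanges(Status.RUNNING);
            boolean afterRunning = scheduler.isScheduled();
            scheduler.jobStatusChanges(Status.FAILING);
            boolean afterFailing = scheduler.isScheduled();
            return new boolean[] {afterRunning, afterFailing};
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = runDemo();
        System.out.println(result[0] + " " + result[1]); // prints true false
    }
}
```

Calling stop() at the top of start() keeps at most one periodic task registered, so repeated RUNNING notifications do not stack up triggers.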