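A Look at the initializeState Method of Flink's StreamOperator
Preface
This article traces how the initializeState method of Flink's StreamOperator is invoked, following the call path through the Flink 1.7.0 sources.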
Task.run
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {

    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            } else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            } else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            } else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------

            //......

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        } catch (Throwable t) {
            //......
        } finally {
            //......
        }
    }

    //......
}
- Task's run method moves the task into the RUNNING state and then calls invokable.invoke(); the invokable here is a StreamTask
StreamTask.invoke
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            LOG.debug("Closed operators for task {}", getName());

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        } finally {
            //......
        }
    }

    private void initializeState() throws Exception {

        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }

    //......
}
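- StreamTask's invoke method calls its private initializeState method, which iterates over all operators in the operatorChain (each a StreamOperator) and calls initializeState on every non-null entry; in this example the operator is a StreamSource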
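The ordering that invoke enforces (initialize the state of every operator first, then open every operator, all while holding one lock) can be illustrated with a small self-contained sketch. This is not Flink code: Operator, RecordingOperator and initializeAndOpen are hypothetical stand-ins, and the second loop only assumes that openAllOperators walks the same array in the same order, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {

    /** Hypothetical stand-in for StreamOperator, reduced to two lifecycle calls. */
    interface Operator {
        void initializeState();
        void open();
    }

    /** Appends every lifecycle call to a shared log so the order can be inspected. */
    static final class RecordingOperator implements Operator {
        private final String name;
        private final List<String> log;

        RecordingOperator(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void initializeState() {
            log.add(name + ".initializeState");
        }

        @Override
        public void open() {
            log.add(name + ".open");
        }
    }

    /**
     * Mirrors the locked section of invoke(): both phases run under the same
     * lock, and null slots in the operator array are skipped.
     */
    static void initializeAndOpen(Operator[] allOperators, Object lock) {
        synchronized (lock) {
            for (Operator operator : allOperators) {
                if (operator != null) {
                    operator.initializeState();
                }
            }
            // Assumption: opening iterates the same array in the same order.
            for (Operator operator : allOperators) {
                if (operator != null) {
                    operator.open();
                }
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Operator[] chain = {
            new RecordingOperator("source", log),
            null, // the null check in the loop tolerates empty slots
            new RecordingOperator("map", log)
        };
        initializeAndOpen(chain, new Object());
        return log;
    }

    public static void main(String[] args) {
        // prints [source.initializeState, map.initializeState, source.open, map.open]
        System.out.println(demo());
    }
}
```

Holding the lock across both phases matches the comment in the Flink source: a timer registered during initializeState must not fire before open has been called.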
StreamOperator.initializeState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {

    /**
     * Provides a context to initialize all state in the operator.
     */
    void initializeState() throws Exception;

    //......
}
- The StreamOperator interface declares the no-argument initializeState method, which initializes the operator's state
StreamSource.initializeState
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
//......
}
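- StreamSource extends AbstractUdfStreamOperator. Neither StreamSource nor AbstractUdfStreamOperator overrides the no-argument initializeState() (AbstractStreamOperator declares it final), so the call runs the implementation in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator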
AbstractStreamOperator.initializeState
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>, Serializable {
@Override
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
final StreamTask<?, ?> containingTask =
Preconditions.checkNotNull(getContainingTask());
final CloseableRegistry streamTaskCloseableRegistry =
Preconditions.checkNotNull(containingTask.getCancelables());
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics);
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
if (keyedStateBackend != null) {
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
timeServiceManager = context.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
try {
StateInitializationContext initializationContext = new StateInitializationContextImpl(
context.isRestored(), // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateInputs, // access to keyed state stream
operatorStateInputs); // access to operator state stream
initializeState(initializationContext);
} finally {
closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
}
}
/**
* Stream operators with state which can be restored need to override this hook method.
*
* @param context context that allows to register different states.
*/
public void initializeState(StateInitializationContext context) throws Exception {
}
//......
}
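- AbstractStreamOperator implements the initializeState method declared by the StreamOperator interface. That method obtains the operator and keyed state backends, builds a StateInitializationContext, and passes it to initializeState(initializationContext), an empty hook in AbstractStreamOperator that its subclass AbstractUdfStreamOperator overrides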
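The relationship between the two overloads is a template method: a final entry point does the fixed setup and cleanup, and subclasses customize only the hook. A minimal sketch of that shape follows; BaseOperator, UdfOperator and Context are hypothetical names that stand in for AbstractStreamOperator, AbstractUdfStreamOperator and StateInitializationContext, and the log strings merely mark where the real work would happen.

```java
import java.util.ArrayList;
import java.util.List;

class TemplateMethodSketch {

    /** Hypothetical stand-in for StateInitializationContext. */
    static final class Context {
        final boolean restored;

        Context(boolean restored) {
            this.restored = restored;
        }
    }

    /** Plays the role of AbstractStreamOperator. */
    abstract static class BaseOperator {
        final List<String> log = new ArrayList<>();
        private final boolean restored;

        BaseOperator(boolean restored) {
            this.restored = restored;
        }

        /** Final entry point: subclasses cannot change the setup/cleanup order. */
        public final void initializeState() {
            Context context = new Context(restored);
            log.add("base: context built");
            try {
                initializeState(context); // the overridable hook
            } finally {
                // runs even if the hook throws, like closeFromRegistry in the original
                log.add("base: raw state inputs closed");
            }
        }

        /** Empty hook; stateful subclasses override it. */
        public void initializeState(Context context) {
        }
    }

    /** Plays the role of AbstractUdfStreamOperator. */
    static final class UdfOperator extends BaseOperator {

        UdfOperator(boolean restored) {
            super(restored);
        }

        @Override
        public void initializeState(Context context) {
            super.initializeState(context);
            log.add("udf: restore user function, restored=" + context.restored);
        }
    }

    static List<String> demo(boolean restored) {
        UdfOperator operator = new UdfOperator(restored);
        operator.initializeState();
        return operator.log;
    }

    public static void main(String[] args) {
        // prints [base: context built, udf: restore user function, restored=true, base: raw state inputs closed]
        System.out.println(demo(true));
    }
}
```

Because the entry point is final, the subclass hook always runs between context creation and the cleanup in the finally block.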
AbstractUdfStreamOperator.initializeState(initializationContext)
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>
implements OutputTypeConfigurable<OUT> {
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
//......
}
- Here initializeState(initializationContext) first calls super.initializeState(context) and then passes the context and the userFunction to StreamingFunctionUtils.restoreFunctionState
StreamingFunctionUtils.restoreFunctionState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
public static void restoreFunctionState(
StateInitializationContext context,
Function userFunction) throws Exception {
Preconditions.checkNotNull(context);
while (true) {
if (tryRestoreFunction(context, userFunction)) {
break;
}
// inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
if (userFunction instanceof WrappingFunction) {
userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
} else {
break;
}
}
}
private static boolean tryRestoreFunction(
StateInitializationContext context,
Function userFunction) throws Exception {
if (userFunction instanceof CheckpointedFunction) {
((CheckpointedFunction) userFunction).initializeState(context);
return true;
}
if (context.isRestored() && userFunction instanceof ListCheckpointed) {
@SuppressWarnings("unchecked")
ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
ListState<Serializable> listState = context.getOperatorStateStore().
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
List<Serializable> list = new ArrayList<>();
for (Serializable serializable : listState.get()) {
list.add(serializable);
}
try {
listCheckpointedFun.restoreState(list);
} catch (Exception e) {
throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
}
return true;
}
return false;
}
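- restoreFunctionState mainly delegates to tryRestoreFunction; if that returns false and the userFunction is a WrappingFunction, it unwraps the function and tries again with the inner one. tryRestoreFunction checks the userFunction as follows: if it implements the CheckpointedFunction interface, its initializeState method is called; otherwise, if it implements the ListCheckpointed interface and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called with that list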
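The unwrap-and-retry loop and the two-way dispatch can be modeled without any Flink dependency. In the sketch below every type is a hypothetical stand-in: Checkpointed and ListRestorable play the roles of CheckpointedFunction and ListCheckpointed, Wrapper plays WrappingFunction, and the saved list replaces the ListState read from the operator state store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RestoreDispatchSketch {

    /** Hypothetical stand-ins for the Flink function interfaces. */
    interface Function {
    }

    interface Checkpointed extends Function {
        void initializeState(boolean restored);
    }

    interface ListRestorable extends Function {
        void restoreState(List<String> state);
    }

    /** Plays the role of WrappingFunction. */
    static final class Wrapper implements Function {
        final Function wrapped;

        Wrapper(Function wrapped) {
            this.wrapped = wrapped;
        }
    }

    /** Same decision order as tryRestoreFunction: Checkpointed wins, then ListRestorable. */
    static boolean tryRestore(Function function, boolean restored, List<String> saved) {
        if (function instanceof Checkpointed) {
            ((Checkpointed) function).initializeState(restored);
            return true;
        }
        if (restored && function instanceof ListRestorable) {
            // copy the saved elements into a fresh list before handing them over
            ((ListRestorable) function).restoreState(new ArrayList<>(saved));
            return true;
        }
        return false;
    }

    /** Same loop shape as restoreFunctionState: retry on the inner function while unwrapping is possible. */
    static void restore(Function function, boolean restored, List<String> saved) {
        while (true) {
            if (tryRestore(function, restored, saved)) {
                break;
            }
            if (function instanceof Wrapper) {
                function = ((Wrapper) function).wrapped;
            } else {
                break;
            }
        }
    }

    /** Remembers whatever state it was handed; null means restoreState was never called. */
    static final class Recording implements ListRestorable {
        List<String> received;

        @Override
        public void restoreState(List<String> state) {
            received = state;
        }
    }

    static List<String> demo(boolean restored) {
        Recording inner = new Recording();
        restore(new Wrapper(new Wrapper(inner)), restored, Arrays.asList("a", "b"));
        return inner.received;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // prints [a, b]
        System.out.println(demo(false)); // prints null
    }
}
```

In this model, as in the original, a function that only implements the list-based interface is left untouched on a fresh start, because the second branch is guarded by the restored flag.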
Summary
- Task's run method triggers invokable.invoke(); the invokable here is a StreamTask. StreamTask's invoke method calls its initializeState method, which iterates over all operators in the operatorChain (each a StreamOperator) and calls initializeState on each of them; in this example the operator is a StreamSource, which extends AbstractUdfStreamOperator
- The StreamOperator interface declares the initializeState method for initializing the operator's state. Its abstract implementation AbstractStreamOperator implements that method, which internally calls initializeState(initializationContext), and the subclass AbstractUdfStreamOperator overrides the latter
- AbstractUdfStreamOperator's initializeState(initializationContext) calls StreamingFunctionUtils.restoreFunctionState. If the userFunction implements the CheckpointedFunction interface, its initializeState method is called; otherwise, if it implements the ListCheckpointed interface and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called