
A look at the initializeState method of Flink's StreamOperator

This article examines the initializeState method of Flink's StreamOperator, based on the Flink 1.7.0 sources.

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {

	public void run() {

		// ----------------------------
		//  Initial State transition
		// ----------------------------
		while (true) {
			ExecutionState current = this.executionState;
			if (current == ExecutionState.CREATED) {
				if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
					// success, we can start our work
					break;
				}
			}
			else if (current == ExecutionState.FAILED) {
				// we were immediately failed. tell the TaskManager that we reached our final state
				notifyFinalState();
				if (metrics != null) {
					metrics.close();
				}
				return;
			}
			else if (current == ExecutionState.CANCELING) {
				if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
					// we were immediately canceled. tell the TaskManager that we reached our final state
					notifyFinalState();
					if (metrics != null) {
						metrics.close();
					}
					return;
				}
			}
			else {
				if (metrics != null) {
					metrics.close();
				}
				throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
			}
		}

		// all resource acquisitions and registrations from here on
		// need to be undone in the end
		Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
		AbstractInvokable invokable = null;

		try {
			// ----------------------------
			//  Task Bootstrap - We periodically
			//  check for canceling as a shortcut
			// ----------------------------

			//......

			// ----------------------------------------------------------------
			//  call the user code initialization methods
			// ----------------------------------------------------------------

			TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

			Environment env = new RuntimeEnvironment(
				jobId,
				vertexId,
				executionId,
				executionConfig,
				taskInfo,
				jobConfiguration,
				taskConfiguration,
				userCodeClassLoader,
				memoryManager,
				ioManager,
				broadcastVariableManager,
				taskStateManager,
				accumulatorRegistry,
				kvStateRegistry,
				inputSplitProvider,
				distributedCacheEntries,
				producedPartitions,
				inputGates,
				network.getTaskEventDispatcher(),
				checkpointResponder,
				taskManagerConfig,
				metrics,
				this);

			// now load and instantiate the task's invokable code
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

			// ----------------------------------------------------------------
			//  actual task core work
			// ----------------------------------------------------------------

			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;

			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}

			// notify everyone that we switched to running
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);

			// run the invokable
			invokable.invoke();

			// make sure, we enter the catch block if the task leaves the invoke() method due
			// to the fact that it has been canceled
			if (isCanceledOrFailed()) {
				throw new CancelTaskException();
			}

			// ----------------------------------------------------------------
			//  finalization of a successful execution
			// ----------------------------------------------------------------

			// finish the produced partitions. if this fails, we consider the execution failed.
			for (ResultPartition partition : producedPartitions) {
				if (partition != null) {
					partition.finish();
				}
			}

			// try to mark the task as finished
			// if that fails, the task was canceled/failed in the meantime
			if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
				throw new CancelTaskException();
			}
		}
		catch (Throwable t) {
			//......
		}
		finally {
			//......
		}
	}
    
	//......
}
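  • Task.run first moves the task from CREATED to DEPLOYING, loads the invokable, switches to RUNNING, and then calls invokable.invoke(); here the invokable is a StreamTask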
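
The startup sequence in Task.run can be sketched as a compare-and-set state machine. This is a minimal sketch assuming a single atomic state field; `SketchState` and `TaskStateSketch` are hypothetical names, and only the happy path plus one cancellation case are modeled (no CancelTaskException, metrics, or partition handling).

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical stand-in for a subset of Flink's ExecutionState values.
enum SketchState { CREATED, DEPLOYING, RUNNING, FINISHED, CANCELING, FAILED }

class TaskStateSketch {
    private final AtomicReference<SketchState> state = new AtomicReference<>(SketchState.CREATED);

    // Atomic compare-and-set: succeeds only if no other thread changed the state in between.
    boolean transitionState(SketchState from, SketchState to) {
        return state.compareAndSet(from, to);
    }

    SketchState current() {
        return state.get();
    }

    // Happy path of Task.run: CREATED -> DEPLOYING -> RUNNING -> (invoke) -> FINISHED.
    // If a transition fails (canceled or failed meanwhile), stop and report the current state.
    SketchState run(Runnable invokable) {
        if (!transitionState(SketchState.CREATED, SketchState.DEPLOYING)) {
            return current();
        }
        if (!transitionState(SketchState.DEPLOYING, SketchState.RUNNING)) {
            return current();
        }
        invokable.run(); // in Flink: invokable.invoke(), e.g. StreamTask.invoke()
        transitionState(SketchState.RUNNING, SketchState.FINISHED);
        return current();
    }

    public static void main(String[] args) {
        TaskStateSketch normal = new TaskStateSketch();
        System.out.println(normal.run(() -> { })); // prints FINISHED

        TaskStateSketch canceled = new TaskStateSketch();
        // Simulate a cancel request arriving while the invokable is running.
        System.out.println(canceled.run(
            () -> canceled.transitionState(SketchState.RUNNING, SketchState.CANCELING))); // prints CANCELING
    }
}
```

In the sketch, because each step is a compare-and-set, a cancel that lands between two steps makes the next transition fail instead of being silently overwritten; this mirrors why Task.run throws CancelTaskException whenever transitionState returns false.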

StreamTask.invoke

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
		extends AbstractInvokable
		implements AsyncExceptionHandler {

	@Override
	public final void invoke() throws Exception {

		boolean disposed = false;
		try {
			// -------- Initialize ---------
			LOG.debug("Initializing {}.", getName());

			asyncOperationsThreadPool = Executors.newCachedThreadPool();

			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

			synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
				getExecutionConfig().isFailTaskOnCheckpointError(),
				getEnvironment());

			asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

			stateBackend = createStateBackend();
			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

			// if the clock is not already set, then assign a default TimeServiceProvider
			if (timerService == null) {
				ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
					"Time Trigger for " + getName(), getUserCodeClassLoader());

				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
			}

			operatorChain = new OperatorChain<>(this, streamRecordWriters);
			headOperator = operatorChain.getHeadOperator();

			// task specific initialization
			init();

			// save the work of reloading state, etc, if the task is already canceled
			if (canceled) {
				throw new CancelTaskException();
			}

			// -------- Invoke --------
			LOG.debug("Invoking {}", getName());

			// we need to make sure that any triggers scheduled in open() cannot be
			// executed before all operators are opened
			synchronized (lock) {

				// both the following operations are protected by the lock
				// so that we avoid race conditions in the case that initializeState()
				// registers a timer, that fires before the open() is called.

				initializeState();
				openAllOperators();
			}

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			LOG.debug("Finished task {}", getName());

			// make sure no further checkpoint and notification actions happen.
			// we make sure that no other thread is currently in the locked scope before
			// we close the operators by trying to acquire the checkpoint scope lock
			// we also need to make sure that no triggers fire concurrently with the close logic
			// at the same time, this makes sure that during any "regular" exit where still
			synchronized (lock) {
				// this is part of the main logic, so if this fails, the task is considered failed
				closeAllOperators();

				// make sure no new timers can come
				timerService.quiesce();

				// only set the StreamTask to not running after all operators have been closed!
				// See FLINK-7430
				isRunning = false;
			}

			// make sure all timers finish
			timerService.awaitPendingAfterQuiesce();

			LOG.debug("Closed operators for task {}", getName());

			// make sure all buffered data is flushed
			operatorChain.flushOutputs();

			// make an attempt to dispose the operators such that failures in the dispose call
			// still let the computation fail
			tryDisposeAllOperators();
			disposed = true;
		}
		finally {
			//......
		}
	}

	private void initializeState() throws Exception {

		StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

		for (StreamOperator<?> operator : allOperators) {
			if (null != operator) {
				operator.initializeState();
			}
		}
	}

	//......
}
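  • StreamTask's invoke method calls its private initializeState method, which iterates over allOperators (StreamOperator instances) of the operatorChain and calls initializeState on each non-null operator. This runs inside synchronized (lock) together with openAllOperators(), so a timer registered in initializeState() cannot fire before open() is called. In this example the operator is a StreamSource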
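
The ordering guarantee inside StreamTask.invoke (every operator's state is initialized, then every operator is opened, all under one lock) can be sketched in plain Java. `SketchOperator`, `StreamTaskSketch` and `StreamTaskSketchDemo` are hypothetical; the lock stands in for the checkpoint lock, and the operators simply record their calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal operator contract; Flink's StreamOperator declares many more methods.
interface SketchOperator {
    void initializeState() throws Exception;
    void open() throws Exception;
}

class StreamTaskSketch {
    private final Object lock = new Object(); // stands in for StreamTask's checkpoint lock
    private final SketchOperator[] allOperators;

    StreamTaskSketch(SketchOperator... allOperators) {
        this.allOperators = allOperators;
    }

    // Same shape as StreamTask.initializeState(): visit every slot, skip null entries.
    private void initializeState() throws Exception {
        for (SketchOperator operator : allOperators) {
            if (operator != null) {
                operator.initializeState();
            }
        }
    }

    private void openAllOperators() throws Exception {
        for (SketchOperator operator : allOperators) {
            if (operator != null) {
                operator.open();
            }
        }
    }

    // Both phases run under one lock, so other code that needs the lock (e.g. a timer
    // callback) cannot run between the initialization phase and the open phase.
    void invoke() throws Exception {
        synchronized (lock) {
            initializeState();
            openAllOperators();
        }
    }
}

class StreamTaskSketchDemo {
    static SketchOperator recording(String name, List<String> calls) {
        return new SketchOperator() {
            @Override public void initializeState() { calls.add("init " + name); }
            @Override public void open() { calls.add("open " + name); }
        };
    }

    static List<String> run() {
        List<String> calls = new ArrayList<>();
        try {
            new StreamTaskSketch(recording("a", calls), null, recording("b", calls)).invoke();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [init a, init b, open a, open b]
    }
}
```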

StreamOperator.initializeState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
	/**
	 * Provides a context to initialize all state in the operator.
	 */
	void initializeState() throws Exception;

	//......
}
  • The StreamOperator interface declares the initializeState() method, which initializes the operator's state

StreamSource.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
		extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

	//......
}
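  • StreamSource extends AbstractUdfStreamOperator and does not override initializeState(); AbstractUdfStreamOperator does not override the no-argument initializeState() either (it is declared final in AbstractStreamOperator), so the method that runs is the one in AbstractUdfStreamOperator's parent class, AbstractStreamOperator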
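
Which initializeState() actually runs for a StreamSource can be checked with reflection on a look-alike hierarchy. The three classes below are hypothetical stand-ins (`BaseOperatorSketch` for AbstractStreamOperator, `UdfOperatorSketch` for AbstractUdfStreamOperator, `SourceOperatorSketch` for StreamSource), and the context parameter is simplified to a String.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical stand-ins mirroring the shape of the Flink hierarchy (context simplified to String).
abstract class BaseOperatorSketch { // plays AbstractStreamOperator
    // final: no subclass can override the no-arg entry point
    public final void initializeState() {
        initializeState("context");
    }

    // overridable hook, empty by default
    public void initializeState(String context) { }
}

abstract class UdfOperatorSketch extends BaseOperatorSketch { // plays AbstractUdfStreamOperator
    @Override
    public void initializeState(String context) { } // overrides only the hook
}

class SourceOperatorSketch extends UdfOperatorSketch { } // plays StreamSource: overrides nothing

class ResolutionDemo {
    // Class.getMethod searches the class first, then its superclasses, so the declaring
    // class shows which implementation SourceOperatorSketch inherits.
    static Method lookup(Class<?>... parameterTypes) {
        try {
            return SourceOperatorSketch.class.getMethod("initializeState", parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Method entry = lookup();
        Method hook = lookup(String.class);
        System.out.println(entry.getDeclaringClass().getSimpleName()); // prints BaseOperatorSketch
        System.out.println(Modifier.isFinal(entry.getModifiers()));     // prints true
        System.out.println(hook.getDeclaringClass().getSimpleName());   // prints UdfOperatorSketch
    }
}
```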

AbstractStreamOperator.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>, Serializable {

	@Override
	public final void initializeState() throws Exception {

		final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

		final StreamTask<?, ?> containingTask =
			Preconditions.checkNotNull(getContainingTask());
		final CloseableRegistry streamTaskCloseableRegistry =
			Preconditions.checkNotNull(containingTask.getCancelables());
		final StreamTaskStateInitializer streamTaskStateManager =
			Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

		final StreamOperatorStateContext context =
			streamTaskStateManager.streamOperatorStateContext(
				getOperatorID(),
				getClass().getSimpleName(),
				this,
				keySerializer,
				streamTaskCloseableRegistry,
				metrics);

		this.operatorStateBackend = context.operatorStateBackend();
		this.keyedStateBackend = context.keyedStateBackend();

		if (keyedStateBackend != null) {
			this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
		}

		timeServiceManager = context.internalTimerServiceManager();

		CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
		CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

		try {
			StateInitializationContext initializationContext = new StateInitializationContextImpl(
				context.isRestored(), // information whether we restore or start for the first time
				operatorStateBackend, // access to operator state backend
				keyedStateStore, // access to keyed state backend
				keyedStateInputs, // access to keyed state stream
				operatorStateInputs); // access to operator state stream

			initializeState(initializationContext);
		} finally {
			closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
			closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
		}
	}

	/**
	 * Stream operators with state which can be restored need to override this hook method.
	 *
	 * @param context context that allows to register different states.
	 */
	public void initializeState(StateInitializationContext context) throws Exception {

	}

	//......
}
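  • AbstractStreamOperator implements the initializeState() method declared by the StreamOperator interface: it obtains a StreamOperatorStateContext from the StreamTaskStateInitializer, sets up the operator and keyed state backends and the timer service manager, wraps them in a StateInitializationContext, and passes that to the hook method initializeState(initializationContext); the raw keyed and operator state input streams are closed in a finally block. Its subclass AbstractUdfStreamOperator overrides this hook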
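
The final-method-plus-hook structure, including the finally block that closes the raw state inputs, can be sketched as a template method. This is a minimal sketch under stated assumptions: `InitContextSketch` models only the restored flag, a `Closeable` lambda stands in for the raw keyed and operator state input streams, and all class names are hypothetical.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StateInitializationContext: only the "restored" flag is modeled.
final class InitContextSketch {
    final boolean restored;

    InitContextSketch(boolean restored) {
        this.restored = restored;
    }
}

abstract class TemplateOperatorSketch {
    final List<String> log = new ArrayList<>();

    // Same shape as AbstractStreamOperator.initializeState(): final setup, then the hook,
    // with raw state inputs closed in finally. The flag is a parameter only to keep this small.
    public final void initializeState(boolean restored) throws Exception {
        Closeable rawStateInputs = () -> log.add("raw inputs closed"); // stands in for raw state streams
        try {
            initializeState(new InitContextSketch(restored));
        } finally {
            rawStateInputs.close(); // runs even if the hook throws
        }
    }

    // Hook with an empty default body, like AbstractStreamOperator.initializeState(context).
    public void initializeState(InitContextSketch context) throws Exception { }
}

class FailingOperatorSketch extends TemplateOperatorSketch {
    @Override
    public void initializeState(InitContextSketch context) throws Exception {
        log.add("hook restored=" + context.restored);
        throw new Exception("restore failed");
    }
}

class TemplateDemo {
    static List<String> runFailing() {
        FailingOperatorSketch operator = new FailingOperatorSketch();
        try {
            operator.initializeState(true);
        } catch (Exception expected) {
            operator.log.add("caught: " + expected.getMessage());
        }
        return operator.log;
    }

    public static void main(String[] args) {
        System.out.println(runFailing()); // prints [hook restored=true, raw inputs closed, caught: restore failed]
    }
}
```

The output shows the purpose of the finally block: even when a subclass hook fails, the raw inputs are closed before the exception reaches the caller.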

AbstractUdfStreamOperator.initializeState(initializationContext)

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
		extends AbstractStreamOperator<OUT>
		implements OutputTypeConfigurable<OUT> {

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);
		StreamingFunctionUtils.restoreFunctionState(context, userFunction);
	}
    
	//......
}
  • AbstractUdfStreamOperator's initializeState(initializationContext) first calls super.initializeState(context) and then StreamingFunctionUtils.restoreFunctionState(context, userFunction)
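
The override in AbstractUdfStreamOperator follows a "call super, then delegate to the user function" pattern. Below is a minimal sketch with hypothetical names (`OperatorHookBase`, `UdfOperatorHookSketch`, `UserFnSketch`), using a String in place of StateInitializationContext and logging the base hook, which is empty in the 1.7.0 source shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user function that can restore its state.
interface UserFnSketch {
    void restore(String context);
}

class OperatorHookBase { // plays AbstractStreamOperator
    protected final List<String> calls;

    OperatorHookBase(List<String> calls) {
        this.calls = calls;
    }

    // Empty in Flink 1.7.0; logged here only so the call order is visible.
    public void initializeState(String context) {
        calls.add("operator hook");
    }
}

class UdfOperatorHookSketch extends OperatorHookBase { // plays AbstractUdfStreamOperator
    private final UserFnSketch userFunction;

    UdfOperatorHookSketch(List<String> calls, UserFnSketch userFunction) {
        super(calls);
        this.userFunction = userFunction;
    }

    @Override
    public void initializeState(String context) {
        super.initializeState(context); // keep the parent's hook behavior first
        userFunction.restore(context);  // then hand the context to the wrapped user function
    }
}

class UdfHookDemo {
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        new UdfOperatorHookSketch(calls, ctx -> calls.add("user function restore(" + ctx + ")"))
            .initializeState("ctx");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [operator hook, user function restore(ctx)]
    }
}
```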

StreamingFunctionUtils.restoreFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

	public static void restoreFunctionState(
			StateInitializationContext context,
			Function userFunction) throws Exception {

		Preconditions.checkNotNull(context);

		while (true) {

			if (tryRestoreFunction(context, userFunction)) {
				break;
			}

			// inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
			if (userFunction instanceof WrappingFunction) {
				userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
			} else {
				break;
			}
		}
	}

	private static boolean tryRestoreFunction(
			StateInitializationContext context,
			Function userFunction) throws Exception {

		if (userFunction instanceof CheckpointedFunction) {
			((CheckpointedFunction) userFunction).initializeState(context);

			return true;
		}

		if (context.isRestored() && userFunction instanceof ListCheckpointed) {
			@SuppressWarnings("unchecked")
			ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;

			ListState<Serializable> listState = context.getOperatorStateStore().
					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

			List<Serializable> list = new ArrayList<>();

			for (Serializable serializable : listState.get()) {
				list.add(serializable);
			}

			try {
				listCheckpointedFun.restoreState(list);
			} catch (Exception e) {

				throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
			}

			return true;
		}

		return false;
	}
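  • restoreFunctionState delegates to tryRestoreFunction, unwrapping WrappingFunction layers until a function can be restored or no wrapper remains. tryRestoreFunction checks the user function: if it implements CheckpointedFunction, its initializeState method is called (whether or not the job is restoring); if it implements ListCheckpointed and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called with that list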
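
The unwrap-and-retry logic of restoreFunctionState can be reproduced with stripped-down interfaces. All `Sketch*` types below are hypothetical stand-ins for the Flink types (the real signatures differ), and the ListState is modeled as a plain list; the sketch only illustrates the dispatch rules described above.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, stripped-down stand-ins for Function, WrappingFunction, CheckpointedFunction,
// ListCheckpointed and StateInitializationContext.
interface SketchFunction { }

interface SketchWrappingFunction extends SketchFunction {
    SketchFunction getWrappedFunction();
}

interface SketchCheckpointedFunction extends SketchFunction {
    void initializeState(SketchInitContext context);
}

interface SketchListCheckpointed extends SketchFunction {
    void restoreState(List<Serializable> state);
}

final class SketchInitContext {
    final boolean restored;
    final List<Serializable> defaultListState; // stands in for the default operator ListState

    SketchInitContext(boolean restored, List<? extends Serializable> state) {
        this.restored = restored;
        this.defaultListState = new ArrayList<>(state);
    }
}

final class RestoreSketch {
    // Same control flow as StreamingFunctionUtils.restoreFunctionState: try the function,
    // and if it cannot be restored but is a wrapper, unwrap one layer and try again.
    static void restoreFunctionState(SketchInitContext context, SketchFunction userFunction) {
        while (true) {
            if (tryRestoreFunction(context, userFunction)) {
                break;
            }
            if (userFunction instanceof SketchWrappingFunction) {
                userFunction = ((SketchWrappingFunction) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean tryRestoreFunction(SketchInitContext context, SketchFunction userFunction) {
        if (userFunction instanceof SketchCheckpointedFunction) {
            // called on a fresh start as well as on restore
            ((SketchCheckpointedFunction) userFunction).initializeState(context);
            return true;
        }
        if (context.restored && userFunction instanceof SketchListCheckpointed) {
            // copy the stored values into a plain List before handing them over
            List<Serializable> list = new ArrayList<>(context.defaultListState);
            ((SketchListCheckpointed) userFunction).restoreState(list);
            return true;
        }
        return false;
    }
}

class RestoreDemo {
    static List<String> run(boolean restored) {
        List<String> events = new ArrayList<>();
        SketchInitContext context = new SketchInitContext(restored, Arrays.asList(1L, 2L));

        // A ListCheckpointed function hidden behind one wrapper layer.
        SketchListCheckpointed inner = state -> events.add("restoreState " + state);
        SketchWrappingFunction wrapper = () -> inner;
        RestoreSketch.restoreFunctionState(context, wrapper);

        // A CheckpointedFunction is always initialized.
        SketchCheckpointedFunction checkpointed = ctx -> events.add("initializeState restored=" + ctx.restored);
        RestoreSketch.restoreFunctionState(context, checkpointed);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [restoreState [1, 2], initializeState restored=true]
        System.out.println(run(false)); // prints [initializeState restored=false]
    }
}
```

The demo exercises the asymmetry in the dispatch rules: a CheckpointedFunction is initialized on every start, a ListCheckpointed function is only touched when restoring, and a wrapper is unwrapped only if the outer function itself could not be restored.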

Summary

  • Task's run method triggers invokable.invoke(), where the invokable is a StreamTask. StreamTask's invoke method calls initializeState, which iterates over allOperators (StreamOperator) of the operatorChain and calls each operator's initializeState method; in this example the operator is a StreamSource, which extends AbstractUdfStreamOperator
  • The StreamOperator interface declares initializeState() for initializing the operator's state. Its abstract implementation class AbstractStreamOperator implements initializeState() as a final method that internally calls the hook initializeState(initializationContext), which its subclass AbstractUdfStreamOperator overrides
  • AbstractUdfStreamOperator's initializeState(initializationContext) calls StreamingFunctionUtils.restoreFunctionState, which checks the user function: if it implements CheckpointedFunction, its initializeState method is called; if it implements ListCheckpointed and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called
