
A Look at Flink's PartitionableListState

This article takes a look at Flink's PartitionableListState.

PartitionableListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

	/**
	 * Implementation of operator list state.
	 *
	 * @param <S> the type of an operator state partition.
	 */
	static final class PartitionableListState<S> implements ListState<S> {

		/**
		 * Meta information of the state, including state name, assignment mode, and serializer
		 */
		private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

		/**
		 * The internal list the holds the elements of the state
		 */
		private final ArrayList<S> internalList;

		/**
		 * A serializer that allows to perform deep copies of internalList
		 */
		private final ArrayListSerializer<S> internalListCopySerializer;

		PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
			this(stateMetaInfo, new ArrayList<S>());
		}

		private PartitionableListState(
				RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
				ArrayList<S> internalList) {

			this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
			this.internalList = Preconditions.checkNotNull(internalList);
			this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
		}

		private PartitionableListState(PartitionableListState<S> toCopy) {

			this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
		}

		public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
			this.stateMetaInfo = stateMetaInfo;
		}

		public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
			return stateMetaInfo;
		}

		public PartitionableListState<S> deepCopy() {
			return new PartitionableListState<>(this);
		}

		@Override
		public void clear() {
			internalList.clear();
		}

		@Override
		public Iterable<S> get() {
			return internalList;
		}

		@Override
		public void add(S value) {
			Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
			internalList.add(value);
		}

		@Override
		public String toString() {
			return "PartitionableListState{" +
					"stateMetaInfo=" + stateMetaInfo +
					", internalList=" + internalList +
					'}';
		}

		public long[] write(FSDataOutputStream out) throws IOException {

			long[] partitionOffsets = new long[internalList.size()];

			DataOutputView dov = new DataOutputViewStreamWrapper(out);

			for (int i = 0; i < internalList.size(); ++i) {
				S element = internalList.get(i);
				partitionOffsets[i] = out.getPos();
				getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
			}

			return partitionOffsets;
		}

		@Override
		public void update(List<S> values) {
			internalList.clear();

			addAll(values);
		}

		@Override
		public void addAll(List<S> values) {
			if (values != null && !values.isEmpty()) {
				internalList.addAll(values);
			}
		}
	}
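  • PartitionableListState is the ListState implementation used by DefaultOperatorStateBackend. It stores the state in an ArrayList (internalList), and its stateMetaInfo is a RegisteredOperatorStateBackendMetaInfo. Its write method serializes each element of internalList to the FSDataOutputStream and returns an array (partitionOffsets) holding the stream offset at which each element starts.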
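
The offset bookkeeping in write can be tried out without Flink on the classpath. The following is a minimal sketch under stated assumptions, not Flink's implementation: java.io.DataOutputStream stands in for FSDataOutputStream, writeUTF stands in for the element TypeSerializer, and the names OffsetWriteSketch and writeWithOffsets are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the offset bookkeeping in PartitionableListState#write.
// NOT Flink code: DataOutputStream replaces FSDataOutputStream, and writeUTF
// replaces the element TypeSerializer (both are assumptions of this sketch).
class OffsetWriteSketch {

    static long[] writeWithOffsets(List<String> elements) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        long[] partitionOffsets = new long[elements.size()];
        try {
            for (int i = 0; i < elements.size(); i++) {
                // Record where this element starts, then serialize it.
                partitionOffsets[i] = out.size();
                out.writeUTF(elements.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionOffsets;
    }

    public static void main(String[] args) {
        // writeUTF emits a 2-byte length prefix followed by the encoded characters,
        // so "a" occupies 3 bytes and "bb" occupies 4 bytes.
        System.out.println(Arrays.toString(writeWithOffsets(Arrays.asList("a", "bb", "ccc"))));
        // prints [0, 3, 7]
    }
}
```

Recording one offset per element, rather than a single offset for the whole list, means a reader can seek directly to any individual element in the written stream; that is presumably what makes it possible to hand out elements individually when the state is read back.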

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

/**
 * {@link State} interface for partitioned list state in Operations.
 * The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

	/**
	 * Updates the operator state accessible by {@link #get()} by updating existing values to
	 * to the given list of values. The next time {@link #get()} is called (for the same state
	 * partition) the returned state will represent the updated list.
	 *
	 * <p>If null or an empty list is passed in, the state value will be null.
	 *
	 * @param values The new values for the state.
	 *
	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
	 */
	void update(List<T> values) throws Exception;

	/**
	 * Updates the operator state accessible by {@link #get()} by adding the given values
	 * to existing list of values. The next time {@link #get()} is called (for the same state
	 * partition) the returned state will represent the updated list.
	 *
	 * <p>If null or an empty list is passed in, the state value remains unchanged.
	 *
	 * @param values The new values to be added to the state.
	 *
	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
	 */
	void addAll(List<T> values) throws Exception;
}
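  • ListState is the interface for partitioned list state in operations. It extends MergingState (fixing the OUT type parameter to Iterable<T>) and declares two methods. update replaces the state wholesale: if the argument is null or empty, the state is cleared. addAll updates incrementally: if the argument is null or empty, the state is left unchanged; otherwise the given values are appended.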
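
The difference between update and addAll is easy to check with a small stand-in. The class below is a hypothetical sketch (SimpleListStateSketch is not a Flink class) that copies the update, addAll, and add bodies from the PartitionableListState listing earlier in this article, with java.util.Objects.requireNonNull substituted for Flink's Preconditions.checkNotNull.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in that mirrors the update/addAll/add bodies quoted above.
// It is not a Flink class and does not implement Flink's ListState interface.
class SimpleListStateSketch<T> {

    private final ArrayList<T> internalList = new ArrayList<>();

    // Full replacement: clear first, then append whatever was passed in.
    void update(List<T> values) {
        internalList.clear();
        addAll(values);
    }

    // Incremental update: null or empty input leaves the state untouched.
    void addAll(List<T> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    // Single-element append; null elements are rejected.
    void add(T value) {
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        internalList.add(value);
    }

    List<T> get() {
        return internalList;
    }

    public static void main(String[] args) {
        SimpleListStateSketch<String> state = new SimpleListStateSketch<>();
        state.addAll(Arrays.asList("a", "b"));
        state.addAll(null);                 // no effect
        System.out.println(state.get());    // prints [a, b]
        state.update(Arrays.asList("c"));   // replaces the previous contents
        System.out.println(state.get());    // prints [c]
        state.update(null);                 // clears the state
        System.out.println(state.get());    // prints []
    }
}
```

Because update always clears before delegating to addAll, passing null or an empty list to update empties the list, while the same argument passed to addAll is a no-op.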

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState merely extends AppendingState and declares no methods of its own; the interface name signals that the state supports merging.

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

/**
 * Base interface for partitioned state that supports adding elements and inspecting the current
 * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

	/**
	 * Returns the current value for the state. When the state is not
	 * partitioned the returned value is the same for all inputs in a given
	 * operator instance. If state partitioning is applied, the value returned
	 * depends on the current operator input, as the operator maintains an
	 * independent state for each partition.
	 *
	 * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
	 * should return {@code null}.
	 *
	 * @return The operator state value corresponding to the current input or {@code null}
	 * if the state is empty.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	OUT get() throws Exception;

	/**
	 * Updates the operator state accessible by {@link #get()} by adding the given value
	 * to the list of values. The next time {@link #get()} is called (for the same state
	 * partition) the returned state will represent the updated list.
	 *
	 * <p>If null is passed in, the state value will remain unchanged.
	 *
	 * @param value The new value for the state.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void add(IN value) throws Exception;

}
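  • AppendingState is the base interface for partitioned state. It extends State and declares two methods, get and add: get returns the current value of the state, or null if the state is empty; add appends a value to the state.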

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

	/**
	 * Removes the value mapped under the current key.
	 */
	void clear();
}
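  • The State interface defines the methods that every kind of partitioned state must implement; here that is the clear method, which removes all values of the current state.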
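
The four interfaces walked through so far form a single chain. The sketch below reproduces that chain in plain Java so it can be compiled without Flink. Every name in it (SketchState, SketchAppendingState, SketchMergingState, SketchListState, ArrayBackedListState) is hypothetical, and the `throws Exception` clauses of the real interfaces are dropped for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mirrors of State -> AppendingState -> MergingState -> ListState.
// The real Flink interfaces declare "throws Exception"; this sketch omits that.
interface SketchState {
    void clear();
}

interface SketchAppendingState<IN, OUT> extends SketchState {
    OUT get();
    void add(IN value);
}

// Declares nothing itself; the type only marks the state as mergeable.
interface SketchMergingState<IN, OUT> extends SketchAppendingState<IN, OUT> { }

// Fixes OUT to Iterable<T> and adds the two bulk operations.
interface SketchListState<T> extends SketchMergingState<T, Iterable<T>> {
    void update(List<T> values);
    void addAll(List<T> values);
}

// A list-backed implementation has to supply all five methods of the chain.
class ArrayBackedListState<T> implements SketchListState<T> {
    private final ArrayList<T> internalList = new ArrayList<>();

    @Override public void clear() { internalList.clear(); }
    @Override public Iterable<T> get() { return internalList; }
    @Override public void add(T value) { internalList.add(value); }
    @Override public void update(List<T> values) { internalList.clear(); addAll(values); }
    @Override public void addAll(List<T> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    public static void main(String[] args) {
        SketchListState<Integer> state = new ArrayBackedListState<>();
        state.add(1);
        state.addAll(Arrays.asList(2, 3));
        System.out.println(state.get());   // prints [1, 2, 3]
        SketchState base = state;          // usable through the root interface
        base.clear();
        System.out.println(state.get());   // prints []
    }
}
```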

RegisteredOperatorStateBackendMetaInfo

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java

/**
 * Compound meta information for a registered state in an operator state backend.
 * This contains the state name, assignment mode, and state partition serializer.
 *
 * @param <S> Type of the state.
 */
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {

	/**
	 * The mode how elements in this state are assigned to tasks during restore
	 */
	@Nonnull
	private final OperatorStateHandle.Mode assignmentMode;

	/**
	 * The type serializer for the elements in the state list
	 */
	@Nonnull
	private final TypeSerializer<S> partitionStateSerializer;

	public RegisteredOperatorStateBackendMetaInfo(
			@Nonnull String name,
			@Nonnull TypeSerializer<S> partitionStateSerializer,
			@Nonnull OperatorStateHandle.Mode assignmentMode) {
		super(name);
		this.partitionStateSerializer = partitionStateSerializer;
		this.assignmentMode = assignmentMode;
	}

	private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
		this(
			Preconditions.checkNotNull(copy).name,
			copy.partitionStateSerializer.duplicate(),
			copy.assignmentMode);
	}

	@SuppressWarnings("unchecked")
	public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
		this(
			snapshot.getName(),
			(TypeSerializer<S>) Preconditions.checkNotNull(
				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
			OperatorStateHandle.Mode.valueOf(
				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
	}

	/**
	 * Creates a deep copy of the itself.
	 */
	@Nonnull
	public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
		return new RegisteredOperatorStateBackendMetaInfo<>(this);
	}

	@Nonnull
	@Override
	public StateMetaInfoSnapshot snapshot() {
		return computeSnapshot();
	}

	//......

	@Nonnull
	private StateMetaInfoSnapshot computeSnapshot() {
		Map<String, String> optionsMap = Collections.singletonMap(
			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
			assignmentMode.toString());
		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
		Map<String, TypeSerializer<?>> serializerMap =
			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
		Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());

		return new StateMetaInfoSnapshot(
			name,
			StateMetaInfoSnapshot.BackendStateType.OPERATOR,
			optionsMap,
			serializerConfigSnapshotsMap,
			serializerMap);
	}
}
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot. computeSnapshot builds the optionsMap, serializerConfigSnapshotsMap, and serializerMap needed to construct a StateMetaInfoSnapshot.
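
In the listing above, the assignment mode is written into the snapshot as a string (assignmentMode.toString()) and read back in the snapshot-based constructor with Mode.valueOf. That round trip can be sketched in plain Java. Everything here is an illustrative assumption rather than Flink code: the class MetaInfoSnapshotSketch, its nested Mode enum (a stand-in for OperatorStateHandle.Mode, with constant names assumed), and the use of a plain string as the map key.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the options-map round trip; not Flink code.
class MetaInfoSnapshotSketch {

    // Stand-in for OperatorStateHandle.Mode; the constant names are an assumption.
    enum Mode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

    // Plain string key, named after the CommonOptionsKeys constant used above.
    static final String MODE_KEY = "OPERATOR_STATE_DISTRIBUTION_MODE";

    // Snapshot side: store the mode as its string form in a one-entry map.
    static Map<String, String> snapshotOptions(Mode assignmentMode) {
        return Collections.singletonMap(MODE_KEY, assignmentMode.toString());
    }

    // Restore side: look the string up and parse it back into the enum.
    static Mode restoreMode(Map<String, String> options) {
        return Mode.valueOf(options.get(MODE_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> options = snapshotOptions(Mode.UNION);
        System.out.println(options);               // prints {OPERATOR_STATE_DISTRIBUTION_MODE=UNION}
        System.out.println(restoreMode(options));  // prints UNION
    }
}
```

Collections.singletonMap returns an immutable one-entry map, which suits a snapshot that should not change after it has been taken.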

Summary

  • Flink's managed operator state supports only ListState. The ListState implementation used by DefaultOperatorStateBackend is PartitionableListState, which stores the state in an ArrayList (internalList) and whose stateMetaInfo is a RegisteredOperatorStateBackendMetaInfo.
  • PartitionableListState implements the ListState interface (update and addAll). ListState extends MergingState (fixing the OUT type parameter to Iterable<T>); MergingState declares no methods of its own and extends AppendingState; AppendingState extends State and declares get and add; State defines clear.
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot. computeSnapshot builds the optionsMap, serializerConfigSnapshotsMap, and serializerMap needed to construct a StateMetaInfoSnapshot.
