
A Look at Flink's MemoryBackendCheckpointStorage

This article examines Flink's MemoryBackendCheckpointStorage.

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {


	boolean supportsHighlyAvailableStorage();

	boolean hasDefaultSavepointLocation();

	CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

	CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

	CheckpointStorageLocation initializeLocationForSavepoint(
			long checkpointId,
			@Nullable String externalLocationPointer) throws IOException;

	CheckpointStreamFactory resolveCheckpointStorageLocation(
			long checkpointId,
			CheckpointStorageLocationReference reference) throws IOException;

	CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
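  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams
  • supportsHighlyAvailableStorage returns whether the backend supports highly available storage
  • hasDefaultSavepointLocation returns whether a default savepoint location is configured
  • resolveCheckpoint resolves a checkpoint location given as an external pointer and returns a CompletedCheckpointStorageLocation
  • initializeLocationForCheckpoint initializes the storage location for a checkpoint from its checkpointId
  • initializeLocationForSavepoint initializes the storage location for a savepoint from a checkpointId and an optional externalLocationPointer
  • resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference and returns a CheckpointStreamFactory
  • createTaskOwnedStateStream opens a stream for persisting checkpoint state that is owned by the tasks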

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

	// ------------------------------------------------------------------------
	//  Constants
	// ------------------------------------------------------------------------

	/** The prefix of the directory containing the data exclusive to a checkpoint. */
	public static final String CHECKPOINT_DIR_PREFIX = "chk-";

	/** The name of the directory for shared checkpoint state. */
	public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

	/** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
	public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

	/** The name of the metadata files in checkpoints / savepoints. */
	public static final String METADATA_FILE_NAME = "_metadata";

	/** The magic number that is put in front of any reference. */
	private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

	// ------------------------------------------------------------------------
	//  Fields and properties
	// ------------------------------------------------------------------------

	/** The jobId, written into the generated savepoint directories. */
	private final JobID jobId;

	/** The default location for savepoints. Null, if none is configured. */
	@Nullable
	private final Path defaultSavepointDirectory;

	@Override
	public boolean hasDefaultSavepointLocation() {
		return defaultSavepointDirectory != null;
	}

	@Override
	public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
		return resolveCheckpointPointer(checkpointPointer);
	}

	/**
	 * Creates a file system based storage location for a savepoint.
	 *
	 * <p>This methods implements the logic that decides which location to use (given optional
	 * parameters for a configured location and a location passed for this specific savepoint)
	 * and how to name and initialize the savepoint directory.
	 *
	 * @param externalLocationPointer    The target location pointer for the savepoint.
	 *                                   Must be a valid URI. Null, if not supplied.
	 * @param checkpointId               The checkpoint ID of the savepoint.
	 *
	 * @return The checkpoint storage location for the savepoint.
	 *
	 * @throws IOException Thrown if the target directory could not be created.
	 */
	@Override
	public CheckpointStorageLocation initializeLocationForSavepoint(
			@SuppressWarnings("unused") long checkpointId,
			@Nullable String externalLocationPointer) throws IOException {

		// determine where to write the savepoint to

		final Path savepointBasePath;
		if (externalLocationPointer != null) {
			savepointBasePath = new Path(externalLocationPointer);
		}
		else if (defaultSavepointDirectory != null) {
			savepointBasePath = defaultSavepointDirectory;
		}
		else {
			throw new IllegalArgumentException("No savepoint location given and no default location configured.");
		}

		// generate the savepoint directory

		final FileSystem fs = savepointBasePath.getFileSystem();
		final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

		Exception latestException = null;
		for (int attempt = 0; attempt < 10; attempt++) {
			final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

			try {
				if (fs.mkdirs(path)) {
					// we make the path qualified, to make it independent of default schemes and authorities
					final Path qp = path.makeQualified(fs);

					return createSavepointLocation(fs, qp);
				}
			} catch (Exception e) {
				latestException = e;
			}
		}

		throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
	}

	protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

	//......
}
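  • AbstractFsCheckpointStorage implements three methods of the CheckpointStorage interface: hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint
  • resolveCheckpoint delegates to resolveCheckpointPointer (not shown in the excerpt above), which does two things: it resolves the checkpoint/savepoint path and the path of its metadata file, obtains the FileStatus of each, and then creates an FsCompletedCheckpointStorageLocation
  • initializeLocationForSavepoint creates a CheckpointStorageLocation for a savepoint. It uses externalLocationPointer when one is given, falls back to defaultSavepointDirectory when it is null, and throws an IllegalArgumentException when neither is available. It then tries up to 10 times to create a randomly named directory with the prefix "savepoint-" plus the first six characters of the jobId, and passes the qualified path to the abstract method createSavepointLocation, which subclasses implement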
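
The fallback and retry logic described above can be sketched without any Flink dependency. This is only an illustration under stated assumptions: the class SavepointDirSketch and its methods are hypothetical, java.nio.file stands in for Flink's FileSystem and Path, and a UUID fragment stands in for FileUtils.getRandomFilename.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical stand-in for the savepoint directory logic; not Flink code. */
final class SavepointDirSketch {

    /** Prefers the location passed for this savepoint, then the configured default, else fails. */
    static Path chooseBasePath(Path externalLocation, Path defaultSavepointDirectory) {
        if (externalLocation != null) {
            return externalLocation;
        }
        if (defaultSavepointDirectory != null) {
            return defaultSavepointDirectory;
        }
        throw new IllegalArgumentException(
                "No savepoint location given and no default location configured.");
    }

    /** Tries up to 10 random directory names under the base path, remembering the last failure. */
    static Path createSavepointDir(Path basePath, String jobId) throws IOException {
        String prefix = "savepoint-" + jobId.substring(0, 6) + "-";
        IOException latest = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            // Assumption: a UUID fragment replaces Flink's random file name helper.
            Path candidate = basePath.resolve(prefix + UUID.randomUUID().toString().substring(0, 12));
            try {
                Files.createDirectories(basePath);   // make sure the parent directory exists
                // createDirectory fails if the name is already taken, which triggers a retry
                return Files.createDirectory(candidate).toAbsolutePath();
            } catch (IOException e) {
                latest = e;
            }
        }
        throw new IOException("Failed to create savepoint directory at " + basePath, latest);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("savepoints");
        Path chosen = chooseBasePath(null, base);    // falls back to the default directory
        System.out.println(createSavepointDir(chosen, "0123456789abcdef"));
    }
}
```

Retrying with a fresh random name means a name collision or a transient file system error does not fail the savepoint outright, and keeping the last exception makes the final error message useful.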

MemoryBackendCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java

/**
 * An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
 * Depending on whether this is created with a checkpoint location, the setup supports
 * durable checkpoints (durable metadata) or not.
 */
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {

	/** The target directory for checkpoints (here metadata files only). Null, if not configured. */
	@Nullable
	private final Path checkpointsDirectory;

	/** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
	@Nullable
	private final FileSystem fileSystem;

	/** The maximum size of state stored in a state handle. */
	private final int maxStateSize;

	/**
	 * Creates a new MemoryBackendCheckpointStorage.
	 *
	 * @param jobId The ID of the job writing the checkpoints.
	 * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
	 *                                 in which case this storage does not support durable persistence.
	 * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
	 * @param maxStateSize The maximum size of each individual piece of state.
	 *
	 * @throws IOException Thrown if a checkpoint base directory is given configured and the
	 *                     checkpoint directory cannot be created within that directory.
	 */
	public MemoryBackendCheckpointStorage(
			JobID jobId,
			@Nullable Path checkpointsBaseDirectory,
			@Nullable Path defaultSavepointLocation,
			int maxStateSize) throws IOException {

		super(jobId, defaultSavepointLocation);

		checkArgument(maxStateSize > 0);
		this.maxStateSize = maxStateSize;

		if (checkpointsBaseDirectory == null) {
			checkpointsDirectory = null;
			fileSystem = null;
		}
		else {
			this.fileSystem = checkpointsBaseDirectory.getFileSystem();
			this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);

			fileSystem.mkdirs(checkpointsDirectory);
		}
	}

	// ------------------------------------------------------------------------
	//  Properties
	// ------------------------------------------------------------------------

	/**
	 * Gets the size (in bytes) that a individual chunk of state may have at most.
	 */
	public int getMaxStateSize() {
		return maxStateSize;
	}

	// ------------------------------------------------------------------------
	//  Checkpoint Storage
	// ------------------------------------------------------------------------

	@Override
	public boolean supportsHighlyAvailableStorage() {
		return checkpointsDirectory != null;
	}

	@Override
	public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
		checkArgument(checkpointId >= 0);

		if (checkpointsDirectory != null) {
			// configured for durable metadata
			// prepare all the paths needed for the checkpoints
			checkState(fileSystem != null);

			final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

			// create the checkpoint exclusive directory
			fileSystem.mkdirs(checkpointDir);

			return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
		}
		else {
			// no durable metadata - typical in IDE or test setup case
			return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
		}
	}

	@Override
	public CheckpointStreamFactory resolveCheckpointStorageLocation(
			long checkpointId,
			CheckpointStorageLocationReference reference) throws IOException {

		// no matter where the checkpoint goes, we always return the storage location that stores
		// state inline with the state handles.
		return new MemCheckpointStreamFactory(maxStateSize);
	}

	@Override
	public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
		return new MemoryCheckpointOutputStream(maxStateSize);
	}

	@Override
	protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
		return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	@Override
	public String toString() {
		return "MemoryBackendCheckpointStorage {" +
				"checkpointsDirectory=" + checkpointsDirectory +
				", fileSystem=" + fileSystem +
				", maxStateSize=" + maxStateSize +
				'}';
	}
}
  • MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage and implements its abstract createSavepointLocation method, which here returns a PersistentMetadataCheckpointStorageLocation
  • MemoryBackendCheckpointStorage also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream
  • supportsHighlyAvailableStorage returns true only when checkpointsDirectory is configured
  • initializeLocationForCheckpoint branches on checkpointsDirectory: when it is null, the method returns a NonPersistentMetadataCheckpointStorageLocation; otherwise it creates the checkpoint directory and returns a PersistentMetadataCheckpointStorageLocation
  • resolveCheckpointStorageLocation always returns a MemCheckpointStreamFactory, and createTaskOwnedStateStream returns a MemoryCheckpointOutputStream; both are bounded by maxStateSize
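
The branching described above comes down to one nullable field. The following is a minimal sketch of that decision, not Flink's implementation: MemoryStorageSketch and LocationKind are hypothetical names, and the enum constants merely stand in for the two location classes named in the source.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical, simplified model of the checkpoint/savepoint location choice; not Flink code. */
final class MemoryStorageSketch {

    /** Stand-ins for PersistentMetadata... and NonPersistentMetadataCheckpointStorageLocation. */
    enum LocationKind { PERSISTENT_METADATA, NON_PERSISTENT_METADATA }

    /** Null when no checkpoint directory is configured. */
    private final Path checkpointsDirectory;

    MemoryStorageSketch(Path checkpointsDirectory) {
        this.checkpointsDirectory = checkpointsDirectory;
    }

    /** Mirrors supportsHighlyAvailableStorage(): true only when a directory is configured. */
    boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    /** Checkpoints get durable metadata only if a directory is configured. */
    LocationKind locationForCheckpoint(long checkpointId) {
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpointId must be non-negative");
        }
        return checkpointsDirectory != null
                ? LocationKind.PERSISTENT_METADATA
                : LocationKind.NON_PERSISTENT_METADATA;
    }

    /** Savepoints always get a persistent-metadata location, whatever the checkpoint setup. */
    LocationKind locationForSavepoint() {
        return LocationKind.PERSISTENT_METADATA;
    }

    public static void main(String[] args) {
        MemoryStorageSketch inMemoryOnly = new MemoryStorageSketch(null);
        MemoryStorageSketch withDirectory = new MemoryStorageSketch(Paths.get("checkpoints"));
        System.out.println(inMemoryOnly.locationForCheckpoint(1L));
        System.out.println(withDirectory.locationForCheckpoint(1L));
        System.out.println(inMemoryOnly.locationForSavepoint());
    }
}
```

The asymmetry is the point of the sketch: the checkpoint path depends on configuration, while the savepoint path does not.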

Summary

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods, and declares one abstract method, createSavepointLocation
  • MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage, implements createSavepointLocation, and also implements the remaining CheckpointStorage methods: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream
  • Although MemoryBackendCheckpointStorage is memory-based, its checkpoint location is a PersistentMetadataCheckpointStorageLocation when checkpointsDirectory (highly available storage) is configured, and a NonPersistentMetadataCheckpointStorageLocation otherwise. Its savepoint location is always a PersistentMetadataCheckpointStorageLocation. In other words, checkpoint metadata is written to files only if a directory is configured, while savepoint metadata is always written to files

doc