A Look at Flink's MemoryBackendCheckpointStorage
阿新 • Published: 2018-12-13
Preface
This article takes a look at Flink's MemoryBackendCheckpointStorage.
CheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java
/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {

    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
- The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. supportsHighlyAvailableStorage returns whether the backend supports highly available storage; hasDefaultSavepointLocation returns whether a default savepoint location is configured; resolveCheckpoint resolves an external checkpoint pointer and returns a CompletedCheckpointStorageLocation; initializeLocationForCheckpoint initializes the storage location for a given checkpointId; initializeLocationForSavepoint initializes the storage location for a savepoint, given a checkpointId and an optional external location pointer; resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference into a CheckpointStreamFactory; createTaskOwnedStateStream opens a stream for persisting checkpoint state that is owned by the tasks
AbstractFsCheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";

    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------

    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;

    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;

    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }

    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer The target location pointer for the savepoint.
     *                                Must be a valid URI. Null, if not supplied.
     * @param checkpointId The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {

        // determine where to write the savepoint to
        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        // generate the savepoint directory
        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);
                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }

    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

    //......
}
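- AbstractFsCheckpointStorage implements three methods of the CheckpointStorage interface: hasDefaultSavepointLocation, resolveCheckpoint and initializeLocationForSavepoint
- resolveCheckpoint delegates to resolveCheckpointPointer, which does two things: it resolves the checkpoint/savepoint path and the path of its metadata file, obtains their FileStatus, and then creates an FsCompletedCheckpointStorageLocation
- initializeLocationForSavepoint creates a CheckpointStorageLocation for a savepoint. It uses externalLocationPointer if one is given, falls back to defaultSavepointDirectory if the pointer is null, and throws an IllegalArgumentException if neither is set. It then makes up to 10 attempts to create a randomly named savepoint directory and passes it to the abstract method createSavepointLocation, which subclasses implement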
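To make the savepoint directory logic above easier to follow, here is a minimal sketch that uses only java.nio instead of Flink's Path and FileSystem. The class SavepointDirSketch and its method names are made up for illustration, and the UUID-based suffix is an assumed stand-in for FileUtils.getRandomFilename; Files.createDirectories also differs slightly from fs.mkdirs in that it reports failure only through exceptions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

/** Stdlib-only sketch of the savepoint directory logic; this is not Flink code. */
final class SavepointDirSketch {

    /** Prefers the explicit pointer, then the configured default, otherwise fails. */
    static Path chooseBasePath(String externalLocationPointer, Path defaultSavepointDirectory) {
        if (externalLocationPointer != null) {
            return Paths.get(externalLocationPointer);
        } else if (defaultSavepointDirectory != null) {
            return defaultSavepointDirectory;
        }
        throw new IllegalArgumentException(
                "No savepoint location given and no default location configured.");
    }

    /** Tries up to 10 random directory names under the base path. */
    static Path createSavepointDir(Path basePath, String jobId) throws IOException {
        // Same naming scheme as the listing: "savepoint-" + first 6 chars of the job ID + '-'.
        String prefix = "savepoint-" + jobId.substring(0, 6) + '-';
        IOException latest = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            // Hypothetical replacement for FileUtils.getRandomFilename(prefix).
            String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
            Path candidate = basePath.resolve(prefix + suffix);
            try {
                Files.createDirectories(candidate);
                // Absolute path, loosely mirroring path.makeQualified(fs).
                return candidate.toAbsolutePath();
            } catch (IOException e) {
                latest = e;
            }
        }
        throw new IOException("Failed to create savepoint directory at " + basePath, latest);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("savepoints");
        Path dir = createSavepointDir(chooseBasePath(null, base), "0123456789abcdef");
        System.out.println(dir.getFileName());
    }
}
```

Running main prints a directory name that starts with savepoint-012345-, followed by a random suffix.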
MemoryBackendCheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
/**
 * An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
 * Depending on whether this is created with a checkpoint location, the setup supports
 * durable checkpoints (durable metadata) or not.
 */
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {

    /** The target directory for checkpoints (here metadata files only). Null, if not configured. */
    @Nullable
    private final Path checkpointsDirectory;

    /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
    @Nullable
    private final FileSystem fileSystem;

    /** The maximum size of state stored in a state handle. */
    private final int maxStateSize;

    /**
     * Creates a new MemoryBackendCheckpointStorage.
     *
     * @param jobId The ID of the job writing the checkpoints.
     * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
     *                                 in which case this storage does not support durable persistence.
     * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
     * @param maxStateSize The maximum size of each individual piece of state.
     *
     * @throws IOException Thrown if a checkpoint base directory is given configured and the
     *                     checkpoint directory cannot be created within that directory.
     */
    public MemoryBackendCheckpointStorage(
            JobID jobId,
            @Nullable Path checkpointsBaseDirectory,
            @Nullable Path defaultSavepointLocation,
            int maxStateSize) throws IOException {

        super(jobId, defaultSavepointLocation);

        checkArgument(maxStateSize > 0);
        this.maxStateSize = maxStateSize;

        if (checkpointsBaseDirectory == null) {
            checkpointsDirectory = null;
            fileSystem = null;
        }
        else {
            this.fileSystem = checkpointsBaseDirectory.getFileSystem();
            this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);

            fileSystem.mkdirs(checkpointsDirectory);
        }
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    /**
     * Gets the size (in bytes) that a individual chunk of state may have at most.
     */
    public int getMaxStateSize() {
        return maxStateSize;
    }

    // ------------------------------------------------------------------------
    //  Checkpoint Storage
    // ------------------------------------------------------------------------

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);

        if (checkpointsDirectory != null) {
            // configured for durable metadata
            // prepare all the paths needed for the checkpoints
            checkState(fileSystem != null);

            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

            // create the checkpoint exclusive directory
            fileSystem.mkdirs(checkpointDir);

            return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
        }
        else {
            // no durable metadata - typical in IDE or test setup case
            return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
        }
    }

    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {

        // no matter where the checkpoint goes, we always return the storage location that stores
        // state inline with the state handles.
        return new MemCheckpointStreamFactory(maxStateSize);
    }

    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "MemoryBackendCheckpointStorage {" +
                "checkpointsDirectory=" + checkpointsDirectory +
                ", fileSystem=" + fileSystem +
                ", maxStateSize=" + maxStateSize +
                '}';
    }
}
- MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage and implements its abstract createSavepointLocation method, which here returns a PersistentMetadataCheckpointStorageLocation
- MemoryBackendCheckpointStorage also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation and createTaskOwnedStateStream
- supportsHighlyAvailableStorage returns true only if checkpointsDirectory is configured; initializeLocationForCheckpoint also depends on checkpointsDirectory: if it is null, a NonPersistentMetadataCheckpointStorageLocation is created, otherwise a PersistentMetadataCheckpointStorageLocation; resolveCheckpointStorageLocation always creates a MemCheckpointStreamFactory, and createTaskOwnedStateStream creates a MemoryCheckpointOutputStream, both bounded by maxStateSize
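Both MemCheckpointStreamFactory and MemoryCheckpointOutputStream are constructed with maxStateSize, and their source is not shown in this article. The sketch below is therefore only an assumption about what such a size-capped in-memory stream could look like, written against the JDK alone. The class name BoundedMemoryStreamSketch, the closeAndGetBytes method and the point at which the size is checked are illustrative choices, not taken from Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Stdlib-only sketch of a size-capped in-memory state stream; this is not Flink code. */
final class BoundedMemoryStreamSketch extends OutputStream {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;

    BoundedMemoryStreamSketch(int maxStateSize) {
        // Mirrors checkArgument(maxStateSize > 0) in the constructor shown above.
        if (maxStateSize <= 0) {
            throw new IllegalArgumentException("maxStateSize must be positive");
        }
        this.maxStateSize = maxStateSize;
    }

    @Override
    public void write(int b) {
        buffer.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        buffer.write(b, off, len);
    }

    /** Returns the buffered bytes, or fails if they exceed the configured cap. */
    byte[] closeAndGetBytes() throws IOException {
        // Assumption: the size is checked once, when the state is handed over.
        if (buffer.size() > maxStateSize) {
            throw new IOException("State size " + buffer.size()
                    + " exceeds the maximum of " + maxStateSize + " bytes");
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        BoundedMemoryStreamSketch stream = new BoundedMemoryStreamSketch(8);
        stream.write(new byte[] {1, 2, 3});
        System.out.println(stream.closeAndGetBytes().length);
    }
}
```

The point of the cap is that the state bytes travel inline with the state handles, as the comment in resolveCheckpointStorageLocation says, so each piece of state has to stay small.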
Summary
- The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams; AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint and initializeLocationForSavepoint methods and also declares the abstract method createSavepointLocation
- MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage and implements createSavepointLocation, along with the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation and createTaskOwnedStateStream
- As this shows, although MemoryBackendCheckpointStorage is memory-based, when checkpointsDirectory is configured (highly available storage) the checkpoint location is a PersistentMetadataCheckpointStorageLocation, and otherwise a NonPersistentMetadataCheckpointStorageLocation; the savepoint location is always a PersistentMetadataCheckpointStorageLocation. In other words, checkpoint metadata may or may not be stored in files, whereas savepoint metadata is always stored in files
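The choice between the two location types can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's implementation: LocationChoiceSketch, Location, PersistentMetadata and NonPersistentMetadata are hypothetical stand-ins, and the chk-<checkpointId> directory name is an assumption based on the CHECKPOINT_DIR_PREFIX constant, since createCheckpointDirectory is not shown in the listing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Stdlib-only model of how the storage picks a location type; this is not Flink code. */
final class LocationChoiceSketch {

    /** Hypothetical stand-in for CheckpointStorageLocation. */
    interface Location {
        boolean persistsMetadata();
    }

    static final class PersistentMetadata implements Location {
        final Path dir;
        PersistentMetadata(Path dir) { this.dir = dir; }
        @Override public boolean persistsMetadata() { return true; }
    }

    static final class NonPersistentMetadata implements Location {
        @Override public boolean persistsMetadata() { return false; }
    }

    /** Null means that no durable metadata directory is configured. */
    private final Path checkpointsDirectory;

    LocationChoiceSketch(Path checkpointsDirectory) {
        this.checkpointsDirectory = checkpointsDirectory;
    }

    boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    Location initializeLocationForCheckpoint(long checkpointId) throws IOException {
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpointId must not be negative");
        }
        if (checkpointsDirectory != null) {
            // Assumed layout: one "chk-<id>" directory per checkpoint.
            Path dir = checkpointsDirectory.resolve("chk-" + checkpointId);
            Files.createDirectories(dir);
            return new PersistentMetadata(dir);
        }
        // No directory configured: metadata stays in memory only.
        return new NonPersistentMetadata();
    }

    /** Savepoints always receive a directory, so their metadata is always persisted. */
    Location createSavepointLocation(Path location) {
        return new PersistentMetadata(location);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("checkpoints");
        System.out.println(new LocationChoiceSketch(base).initializeLocationForCheckpoint(1).persistsMetadata());
        System.out.println(new LocationChoiceSketch(null).initializeLocationForCheckpoint(1).persistsMetadata());
    }
}
```

With a directory configured the first call reports persistent metadata, and without one the second call does not, which matches the behaviour of supportsHighlyAvailableStorage in the listing.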