hdfs元資料檔案(fsimage、edits)清理
Standby namenode (sbn)的EditLogTailer的功能之一就是觸發Active Namenode(nn) rollEditLog。每隔${dfs.ha.tail-edits.period}秒(預設60)秒,EditLogTailer檢測一次當前時間距離上一次roll的時間是否超過${dfs.ha.log-roll.period}秒(預設120),如果超過就通過rpc請求觸發nn 進行rollEditLog。nn通過NameNodeRpcServer接收rpc請求,之後由FSNamesystem處理:
- finalize 當前正在接收寫入的edit檔案 : 將edits_inprogress_${begTxid} finaliz 到edits_${begTxid}_${endTxid}
- 觸發所有的JournalNode(jn)進行rollEditLog
- 建立新的edits檔案(edits_inprogress_${endTxid+1})用於記錄新的事務
rollEditLog週期性將事務日誌記錄到一個個獨立的小檔案,久而久之,必然會在nn和所有jn上產生大量的檔案,一旦checkpoint,這些檔案大都沒什麼用,因此需要有一個清理策略。
sbn在完成checkpoint【參考checkpoint過程】 後會觸發sbn、nn和jn對保留在磁碟上的歷史版本的元資料檔案:fsimage檔案和edits檔案進行清理。清理過程就是根據配置的事物保留策略和映象保留策略將超出保留範圍的fsimage檔案和edits檔案直接刪除。
確定清理範圍並清理
操作的入口為Fsimage的purgeOldStorage(NameNodeFile nnf)方法,實際管理fsimage檔案和edits檔案的是NNStorageRetentionManager,由它通過配置計算出哪些檔案保留、哪些檔案刪除。
public static final String DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY = "dfs.namenode.num.checkpoints.retained"; public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2; public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained"; public static final int DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained"; public static final int DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
public NNStorageRetentionManager(
Configuration conf,
NNStorage storage,
LogsPurgeable purgeableLogs,
StoragePurger purger) {
this.numCheckpointsToRetain = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT);
this.numExtraEditsToRetain = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
this.maxExtraEditsSegmentsToRetain = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
Preconditions.checkArgument(numCheckpointsToRetain > 0,
"Must retain at least one checkpoint");
Preconditions.checkArgument(numExtraEditsToRetain >= 0,
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY +
" must not be negative");
this.storage = storage;
this.purgeableLogs = purgeableLogs;
this.purger = purger;
}
第一步:確定fsimage檔案的清理範圍。NNStorageRetentionManager通過getImageTxIdToRetain( )方法找出所有的fsimage檔案,並按照TxId升序存入集合,根據配置的fsimage檔案保留數量(numCheckpointsToRetain)和集合size確定起始保留範圍minTxId,TxId大於等於minTxId的fsimage檔案保留,Txid小於minTxid的fsimage檔案將被刪除。
private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) {
//找出所有的fsimage檔案,根據檔名解析出txid,然後構建出FSImageFile物件
List<FSImageFile> images = inspector.getFoundImages();
//使用TreeSet 保證imageTxIds 內儲存的txid 按升序排序
TreeSet<Long> imageTxIds = Sets.newTreeSet();
for (FSImageFile image : images) {
imageTxIds.add(image.getCheckpointTxId());
}
List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds);
if (imageTxIdsList.isEmpty()) {
return 0;
}
//imageTxIdsList儲存的txid 降序排序,保證所有的fsimage按txid由大到小(生成時間由近及遠)的順序排序,方便確定刪除的位置
Collections.reverse(imageTxIdsList);
int toRetain = Math.min(numCheckpointsToRetain, imageTxIdsList.size());
long minTxId = imageTxIdsList.get(toRetain - 1);
//txid小於minTxId的fsimage檔案將被刪除,其他的保留
LOG.info("Going to retain " + toRetain + " images with txid >= " +
minTxId);
return minTxId;
}
第二步:清理fsimage檔案。NNStorageRetentionManager呼叫purgeCheckpointsOlderThan( )方法進行fsimage檔案清理。遍歷儲存目錄下的每一個fsimage檔案,只要其Txid小於minTxid,就直接刪除。刪除過程由DeletionStoragePurger完成,先刪除fsimsge檔案,然後刪除對應儲存md5值的檔案。
第三步:確定edits檔案的清理範圍。根據第一步算出的minTxid(minImageTxId)、配置的事物保留數量numExtraEditsToRetain(預設1000000)和配置的最大edits檔案保留數量maxExtraEditsSegmentsToRetain(預設10000)確定清理範圍purgeLogsFrom
// If fsimage_N is the image we want to keep, then we need to keep
// all txns > N. We can remove anything < N+1, since fsimage_N
// reflects the state up to and including N. However, we also
// provide a "cushion" of older txns that we keep, which is
// handy for HA, where a remote node may not have as many
// new images.
//
// First, determine the target number of extra transactions to retain based
// on the configured amount.
long minimumRequiredTxId = minImageTxId + 1;
long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return ComparisonChain.start()
.compare(a.getFirstTxId(), b.getFirstTxId())
.compare(a.getLastTxId(), b.getLastTxId())
.result();
}
});
// Remove from consideration any edit logs that are in fact required.
while (editLogs.size() > 0 &&
editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) {
editLogs.remove(editLogs.size() - 1);
}
// Next, adjust the number of transactions to retain if doing so would mean
// keeping too many segments around.
while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
purgeLogsFrom = editLogs.get(0).getLastTxId() + 1;
editLogs.remove(0);
}
// Finally, ensure that we're not trying to purge any transactions that we
// actually need.
if (purgeLogsFrom > minimumRequiredTxId) {
throw new AssertionError("Should not purge more edits than required to "
+ "restore: " + purgeLogsFrom + " should be <= "
+ minimumRequiredTxId);
}
purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
第四步:清理edits檔案。
清理的時機
sbn:sbn每完成一次checkpoint就會清理一次。
nn:當sbn在完成checkpoint後會將新的fsimaeg檔案上傳到nn,nn通過ImageServlet接收sbn上傳的fsimage檔案,之後便會對fsimage呼叫purgeOldStorage(NameNodeFile nnf)方法進行清理。
jn:nn在清理過程中會向所有的jn傳送包含清理位置minTxIdToKeep的rpc請求,jn在收到請求後將txid小於minTxIdToKeep的edits檔案全部清理。