
Source-code analysis of how Hadoop chooses among the storage paths of a single DataNode

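  • The problem arose because the disks on the cluster's data nodes differ in size, so after some time in use the smaller disks run short of space.

    In fact, configuring the disk volume choosing policy early on would have solved this. Some posts online claim the policy has no effect, but it does work in Hadoop 2.0.1, the version used in CDH4.6.

    To find the exact place in the code, I consulted the following Hadoop design document.

    Reference

    Design document for Append/Hflush/Read in the Hadoop HDFS file system: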

    http://blog.csdn.net/chenpingbupt/article/details/7972589

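    The document states:

    On a DN's disk, each DN has three directories: current, tmp, and rbw. current holds finalized replicas, tmp holds temporary replicas, and rbw holds rbw, rwr, and rur replicas. When a replica is first created at the request of a DFS client, it is placed in rbw. When it is first created during block replication or cluster balancing, it is placed in tmp. Once a replica is finalized, it is moved to current. When a DN restarts, replicas in tmp are deleted, those in rbw are loaded in the rwr state, and those in current are loaded in the finalized state.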
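
    The directory rules quoted above can be summarized as a small state model. This is only an illustrative sketch: `ReplicaDir`, `ReplicaLifecycleSketch`, and its method names are hypothetical and are not Hadoop classes. Only the rules themselves come from the design document.

    ```java
    import java.util.Optional;

    /** Hypothetical model of the per-volume replica directories; not Hadoop code. */
    enum ReplicaDir { TMP, RBW, CURRENT }

    final class ReplicaLifecycleSketch {
        /** Directory in which a new replica is first created. */
        static ReplicaDir initialDir(boolean requestedByDfsClient) {
            // Client writes start in rbw; replication and balancing start in tmp.
            return requestedByDfsClient ? ReplicaDir.RBW : ReplicaDir.TMP;
        }

        /** A finalized replica is always moved to current, whatever its origin. */
        static ReplicaDir afterFinalize(ReplicaDir dir) {
            return ReplicaDir.CURRENT;
        }

        /** State after a DataNode restart; empty means the replica is deleted. */
        static Optional<String> stateAfterRestart(ReplicaDir dir) {
            switch (dir) {
                case TMP:
                    return Optional.empty();          // tmp replicas are deleted
                case RBW:
                    return Optional.of("RWR");        // rbw replicas are loaded as rwr
                default:
                    return Optional.of("FINALIZED");  // current replicas stay finalized
            }
        }
    }
    ```

    For example, `ReplicaLifecycleSketch.initialDir(true)` yields `RBW`, which is why the walk-through below starts from the creation of the rbw and tmp files.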

    So we start from the creation of the tmp and rbw files.

    1. See the Java class BlockPoolSlice

        /**
         * A block pool slice represents a portion of a block pool stored on a volume.
         * Taken together, all BlockPoolSlices sharing a block pool ID across a
         * cluster represent a single block pool.
         *
         * This class is synchronized by {@link FsVolumeImpl}.
         */
        class BlockPoolSlice {
          private final String bpid;
          private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
          private final File currentDir; // StorageDirectory/current/bpid/current
          private final LDir finalizedDir; // directory store Finalized replica
          private final File rbwDir; // directory store RBW replica
          private final File tmpDir; // directory store Temporary replica

    As the class description shows, BlockPoolSlice is the basis for creating the cluster's data blocks.

        /**
         * Temporary files. They get moved to the finalized block directory when
         * the block is finalized.
         */
        File createTmpFile(Block b) throws IOException {
          File f = new File(tmpDir, b.getBlockName());
          return DatanodeUtil.createTmpFile(b, f);
        }

        /**
         * RBW files. They get moved to the finalized block directory when
         * the block is finalized.
         */
        File createRbwFile(Block b) throws IOException {
          File f = new File(rbwDir, b.getBlockName());
          return DatanodeUtil.createTmpFile(b, f);
        }

    These are the methods that create the underlying files.


    2. The implementation of that method

        /** Provide utility methods for Datanode. */
        @InterfaceAudience.Private
        public class DatanodeUtil {
          public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

          public static final String DISK_ERROR = "Possible disk error: ";

          /** Get the cause of an I/O exception if caused by a possible disk error
           * @param ioe an I/O exception
           * @return cause if the I/O exception is caused by a possible disk error;
           *         null otherwise.
           */
          static IOException getCauseIfDiskError(IOException ioe) {
            if (ioe.getMessage()!=null && ioe.getMessage().startsWith(DISK_ERROR)) {
              return (IOException)ioe.getCause();
            } else {
              return null;
            }
          }

          /**
           * Create a new file.
           * @throws IOException
           * if the file already exists or if the file cannot be created.
           */
          public static File createTmpFile(Block b, File f) throws IOException {
            if (f.exists()) {
              throw new IOException("Failed to create temporary file for " + b
                  + ".  File " + f + " should not be present, but is.");
            }
            // Create the zero-length temp file
            final boolean fileCreated;
            try {
              fileCreated = f.createNewFile();
            } catch (IOException ioe) {
              throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
            }
            if (!fileCreated) {
              throw new IOException("Failed to create temporary file for " + b
                  + ".  File " + f + " should be creatable, but is already present.");
            }
            return f;
          }

    When this method is called to create a block file, the storage-path selection policy we are looking for is not involved: the target directory has already been decided by the caller.
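
    The two "already present" checks in createTmpFile rest on the contract of `java.io.File.createNewFile`, which creates the file only if it does not yet exist and returns false otherwise. The following standalone sketch demonstrates that contract in a temporary directory. The class name and the block file name `blk_0001` are hypothetical and chosen only for illustration.

    ```java
    import java.io.File;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;

    final class CreateNewFileSketch {
        /** Calls createNewFile twice on one path and returns both results. */
        static boolean[] createTwice() {
            try {
                File dir = Files.createTempDirectory("rbw-sketch").toFile();
                File f = new File(dir, "blk_0001"); // hypothetical block file name
                try {
                    boolean first = f.createNewFile();  // file is absent, so it is created
                    boolean second = f.createNewFile(); // file exists now, so nothing is created
                    return new boolean[] { first, second };
                } finally {
                    // Clean up the scratch file and directory.
                    f.delete();
                    dir.delete();
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
    ```

    The first call reports that the file was created and the second reports that it was not, which is the case createTmpFile turns into an IOException.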

    3. Next, find where createRbwFile is called

        /**************************************************
         * FSDataset manages a set of data blocks.  Each block
         * has a unique name and an extent on disk.
         *
         ***************************************************/
        @InterfaceAudience.Private
        class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
          static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);

    This is the class that manages block operations.

        @Override // FsDatasetSpi
        public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
            throws IOException {
          ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
              b.getBlockId());
          if (replicaInfo != null) {
            throw new ReplicaAlreadyExistsException("Block " + b +
                " already exists in state " + replicaInfo.getState() +
                " and thus cannot be created.");
          }
          // create a new block
          FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
          // create a rbw file to hold block in the designated volume
          File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
          ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
              b.getGenerationStamp(), v, f.getParentFile());
          volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
          return newReplicaInfo;
        }

    It calls createRbwFile on the chosen volume, which likewise creates the rbw file.

    Here we find the volumes object we care about: it holds the configured storage paths.

    4. How volumes is initialized

    volumes is initialized in the constructor, using volArray.

        /**
         * An FSDataset has a directory where it loads its data files.
         */
        FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
            ) throws IOException {
          this.datanode = datanode;
          // The number of volumes required for operation is the total number
          // of volumes minus the number of failed volumes we can tolerate.
          final int volFailuresTolerated =
            conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                        DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

          String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

          int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
          int volsFailed = volsConfigured - storage.getNumStorageDirs();
          this.validVolsRequired = volsConfigured - volFailuresTolerated;

          if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
            throw new DiskErrorException("Invalid volume failure "
                + " config value: " + volFailuresTolerated);
          }
          if (volsFailed > volFailuresTolerated) {
            throw new DiskErrorException("Too many failed volumes - "
                + "current valid volumes: " + storage.getNumStorageDirs()
                + ", volumes configured: " + volsConfigured
                + ", volumes failed: " + volsFailed
                + ", volume failures tolerated: " + volFailuresTolerated);
          }

          final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
              storage.getNumStorageDirs());
          for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
            final File dir = storage.getStorageDir(idx).getCurrentDir();
            volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
            LOG.info("Added volume - " + dir);
          }
          volumeMap = new ReplicaMap(this);

          @SuppressWarnings("unchecked")
          final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
              ReflectionUtils.newInstance(conf.getClass(
                  DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
                  RoundRobinVolumeChoosingPolicy.class,
                  VolumeChoosingPolicy.class), conf);
          volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
          volumes.getVolumeMap(volumeMap);

          File[] roots = new File[storage.getNumStorageDirs()];
          for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
            roots[idx] = storage.getStorageDir(idx).getCurrentDir();
          }
          asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
          registerMBean(storage.getStorageID());
        }

    volArray is built as follows:

        final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
            storage.getNumStorageDirs());
        for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
          final File dir = storage.getStorageDir(idx).getCurrentDir();
          volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
          LOG.info("Added volume - " + dir);
        }

    These are exactly the storage paths from the configuration file.
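
    Note that the constructor falls back to RoundRobinVolumeChoosingPolicy when the policy key is not set. The lookup-with-default pattern can be sketched in plain Java as below. This is an assumption-laden illustration, not Hadoop's `Configuration` or `ReflectionUtils` API: the interface `VolumePolicySketch`, the two stand-in policy classes, and `PolicyLoaderSketch` are all hypothetical, and a `Map` stands in for the real configuration object.

    ```java
    import java.util.Map;

    /** Hypothetical stand-in for a volume choosing policy. */
    interface VolumePolicySketch { }

    /** Stand-in for the default policy. */
    final class RoundRobinSketch implements VolumePolicySketch { }

    /** Stand-in for the free-space-aware policy. */
    final class AvailableSpaceSketch implements VolumePolicySketch { }

    final class PolicyLoaderSketch {
        static final String KEY = "dfs.datanode.fsdataset.volume.choosing.policy";

        /** Instantiates the configured policy class, or the round-robin default. */
        static VolumePolicySketch load(Map<String, String> conf) {
            String className = conf.getOrDefault(KEY, RoundRobinSketch.class.getName());
            try {
                // Reflectively create the class named in the configuration.
                return (VolumePolicySketch) Class.forName(className)
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load policy " + className, e);
            }
        }
    }
    ```

    With an empty map the sketch returns the round-robin stand-in, which mirrors why an unconfigured DataNode does not take free space into account when spreading blocks.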

    At this point we have found the storage paths we need. Finding how one of them is chosen is now much easier.

    5. Path selection starts at getNextVolume

        class FsVolumeList {
          /**
           * Read access to this unmodifiable list is not synchronized.
           * This list is replaced on modification holding "this" lock.
           */
          volatile List<FsVolumeImpl> volumes = null;

          private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
          private volatile int numFailedVolumes;

          FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
              VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
            this.volumes = Collections.unmodifiableList(volumes);
            this.blockChooser = blockChooser;
            this.numFailedVolumes = failedVols;
          }

          int numberOfFailedVolumes() {
            return numFailedVolumes;
          }

          /**
           * Get next volume. Synchronized to ensure {@link #curVolume} is updated
           * by a single thread and next volume is chosen with no concurrent
           * update to {@link #volumes}.
           * @param blockSize free space needed on the volume
           * @return next volume to store the block in.
           */
          synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
            return blockChooser.chooseVolume(volumes, blockSize);
          }
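
    For contrast with the policy examined next, here is a simplified sketch of what a round-robin chooser does. It is not Hadoop's RoundRobinVolumeChoosingPolicy source: the class name is hypothetical, volumes are reduced to an array of free-byte counts, and a plain IllegalStateException replaces Hadoop's exception types. The point is that rotation only checks whether the block fits, so a small disk receives as many blocks as a large one until it is nearly full.

    ```java
    final class RoundRobinChooserSketch {
        private int next = 0; // index of the volume to try first on the next call

        /** Returns the index of the chosen volume, cycling through volumes in order. */
        synchronized int choose(long[] availableBytes, long blockSize) {
            if (availableBytes.length == 0) {
                throw new IllegalStateException("No more available volumes");
            }
            if (next >= availableBytes.length) {
                next = 0; // the volume list may have shrunk since the last call
            }
            int start = next;
            do {
                int candidate = next;
                next = (next + 1) % availableBytes.length;
                if (availableBytes[candidate] > blockSize) {
                    return candidate; // first volume in rotation with enough room
                }
            } while (next != start);
            throw new IllegalStateException("No volume has room for " + blockSize + " bytes");
        }
    }
    ```

    With free space of 100 and 1000 bytes and a 10-byte block, successive calls alternate between the two volumes regardless of how much space each has left.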

    6. Continuing: chooseVolume is called on blockChooser, whose type is VolumeChoosingPolicy. The method is implemented in the following class:

        /**
         * A DN volume choosing policy which takes into account the amount of free
         * space on each of the available volumes when considering where to assign a
         * new replica allocation. By default this policy prefers assigning replicas to
         * those volumes with more available free space, so as to over time balance the
         * available space of all the volumes within a DN.
         */
        public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
            implements VolumeChoosingPolicy<V>, Configurable {

          private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);

          private static final Random RAND = new Random();

          private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
          private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;

    The class description makes it clear that this is the policy class we were looking for.

    7. The policy is implemented as follows:

        @Override
        public synchronized V chooseVolume(List<V> volumes,
            final long replicaSize) throws IOException {
          if (volumes.size() < 1) {
            throw new DiskOutOfSpaceException("No more available volumes");
          }

          AvailableSpaceVolumeList volumesWithSpaces =
              new AvailableSpaceVolumeList(volumes);

          if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
            // If they're actually not too far out of whack, fall back on pure round
            // robin.
            V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
            if (LOG.isDebugEnabled()) {
              LOG.debug("All volumes are within the configured free space balance " +
                  "threshold. Selecting " + volume + " for write of block size " +
                  replicaSize);
            }
            return volume;
          } else {
            V volume = null;
            // If none of the volumes with low free space have enough space for the
            // replica, always try to choose a volume with a lot of free space.
            long mostAvailableAmongLowVolumes = volumesWithSpaces
                .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

            List<V> highAvailableVolumes = extractVolumesFromPairs(
                volumesWithSpaces.getVolumesWithHighAvailableSpace());
            List<V> lowAvailableVolumes = extractVolumesFromPairs(
                volumesWithSpaces.getVolumesWithLowAvailableSpace());

            float preferencePercentScaler =
                (highAvailableVolumes.size() * balancedPreferencePercent) +
                (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
            float scaledPreferencePercent =
                (highAvailableVolumes.size() * balancedPreferencePercent) /
                preferencePercentScaler;
            if (mostAvailableAmongLowVolumes < replicaSize ||
                RAND.nextFloat() < scaledPreferencePercent) {
              volume = roundRobinPolicyHighAvailable.chooseVolume(
                  highAvailableVolumes,
                  replicaSize);
              if (LOG.isDebugEnabled()) {
                LOG.debug("Volumes are imbalanced. Selecting " + volume +
                    " from high available space volumes for write of block size "
                    + replicaSize);
              }
            } else {
              volume = roundRobinPolicyLowAvailable.chooseVolume(
                  lowAvailableVolumes,
                  replicaSize);
              if (LOG.isDebugEnabled()) {
                LOG.debug("Volumes are imbalanced. Selecting " + volume +
                    " from low available space volumes for write of block size "
                    + replicaSize);
              }
            }
            return volume;
          }
        }

    How each configured storage path gets chosen, and the policy behind that choice, is all here. Sigh, that was exhausting.
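
    The weighting in the imbalanced branch is easier to follow with numbers. The sketch below isolates the two expressions from chooseVolume into a hypothetical helper class (`PreferenceSketch` and its method names are not part of Hadoop). The value 0.75 used in the note that follows is only an example preference fraction.

    ```java
    final class PreferenceSketch {
        /** Probability of writing to the high-free-space group, per the formula in chooseVolume. */
        static float highGroupProbability(int highCount, int lowCount, float fraction) {
            float scaler = (highCount * fraction) + (lowCount * (1 - fraction));
            return (highCount * fraction) / scaler;
        }

        /** Mirrors the branch condition: true means the high-free-space group is used. */
        static boolean useHighGroup(long mostAvailableAmongLow, long replicaSize,
                                    float randomDraw, float probability) {
            // The low group is bypassed outright when none of its volumes can fit the replica.
            return mostAvailableAmongLow < replicaSize || randomDraw < probability;
        }
    }
    ```

    With a fraction of 0.75, two high and two low volumes give a probability of 0.75 for the high group, while one high and three low volumes give 0.5. The scaling by group size keeps a single roomy disk from absorbing three quarters of all writes when it is outnumbered by fuller disks.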

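    This took nearly three days. Reading the code without running it is tiring, and being able to step through it in a debugger would have helped.

    Related configuration settings: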

    dfs.datanode.fsdataset.volume.choosing.policy

    dfs.datanode.available-space-volume-choosing-policy.balanced-space-thr