Hadoop volume choosing policy
Source version: 2.6.1
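What is a volume: in Hadoop, a volume is a storage location where a DataNode keeps block replicas, and every replica a DataNode writes goes through the volume choosing policy. The policy is set with the parameter dfs.datanode.fsdataset.volume.choosing.policy. When the parameter is left unset, the default is org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; to take free disk space into account, set it to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy.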
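Available policies: currently round-robin and available space.
Both policies implement the interface org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy.
Its chooseVolume method expresses the selection criterion, i.e. it is where each policy is defined: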
```java
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;

/**
 * This interface specifies the policy for choosing volumes to store replicas.
 */
@InterfaceAudience.Private
public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {

  /**
   * Choose a volume to place a replica,
   * given a list of volumes and the replica size sought for storage.
   *
   * The implementations of this interface must be thread-safe.
   *
   * @param volumes - a list of available volumes.
   * @param replicaSize - the size of the replica for which a volume is sought.
   * @return the chosen volume.
   * @throws IOException when disks are unavailable or are full.
   */
  public V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
}
```
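The interface can be tried out without a Hadoop build. The sketch below approximates it under stated assumptions:

- `Volume` is a hypothetical stand-in for FsVolumeSpi that models only getAvailable().
- `MostAvailablePolicy` is a hypothetical policy and is not part of Hadoop.

It only illustrates what an implementation has to provide. It returns a volume that can hold the replica, throws an IOException when none can, and is safe to call from several threads.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for FsVolumeSpi: only the free space is modelled.
interface Volume {
  long getAvailable();
}

// Same shape as Hadoop's VolumeChoosingPolicy<V extends FsVolumeSpi>.
interface VolumeChoosingPolicy<V extends Volume> {
  V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
}

// Hypothetical policy, not part of Hadoop: always take the volume with the
// most free space. It keeps no mutable state, so it is thread-safe as is.
class MostAvailablePolicy<V extends Volume> implements VolumeChoosingPolicy<V> {
  @Override
  public V chooseVolume(List<V> volumes, long replicaSize) throws IOException {
    V best = null;
    for (V v : volumes) {
      if (best == null || v.getAvailable() > best.getAvailable()) {
        best = v;
      }
    }
    // Like the Hadoop policies, require strictly more space than the replica
    // needs and report "disks full / unavailable" through an IOException.
    if (best == null || best.getAvailable() <= replicaSize) {
      throw new IOException("No volume has more than " + replicaSize + " B available");
    }
    return best;
  }
}
```

Hadoop's own policies throw DiskOutOfSpaceException, which is a subclass of IOException. Plain IOException keeps the sketch free of Hadoop dependencies.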
Policy 1: round-robin. The source code:
```java
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

/**
 * Choose volumes in round-robin order.
 */
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
    implements VolumeChoosingPolicy<V> {
  public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);

  private int curVolume = 0;

  @Override
  public synchronized V chooseVolume(final List<V> volumes, long blockSize)
      throws IOException {

    if(volumes.size() < 1) {
      throw new DiskOutOfSpaceException("No more available volumes");
    }

    // since volumes could've been removed because of the failure
    // make sure we are not out of bounds
    if(curVolume >= volumes.size()) {
      curVolume = 0;
    }

    int startVolume = curVolume;
    long maxAvailable = 0;

    while (true) {
      final V volume = volumes.get(curVolume);
      curVolume = (curVolume + 1) % volumes.size();
      long availableVolumeSize = volume.getAvailable();
      if (availableVolumeSize > blockSize) {
        return volume;
      }

      if (availableVolumeSize > maxAvailable) {
        maxAvailable = availableVolumeSize;
      }

      if (curVolume == startVolume) {
        throw new DiskOutOfSpaceException("Out of space: "
            + "The volume with the most available space (=" + maxAvailable
            + " B) is less than the block size (=" + blockSize + " B).");
      }
    }
  }
}
```
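Here, volumes corresponds to the storage directories configured by dfs.datanode.data.dir in hdfs-site.xml. blockSize is the size of the replica to be stored; how that size relates to the replica settings is a topic for later.
The code walks the volumes in round-robin order, starting where the previous call left off, and records the largest available space it sees along the way. The first volume whose available space is strictly greater than blockSize is returned immediately. If a full cycle finishes without finding one, the policy throws an out-of-space exception. The message reports both the largest available space and the block size, which makes the problem easy to diagnose.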
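The loop can be reproduced without Hadoop to watch its behaviour. The sketch follows the selection logic of the source quoted above, with a few substitutions:

- `Volume` is a hypothetical stand-in for FsVolumeSpi.
- Plain IOException replaces DiskOutOfSpaceException.
- The class name RoundRobinSketch is made up.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for FsVolumeSpi: only the available space matters here.
interface Volume {
  long getAvailable();
}

// Sketch of RoundRobinVolumeChoosingPolicy's loop (IOException stands in for
// DiskOutOfSpaceException).
class RoundRobinSketch<V extends Volume> {
  private int curVolume = 0; // cursor survives between calls

  synchronized V chooseVolume(List<V> volumes, long blockSize) throws IOException {
    if (volumes.isEmpty()) {
      throw new IOException("No more available volumes");
    }
    if (curVolume >= volumes.size()) {
      curVolume = 0; // the list may have shrunk since the last call
    }
    int startVolume = curVolume;
    long maxAvailable = 0;
    while (true) {
      V volume = volumes.get(curVolume);
      curVolume = (curVolume + 1) % volumes.size();
      long available = volume.getAvailable();
      if (available > blockSize) {
        return volume; // first volume with strictly more space than needed
      }
      maxAvailable = Math.max(maxAvailable, available);
      if (curVolume == startVolume) { // a full cycle found nothing
        throw new IOException("Out of space: most available = " + maxAvailable
            + " B, block size = " + blockSize + " B");
      }
    }
  }
}
```

Suppose the volumes offer 100 B, 5 B and 300 B and the block is 50 B. Three consecutive calls return the 100 B, 300 B and 100 B volumes. The 5 B volume is skipped, but visiting it still advances the cursor. A following request for 400 B walks one full cycle and throws.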
Policy 2: available space (only the chooseVolume method of AvailableSpaceVolumeChoosingPolicy is shown)
```java
@Override
public synchronized V chooseVolume(List<V> volumes,
    long replicaSize) throws IOException {
  if (volumes.size() < 1) {
    throw new DiskOutOfSpaceException("No more available volumes");
  }

  AvailableSpaceVolumeList volumesWithSpaces =
      new AvailableSpaceVolumeList(volumes);

  if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
    // If they're actually not too far out of whack, fall back on pure round
    // robin.
    V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
    if (LOG.isDebugEnabled()) {
      LOG.debug("All volumes are within the configured free space balance " +
          "threshold. Selecting " + volume + " for write of block size " +
          replicaSize);
    }
    return volume;
  } else {
    V volume = null;
    // If none of the volumes with low free space have enough space for the
    // replica, always try to choose a volume with a lot of free space.
    long mostAvailableAmongLowVolumes = volumesWithSpaces
        .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

    List<V> highAvailableVolumes = extractVolumesFromPairs(
        volumesWithSpaces.getVolumesWithHighAvailableSpace());
    List<V> lowAvailableVolumes = extractVolumesFromPairs(
        volumesWithSpaces.getVolumesWithLowAvailableSpace());

    float preferencePercentScaler =
        (highAvailableVolumes.size() * balancedPreferencePercent) +
        (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
    float scaledPreferencePercent =
        (highAvailableVolumes.size() * balancedPreferencePercent) /
        preferencePercentScaler;
    if (mostAvailableAmongLowVolumes < replicaSize ||
        random.nextFloat() < scaledPreferencePercent) {
      volume = roundRobinPolicyHighAvailable.chooseVolume(
          highAvailableVolumes, replicaSize);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Volumes are imbalanced. Selecting " + volume +
            " from high available space volumes for write of block size "
            + replicaSize);
      }
    } else {
      volume = roundRobinPolicyLowAvailable.chooseVolume(
          lowAvailableVolumes, replicaSize);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Volumes are imbalanced. Selecting " + volume +
            " from low available space volumes for write of block size "
            + replicaSize);
      }
    }
    return volume;
  }
}
```
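volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold() computes the difference between the largest and the smallest available space across all volumes. If that difference is less than balancedSpaceThreshold, the volumes are treated as equally loaded, and the volume is simply chosen with the round-robin policy. balancedSpaceThreshold comes from the configuration parameter
dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold
and defaults to 10 GB (10737418240 bytes).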
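The balance test is simple arithmetic on available space. The sketch below reproduces the comparison and the 10 GB default under stated assumptions:

- It works on plain byte counts rather than volumes.
- The class name BalanceCheck is hypothetical.
- Non-empty input is assumed, since chooseVolume rejects an empty list before this point.

```java
import java.util.Arrays;

// Hypothetical helper mirroring the balanced/imbalanced test described above.
final class BalanceCheck {
  // Default of ...balanced-space-threshold: 10 GB expressed in bytes.
  static final long DEFAULT_THRESHOLD = 10L * 1024 * 1024 * 1024;

  // available: free bytes per volume (must be non-empty).
  static boolean allWithinThreshold(long[] available, long threshold) {
    long most = Arrays.stream(available).max().getAsLong();
    long least = Arrays.stream(available).min().getAsLong();
    // Strictly less than the threshold means "balanced": plain round-robin.
    return most - least < threshold;
  }
}
```

- For volumes with 50 GB, 45 GB and 42 GB free, the spread is 8 GB, so plain round-robin is used.
- For 20 GB and 10 GB, the spread is exactly 10 GB. That is not strictly less than the threshold, so the imbalanced branch is taken.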
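If the volumes are not balanced, the choice is made in two parts.
1. High-available volumes (highAvailableVolumes): volumes whose available space is greater than the smallest available space among all volumes plus balancedSpaceThreshold.
(1) If the replica size is greater than the largest available space among lowAvailableVolumes, round-robin is always applied within highAvailableVolumes.
(2) Otherwise, highAvailableVolumes is chosen with a probability derived from balancedPreferencePercent. This value is configurable via
dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction
(default 0.75). The fraction acts as a per-volume weight rather than a fixed probability. The code computes scaledPreferencePercent = nHigh * p / (nHigh * p + nLow * (1 - p)), where nHigh and nLow are the sizes of the two groups and p is balancedPreferencePercent. The high group is therefore picked exactly 75% of the time only when both groups contain the same number of volumes.
2. Low-available volumes (lowAvailableVolumes): volumes whose available space is less than or equal to the smallest available space among all volumes plus balancedSpaceThreshold. When case (1) does not apply, round-robin runs within lowAvailableVolumes with the remaining probability, 1 - scaledPreferencePercent.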
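The group split and the probability weighting can be checked with a small sketch. The formulas follow chooseVolume above and the group boundaries described in this section, with these assumptions:

- It works on plain byte counts instead of volumes.
- The class and method names (ImbalancedChoice, scaledPreference, pickHigh) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the imbalanced branch: split free-space values into
// high/low groups and decide which group the round-robin step should use.
final class ImbalancedChoice {
  final List<Long> high = new ArrayList<>();
  final List<Long> low = new ArrayList<>();

  ImbalancedChoice(long[] available, long threshold) {
    long least = Long.MAX_VALUE;
    for (long a : available) {
      least = Math.min(least, a);
    }
    for (long a : available) {
      // high: strictly above least + threshold; low: at or below it
      if (a > least + threshold) {
        high.add(a);
      } else {
        low.add(a);
      }
    }
  }

  // nHigh * p / (nHigh * p + nLow * (1 - p)), as computed in chooseVolume
  float scaledPreference(float p) {
    float scaler = high.size() * p + low.size() * (1 - p);
    return high.size() * p / scaler;
  }

  // true -> round-robin over the high group, false -> over the low group
  boolean pickHigh(long replicaSize, float p, Random random) {
    long mostAmongLow = Long.MIN_VALUE;
    for (long a : low) {
      mostAmongLow = Math.max(mostAmongLow, a);
    }
    return mostAmongLow < replicaSize || random.nextFloat() < scaledPreference(p);
  }
}
```

Consider volumes with 100 GB, 20 GB, 15 GB and 12 GB free and a 10 GB threshold:

- The boundary is 22 GB, so one volume is high and three are low.
- scaledPreference(0.75f) returns 0.5, so the high group wins only half the time.
- Per volume, though, the single high volume gets 0.5 and each low volume gets about 0.167. That is the 0.75 : 0.25 ratio.
- A 25 GB replica fits none of the low volumes, so it always goes to the high group.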