DataNode引用計數磁碟選擇策略
前言
在HDFS中,所有的資料都是存在各個DataNode上的.而這些DataNode上的資料都是存放於節點機器上的各個目錄中的,而一般每個目錄我們會對應到1個獨立的盤,以便我們把機器的儲存空間基本用上.這麼多的節點,這麼多塊盤,HDFS在進行寫操作時如何進行有效的磁碟選擇呢,選擇不當必然造成寫效能下降,從而影響叢集整體的效能.本文來討論一下目前HDFS中存在的幾個磁碟選擇策略的特點和不足,然後針對其不足,自定義1個新的磁碟選擇策略.
HDFS現有磁碟選擇策略
上文前言中提到,隨著節點數的擴增,磁碟數也會跟著線性變化,這麼的磁碟,會造成1個問題,資料不均衡現象,這個是最容易發生的.原因可能有下面2個:
1.HDFS寫操作不當導致.
2.新老機器上線使用時間不同,造成新機器資料少,老機器資料多的問題.
第二點這個通過Balancer操作可以解決.第一個問題才是最根本的,為了解決磁碟資料空間不均衡的現象,HDFS目前的2套磁碟選擇策略都是圍繞著"資料均衡"的目標設計的.下面介紹這2個磁碟選擇策略.
一.RoundRobinVolumeChoosingPolicy
上面這個比較長的類名稱可以拆成2個單詞,RoundRobin和VolumeChoosingPolicy,VolumeChoosingPolicy理解為磁碟選擇策略,RoundRobin這個是一個專業術語,叫做"輪詢",類似的還有一些別的類似的術語,Round-Robin Scheduling(輪詢排程),Round-Robin 演算法等.RoundRobin輪詢的意思用最簡單的方式翻譯就是一個一個的去遍歷,到尾巴了,再從頭開始.下面是一張解釋圖:
下面給出在HDFS中他的核心程式碼如下,我加了註釋上去,幫助大家理解:
/** * Choose volumes in round-robin order. */ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi> implements VolumeChoosingPolicy<V> { public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class); private int curVolume = 0; @Override public synchronized V chooseVolume(final List<V> volumes, long blockSize) throws IOException { //如果磁碟數目小於1個,則拋異常 if(volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } //如果由於失敗磁碟導致當前磁碟下標越界了,則將下標置為0 // since volumes could've been removed because of the failure // make sure we are not out of bounds if(curVolume >= volumes.size()) { curVolume = 0; } //賦值開始下標 int startVolume = curVolume; long maxAvailable = 0; while (true) { //獲取當前所下標所代表的磁碟 final V volume = volumes.get(curVolume); //下標遞增 curVolume = (curVolume + 1) % volumes.size(); //獲取當前選中磁碟的可用剩餘空間 long availableVolumeSize = volume.getAvailable(); //如果可用空間滿足所需要的副本塊大小,則直接返回這塊盤 if (availableVolumeSize > blockSize) { return volume; } //更新最大可用空間值 if (availableVolumeSize > maxAvailable) { maxAvailable = availableVolumeSize; } //如果當前指標又回到了起始下標位置,說明已經遍歷完整個磁碟列 //沒有找到符合可用空間要求的磁碟 if (curVolume == startVolume) { throw new DiskOutOfSpaceException("Out of space: " + "The volume with the most available space (=" + maxAvailable + " B) is less than the block size (=" + blockSize + " B)."); } } } }
理論上來說這種策略是蠻符合資料均衡的目標的,因為一個個的寫嗎,每塊盤寫入的次數都差不多,不存在哪塊盤多寫少寫的現象,但是唯一的不足之處在於每次寫入的資料量是無法控制的,可能我某次操作在A盤上寫入了512位元組的資料,在輪到B盤寫的時候我寫了128M的資料,資料就不均衡了,所以說輪詢策略在某種程度上來說是理論上均衡但還不是最好的.更好的是下面這種.
二.AvailableSpaceVolumeChoosingPolicy
剩餘可用空間磁碟選擇策略.這個磁碟選擇策略比第一種設計的就精妙很多了,首選他根據1個閾值,將所有的磁碟分為了2大類,高可用空間磁碟列表和低可用空間磁碟列表.然後通過1個隨機數概率,會比較高概率下選擇高剩餘磁碟列表中的塊,然後對這些磁碟列表進行輪詢策略的選擇,下面是相關程式碼:
/**
* A DN volume choosing policy which takes into account the amount of free
* space on each of the available volumes when considering where to assign a
* new replica allocation. By default this policy prefers assigning replicas to
* those volumes with more available free space, so as to over time balance the
* available space of all the volumes within a DN.
*/
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V>, Configurable {
...
//用於一般的需要平衡磁碟的輪詢磁碟選擇策略
private final VolumeChoosingPolicy<V> roundRobinPolicyBalanced =
new RoundRobinVolumeChoosingPolicy<V>();
//用於可用空間高的磁碟的輪詢磁碟選擇策略
private final VolumeChoosingPolicy<V> roundRobinPolicyHighAvailable =
new RoundRobinVolumeChoosingPolicy<V>();
//用於可用空間低的剩餘磁碟的輪詢磁碟選擇策略
private final VolumeChoosingPolicy<V> roundRobinPolicyLowAvailable =
new RoundRobinVolumeChoosingPolicy<V>();
@Override
public synchronized V chooseVolume(List<V> volumes,
long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
//獲取所有磁碟包裝列表物件
AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes);
//如果所有的磁碟在資料平衡閾值之內,則在所有的磁碟塊中直接進行輪詢選擇
if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
// If they're actually not too far out of whack, fall back on pure round
// robin.
V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("All volumes are within the configured free space balance " +
"threshold. Selecting " + volume + " for write of block size " +
replicaSize);
}
return volume;
} else {
V volume = null;
// If none of the volumes with low free space have enough space for the
// replica, always try to choose a volume with a lot of free space.
//如果存在資料不均衡的現象,則從低剩餘空間磁碟塊中選出可用空間最大值
long mostAvailableAmongLowVolumes = volumesWithSpaces
.getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();
//得到高可用空間磁碟列表
List<V> highAvailableVolumes = extractVolumesFromPairs(
volumesWithSpaces.getVolumesWithHighAvailableSpace());
//得到低可用空間磁碟列表
List<V> lowAvailableVolumes = extractVolumesFromPairs(
volumesWithSpaces.getVolumesWithLowAvailableSpace());
float preferencePercentScaler =
(highAvailableVolumes.size() * balancedPreferencePercent) +
(lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
//計算平衡比值,balancedPreferencePercent越大,highAvailableVolumes.size()所佔的值會變大
//整個比例值也會變大,就會有更高的隨機概率在這個值下
float scaledPreferencePercent =
(highAvailableVolumes.size() * balancedPreferencePercent) /
preferencePercentScaler;
//如果低可用空間磁碟列表中最大的可用空間無法滿足副本大小
//或隨機概率小於比例值,就在高可用空間磁碟中進行輪詢排程選擇
if (mostAvailableAmongLowVolumes < replicaSize ||
random.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
highAvailableVolumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size "
+ replicaSize);
}
} else {
//否則在低磁碟空間列表中選擇磁碟
volume = roundRobinPolicyLowAvailable.chooseVolume(
lowAvailableVolumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size "
+ replicaSize);
}
}
return volume;
}
}
低剩餘空間磁碟和高剩餘空間磁碟的標準是這樣定義的:
/**
* @return the list of volumes with relatively low available space.
*/
public List<AvailableSpaceVolumePair> getVolumesWithLowAvailableSpace() {
long leastAvailable = getLeastAvailableSpace();
List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
for (AvailableSpaceVolumePair volume : volumes) {
//可用空間小於最小空間與平衡空間閾值的和的磁碟加入低磁碟空間列表
if (volume.getAvailable() <= leastAvailable + balancedSpaceThreshold) {
ret.add(volume);
}
}
return ret;
}
/**
* @return the list of volumes with a lot of available space.
*/
public List<AvailableSpaceVolumePair> getVolumesWithHighAvailableSpace() {
long leastAvailable = getLeastAvailableSpace();
List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
for (AvailableSpaceVolumePair volume : volumes) {
//高剩餘空間磁碟選擇條件與上面相反
if (volume.getAvailable() > leastAvailable + balancedSpaceThreshold) {
ret.add(volume);
}
}
return ret;
}
現有HDFS磁碟選擇策略的不足
OK,我們已經瞭解了HDFS目前存在的2種磁碟選擇策略,我們看看HDFS在使用這些策略的是不是就是完美的呢,答案顯然不是,下面是我總結出的2點不足之處.
1.HDFS的預設磁碟選擇策略是RoundRobinVolumeChoosingPolicy,而不是更優的AvailableSpaceVolumeChoosingPolicy,我猜測的原因估計是AvailableSpaceVolumeChoosingPolicy是後來才有的,但是預設值的選擇沒有改,依然是老的策略.
2.磁碟選擇策略考慮的因素過於單一,磁碟可用空間只是其中1個因素,其實還有別的指標比如這個塊目前的IO情況,如果正在執行許多讀寫操作的時候,我們當然希望找沒有進行任何操作的磁碟進行資料寫入,否則只會更加影響當前磁碟的寫入速度,這個維度也是下面我自定義的新的磁碟選擇策略的1個根本需求點.
自定義磁碟選擇策略之ReferenceCountVolumeChoosingPolicy
新的磁碟選擇策略的根本依賴點在於ReferenceCount,引用計數,他能讓你瞭解有多少物件正在操作你,引用計數在很多地方都有用到,比如jvm中通過引用計數,判斷是否進行垃圾回收.在磁碟相關類FsVolume中也有類似的1個變數,剛好可以滿足我們的需求,如下:
/**
* The underlying volume used to store replica.
*
* It uses the {@link FsDatasetImpl} object for synchronization.
*/
@InterfaceAudience.Private
@VisibleForTesting
public class FsVolumeImpl implements FsVolumeSpi {
...
private CloseableReferenceCount reference = new CloseableReferenceCount();
然後我們需要將此變數值開放出去,便於我們呼叫. @Override
public int getReferenceCount() {
return this.reference.getReferenceCount();
}
然後模仿AvailableSpaceVolumeChoosingPolicy策略進行選擇,核心程式碼如下:@Override
public synchronized V chooseVolume(final List<V> volumes, long blockSize)
throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
V volume = null;
//獲取當前磁碟中被引用次數最少的1塊盤
int minReferenceCount = getMinReferenceCountOfVolumes(volumes);
//根據最少引用次數以及引用計數臨界值得到低引用計數磁碟列表
List<V> lowReferencesVolumes =
getLowReferencesCountVolume(volumes, minReferenceCount);
//根據最少引用次數以及引用計數臨界值得到高引用計數磁碟列表
List<V> highReferencesVolumes =
getHighReferencesCountVolume(volumes, minReferenceCount);
//判斷低引用磁碟列表中是否存在滿足要求塊大小的磁碟,如果有優選從低磁碟中進行輪詢磁碟的選擇
if (isExistVolumeHasFreeSpaceForBlock(lowReferencesVolumes, blockSize)) {
volume =
roundRobinPolicyLowReferences.chooseVolume(lowReferencesVolumes,
blockSize);
} else {
//如果低磁碟塊中沒有可用空間的塊,則再從高引用計數的磁碟列表中進行磁碟的選擇
volume =
roundRobinPolicyHighReferences.chooseVolume(highReferencesVolumes,
blockSize);
}
return volume;
}
附上相應的單元測試,測試已經通過@Test
public void testReferenceCountVolumeChoosingPolicy() throws Exception {
@SuppressWarnings("unchecked")
final ReferenceCountVolumeChoosingPolicy<FsVolumeSpi> policy =
ReflectionUtils.newInstance(ReferenceCountVolumeChoosingPolicy.class,
null);
initPolicy(policy);
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
// Add two low references count volumes.
// First volume, with 1 reference.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(0).getReferenceCount()).thenReturn(1);
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
// First volume, with 2 references.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getReferenceCount()).thenReturn(2);
Mockito.when(volumes.get(1).getAvailable()).thenReturn(100L);
// Add two high references count volumes.
// First volume, with 4 references.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(2).getReferenceCount()).thenReturn(4);
Mockito.when(volumes.get(2).getAvailable()).thenReturn(100L);
// First volume, with 5 references.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(3).getReferenceCount()).thenReturn(5);
Mockito.when(volumes.get(3).getAvailable()).thenReturn(100L);
// initPolicy(policy, 1.0f);
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 50));
volumes.clear();
// Test when the low-references volumes has not enough available space for
// block
// First volume, with 1 reference.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(0).getReferenceCount()).thenReturn(1);
Mockito.when(volumes.get(0).getAvailable()).thenReturn(50L);
// First volume, with 2 references.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getReferenceCount()).thenReturn(2);
Mockito.when(volumes.get(1).getAvailable()).thenReturn(50L);
// Add two high references count volumes.
// First volume, with 4 references.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(2).getReferenceCount()).thenReturn(4);
Mockito.when(volumes.get(2).getAvailable()).thenReturn(200L);
// First volume, with 5 references.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(3).getReferenceCount()).thenReturn(5);
Mockito.when(volumes.get(3).getAvailable()).thenReturn(200L);
Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
}
我在程式碼註釋中已經進行了很詳細的分析了,這裡就不多說了.
總結
當然根據引用計數的磁碟選擇策略也不見得是最好的,因為這裡忽略了磁碟間資料不均衡的問題,顯然這個弊端會慢慢凸顯出來,所以說你很難做到1個策略是絕對完美的,可能最好的辦法是根據使用者使用場景使用最合適的磁碟選擇策略,或者定期更換策略以此達到最佳的效果.引用計數磁碟選擇策略的相關程式碼可以從我的github patch連結中查閱,學習.