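Source code analysis of the storage policy across the different storage paths of a single Hadoop DataNode
The problem arose because the disks on the cluster's DataNodes differ in size, so after the cluster has been in use for a while the smaller disks run short of space.
In fact, configuring the disk (volume) choosing policy early on would have solved the problem. Some posts online claim this policy has no effect, but it does work in Hadoop 2.0.1, the version used in CDH4.6.
To pin down exactly where in the code this happens, I referred to the following Hadoop design document.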
References
Design document for Append/Hflush/Read in the Hadoop HDFS file system:
http://blog.csdn.net/chenpingbupt/article/details/7972589
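The document states:
On a DataNode's disk, each DN has three directories: current, tmp and rbw. current holds finalized replicas, tmp holds temporary replicas, and rbw holds rbw, rwr and rur replicas. When a replica is first created at the request of a DFS client, it is placed in rbw. When the first creation is initiated by block replication or cluster balancing, the replica is placed in tmp. Once a replica is finalized, it is moved to current. When a DN restarts, replicas in tmp are deleted, replicas in rbw are loaded in the rwr state, and replicas in current are loaded as finalized.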
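The rules in this quote can be restated as a small lookup. The sketch below only illustrates the quoted behavior: the class, enum and method names (ReplicaDirRules, Origin, initialDir, stateAfterRestart) are hypothetical and are not HDFS classes.

```java
import java.util.Optional;

public class ReplicaDirRules {
    /** Who triggered the first creation of a replica (hypothetical enum). */
    enum Origin { DFS_CLIENT_WRITE, BLOCK_REPLICATION, CLUSTER_BALANCE }

    /** Directory a newly created replica is placed in. */
    static String initialDir(Origin origin) {
        // Client writes start in rbw; replication and balancing start in tmp.
        return origin == Origin.DFS_CLIENT_WRITE ? "rbw" : "tmp";
    }

    /** State a replica is loaded in after a DataNode restart; empty means it is deleted. */
    static Optional<String> stateAfterRestart(String dir) {
        switch (dir) {
            case "tmp":
                return Optional.empty();          // tmp replicas are deleted
            case "rbw":
                return Optional.of("RWR");        // rbw replicas are loaded as rwr
            case "current":
                return Optional.of("FINALIZED");  // current replicas are loaded as finalized
            default:
                throw new IllegalArgumentException("unknown directory: " + dir);
        }
    }

    public static void main(String[] args) {
        System.out.println(initialDir(Origin.BLOCK_REPLICATION));       // tmp
        System.out.println(stateAfterRestart("rbw").orElse("deleted")); // RWR
        System.out.println(stateAfterRestart("tmp").orElse("deleted")); // deleted
    }
}
```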
Let's start from where the tmp or rbw files are created.
1. See the Java class BlockPoolSlice
/**
 * A block pool slice represents a portion of a block pool stored on a volume.
 * Taken together, all BlockPoolSlices sharing a block pool ID across a
 * cluster represent a single block pool.
 *
 * This class is synchronized by {@link FsVolumeImpl}.
 */
class BlockPoolSlice {
  private final String bpid;
  private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
  private final File currentDir; // StorageDirectory/current/bpid/current
  private final LDir finalizedDir; // directory store Finalized replica
  private final File rbwDir; // directory store RBW replica
  private final File tmpDir; // directory store Temporary replica
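The class comment shows that a BlockPoolSlice is the portion of a block pool stored on one volume, and it owns the directories in which the cluster's block files are created.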
  /**
   * Temporary files. They get moved to the finalized block directory when
   * the block is finalized.
   */
  File createTmpFile(Block b) throws IOException {
    File f = new File(tmpDir, b.getBlockName());
    return DatanodeUtil.createTmpFile(b, f);
  }

  /**
   * RBW files. They get moved to the finalized block directory when
   * the block is finalized.
   */
  File createRbwFile(Block b) throws IOException {
    File f = new File(rbwDir, b.getBlockName());
    return DatanodeUtil.createTmpFile(b, f);
  }
These are the methods that create the underlying block files.
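Both methods differ only in the parent directory, and tmpDir and rbwDir are final fields of a BlockPoolSlice that belongs to a single volume, so the disk is already fixed by the time a file is created. A minimal sketch of the path composition with plain java.io.File; the class name BlockFileNames and the example directories are hypothetical, and the "blk_" + blockId file name is assumed to match what Block.getBlockName() returns.

```java
import java.io.File;

public class BlockFileNames {
    private final File tmpDir; // temporary replicas (replication / balancing)
    private final File rbwDir; // replicas being written by a client

    BlockFileNames(File tmpDir, File rbwDir) {
        this.tmpDir = tmpDir;
        this.rbwDir = rbwDir;
    }

    /** Assumed block file name: "blk_" followed by the block ID. */
    static String blockName(long blockId) {
        return "blk_" + blockId;
    }

    // Same name, different parent: the directory (and thus the disk) is decided by the caller.
    File tmpFileFor(long blockId) { return new File(tmpDir, blockName(blockId)); }
    File rbwFileFor(long blockId) { return new File(rbwDir, blockName(blockId)); }

    public static void main(String[] args) {
        BlockFileNames slice = new BlockFileNames(new File("/data1/tmp"), new File("/data1/rbw"));
        System.out.println(slice.rbwFileFor(42L)); // /data1/rbw/blk_42 on Unix-like systems
    }
}
```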
2. The implementation of that method
/** Provide utility methods for Datanode. */
@InterfaceAudience.Private
public class DatanodeUtil {
  public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

  public static final String DISK_ERROR = "Possible disk error: ";

  /** Get the cause of an I/O exception if caused by a possible disk error
   * @param ioe an I/O exception
   * @return cause if the I/O exception is caused by a possible disk error;
   *         null otherwise.
   */
  static IOException getCauseIfDiskError(IOException ioe) {
    if (ioe.getMessage() != null && ioe.getMessage().startsWith(DISK_ERROR)) {
      return (IOException) ioe.getCause();
    } else {
      return null;
    }
  }

  /**
   * Create a new file.
   * @throws IOException
   * if the file already exists or if the file cannot be created.
   */
  public static File createTmpFile(Block b, File f) throws IOException {
    if (f.exists()) {
      throw new IOException("Failed to create temporary file for " + b
          + ". File " + f + " should not be present, but is.");
    }
    // Create the zero-length temp file
    final boolean fileCreated;
    try {
      fileCreated = f.createNewFile();
    } catch (IOException ioe) {
      throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
    }
    if (!fileCreated) {
      throw new IOException("Failed to create temporary file for " + b
          + ". File " + f + " should be creatable, but is already present.");
    }
    return f;
  }
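Creating a data block through this method involves no choice among storage paths, so the selection policy we are looking for is not here.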
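The only guarantee DatanodeUtil.createTmpFile adds is create-if-absent, built on File.createNewFile(), which atomically creates an empty file and returns false if the file already exists. The standalone sketch below reproduces that contract without Hadoop types (the Block parameter is replaced by a plain block name, and the class name is hypothetical). It receives a fully resolved File, which is why no path selection can happen at this level.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CreateTmpFileDemo {
    /** Same contract as DatanodeUtil.createTmpFile: fail if the file exists or cannot be created. */
    static File createTmpFile(String blockName, File f) throws IOException {
        if (f.exists()) {
            throw new IOException("File " + f + " for " + blockName + " should not be present, but is.");
        }
        final boolean fileCreated;
        try {
            fileCreated = f.createNewFile(); // zero-length file; false if it appeared in the meantime
        } catch (IOException ioe) {
            throw new IOException("Possible disk error: Failed to create " + f, ioe);
        }
        if (!fileCreated) {
            throw new IOException("File " + f + " should be creatable, but is already present.");
        }
        return f;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("rbw").toFile();
        File f = createTmpFile("blk_1", new File(dir, "blk_1"));
        System.out.println(f.length()); // 0
        try {
            createTmpFile("blk_1", f); // second attempt must fail
        } catch (IOException expected) {
            System.out.println("second call failed as expected");
        }
    }
}
```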
3. Next, let's find where createRbwFile is called
/**************************************************
 * FSDataset manages a set of data blocks. Each block
 * has a unique name and an extent on disk.
 *
 ***************************************************/
@InterfaceAudience.Private
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
  static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
This is the class that manages block operations. Its createRbw method:
  @Override // FsDatasetSpi
  public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
      throws IOException {
    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
        b.getBlockId());
    if (replicaInfo != null) {
      throw new ReplicaAlreadyExistsException("Block " + b +
          " already exists in state " + replicaInfo.getState() +
          " and thus cannot be created.");
    }
    // create a new block
    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
    // create a rbw file to hold block in the designated volume
    File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
        b.getGenerationStamp(), v, f.getParentFile());
    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
    return newReplicaInfo;
  }
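It calls v.createRbwFile, which (through the volume's BlockPoolSlice) creates the rbw file.
More importantly, this is where the volumes field we care about shows up: the volume is chosen by volumes.getNextVolume() before the file is created, and volumes holds the configured storage paths.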
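What matters in createRbw is the order of operations: duplicate check, volume choice, file creation on the chosen volume, then registration. A minimal in-memory sketch of that order, assuming hypothetical stand-ins: Volume for FsVolumeImpl, a LongFunction for volumes.getNextVolume(), and a HashMap for volumeMap. The rbw path is simplified; the real file lives in the rbw directory of the block pool slice on that volume.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

public class CreateRbwSketch {
    /** Hypothetical stand-in for FsVolumeImpl: just a root path. */
    static final class Volume {
        final String root;
        Volume(String root) { this.root = root; }
    }

    private final Map<Long, String> volumeMap = new HashMap<>(); // blockId -> rbw file path
    private final LongFunction<Volume> chooser;                  // plays the role of getNextVolume

    CreateRbwSketch(LongFunction<Volume> chooser) { this.chooser = chooser; }

    synchronized String createRbw(long blockId, long numBytes) {
        if (volumeMap.containsKey(blockId)) {
            throw new IllegalStateException("Block " + blockId + " already exists");
        }
        Volume v = chooser.apply(numBytes);            // 1. the volume (disk) is chosen here
        String path = v.root + "/rbw/blk_" + blockId;  // 2. simplified rbw file path on that volume
        volumeMap.put(blockId, path);                  // 3. the replica is registered
        return path;
    }

    public static void main(String[] args) {
        Volume only = new Volume("/data1");
        CreateRbwSketch ds = new CreateRbwSketch(size -> only);
        System.out.println(ds.createRbw(1L, 128L << 20)); // /data1/rbw/blk_1
    }
}
```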
4. How volumes is initialized
volumes is initialized in the constructor from volArray:
  /**
   * An FSDataset has a directory where it loads its data files.
   */
  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    this.datanode = datanode;
    // The number of volumes required for operation is the total number
    // of volumes minus the number of failed volumes we can tolerate.
    final int volFailuresTolerated =
      conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                  DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

    String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

    int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
    int volsFailed = volsConfigured - storage.getNumStorageDirs();
    this.validVolsRequired = volsConfigured - volFailuresTolerated;

    if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
      throw new DiskErrorException("Invalid volume failure "
          + " config value: " + volFailuresTolerated);
    }
    if (volsFailed > volFailuresTolerated) {
      throw new DiskErrorException("Too many failed volumes - "
          + "current valid volumes: " + storage.getNumStorageDirs()
          + ", volumes configured: " + volsConfigured
          + ", volumes failed: " + volsFailed
          + ", volume failures tolerated: " + volFailuresTolerated);
    }

    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
        storage.getNumStorageDirs());
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      final File dir = storage.getStorageDir(idx).getCurrentDir();
      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
      LOG.info("Added volume - " + dir);
    }
    volumeMap = new ReplicaMap(this);

    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
    volumes.getVolumeMap(volumeMap);

    File[] roots = new File[storage.getNumStorageDirs()];
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
    }
    asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
    registerMBean(storage.getStorageID());
  }
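The failed-volume checks at the top of this constructor are plain arithmetic and can be isolated for a quick sanity check. The sketch below mirrors them in a hypothetical helper that returns validVolsRequired; IllegalArgumentException stands in for Hadoop's DiskErrorException so the example has no dependencies.

```java
public class VolumeFailureCheck {
    /**
     * Mirrors the checks in the FsDatasetImpl constructor and returns validVolsRequired.
     * volsConfigured: entries in the data dir setting; numStorageDirs: directories actually loaded.
     */
    static int validVolsRequired(int volsConfigured, int numStorageDirs, int volFailuresTolerated) {
        int volsFailed = volsConfigured - numStorageDirs;
        if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
            throw new IllegalArgumentException("Invalid volume failure config value: " + volFailuresTolerated);
        }
        if (volsFailed > volFailuresTolerated) {
            throw new IllegalArgumentException("Too many failed volumes - volumes failed: " + volsFailed
                    + ", volume failures tolerated: " + volFailuresTolerated);
        }
        return volsConfigured - volFailuresTolerated;
    }

    public static void main(String[] args) {
        // 4 configured dirs, 3 loaded (1 failed), 1 failure tolerated -> 3 valid volumes required
        System.out.println(validVolsRequired(4, 3, 1)); // 3
    }
}
```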
volArray itself is built like this:
    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
        storage.getNumStorageDirs());
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      final File dir = storage.getStorageDir(idx).getCurrentDir();
      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
      LOG.info("Added volume - " + dir);
    }
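These are exactly the storage paths from the configuration file: dataDirs is read from DFS_DATANODE_DATA_DIR_KEY (dfs.datanode.data.dir), and each storage directory that loads successfully becomes one FsVolumeImpl.
With the storage paths located, finding out how one of them gets chosen is much easier.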
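Each entry of the configured data directory list ends up as one volume. The following sketch shows the split of a comma-separated value such as the one given for dfs.datanode.data.dir; the helper name and sample paths are hypothetical, and skipping empty entries is an assumption of this sketch rather than a statement about Configuration.getTrimmedStrings.

```java
import java.util.ArrayList;
import java.util.List;

public class DataDirList {
    /** Splits a comma-separated directory list, trimming each entry and skipping empty ones. */
    static List<String> dataDirs(String value) {
        List<String> dirs = new ArrayList<>();
        if (value == null) {
            return dirs; // nothing configured: zero volumes
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                dirs.add(trimmed);
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        List<String> dirs = dataDirs(" /data1/dfs/dn, /data2/dfs/dn ,/data3/dfs/dn");
        for (int idx = 0; idx < dirs.size(); idx++) {
            // In FsDatasetImpl each successfully loaded directory becomes one FsVolumeImpl.
            System.out.println("volume " + idx + " -> " + dirs.get(idx));
        }
    }
}
```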
5. Path selection starts at getNextVolume
class FsVolumeList {
  /**
   * Read access to this unmodifiable list is not synchronized.
   * This list is replaced on modification holding "this" lock.
   */
  volatile List<FsVolumeImpl> volumes = null;

  private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
  private volatile int numFailedVolumes;

  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
    this.volumes = Collections.unmodifiableList(volumes);
    this.blockChooser = blockChooser;
    this.numFailedVolumes = failedVols;
  }

  int numberOfFailedVolumes() {
    return numFailedVolumes;
  }

  /**
   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
   * by a single thread and next volume is chosen with no concurrent
   * update to {@link #volumes}.
   * @param blockSize free space needed on the volume
   * @return next volume to store the block in.
   */
  synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
    return blockChooser.chooseVolume(volumes, blockSize);
  }
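getNextVolume simply delegates to the configured policy, and the constructor above falls back to RoundRobinVolumeChoosingPolicy when dfs.datanode.fsdataset.volume.choosing.policy is not set. As far as I understand it, that default rotates through the volumes and only skips one that cannot hold the block, ignoring how much free space each volume has, which is exactly how small disks fill up first. The RoundRobinVolumeChoosingPolicy source is not shown in this post, so the following is a simplified sketch of that behavior; the AvailableVolume interface and class name are hypothetical, and the real class works on FsVolumeSpi and throws DiskOutOfSpaceException.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class RoundRobinSketch {
    /** Hypothetical minimal view of a volume: only its free space matters here. */
    interface AvailableVolume {
        long getAvailable();
    }

    private int curVolume = 0;

    synchronized <V extends AvailableVolume> V chooseVolume(List<V> volumes, long blockSize)
            throws IOException {
        if (volumes.isEmpty()) {
            throw new IOException("No more available volumes");
        }
        if (curVolume >= volumes.size()) {
            curVolume = 0; // the list may have shrunk after a volume failure
        }
        int startVolume = curVolume;
        while (true) {
            V volume = volumes.get(curVolume);
            curVolume = (curVolume + 1) % volumes.size();
            if (volume.getAvailable() > blockSize) {
                return volume; // next volume in rotation with room wins, however full it is
            }
            if (curVolume == startVolume) {
                throw new IOException("No volume has more than " + blockSize + " bytes free");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        RoundRobinSketch rr = new RoundRobinSketch();
        AvailableVolume small = () -> 100L; // small, nearly full disk
        AvailableVolume big = () -> 1000L;  // large disk
        List<AvailableVolume> vols = Arrays.asList(small, big);
        for (int i = 0; i < 4; i++) {
            // Alternates small, big, small, big: the small disk gets half of the writes.
            System.out.println(rr.chooseVolume(vols, 10L) == small ? "small" : "big");
        }
    }
}
```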
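6. chooseVolume is called on blockChooser, whose type is VolumeChoosingPolicy. As the constructor above shows, the implementation is loaded from dfs.datanode.fsdataset.volume.choosing.policy and defaults to RoundRobinVolumeChoosingPolicy. The policy that takes free space into account is implemented in the following class: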
/**
 * A DN volume choosing policy which takes into account the amount of free
 * space on each of the available volumes when considering where to assign a
 * new replica allocation. By default this policy prefers assigning replicas to
 * those volumes with more available free space, so as to over time balance the
 * available space of all the volumes within a DN.
 */
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
    implements VolumeChoosingPolicy<V>, Configurable {

  private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);

  private static final Random RAND = new Random();

  private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
  private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
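As the class comment says, this is the policy we are after: it prefers volumes with more free space so that, over time, free space evens out across all volumes of a DataNode.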
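The policy hinges on splitting the volumes into a low-free-space group and a high-free-space group using balancedSpaceThreshold. The helper class AvailableSpaceVolumeList is not shown in this post, so the sketch below is my reading of the split: the volumes count as balanced when the gap between the most and least free volume is below the threshold, and a volume counts as "low" when its free space is within the threshold of the least free volume. Treat the exact comparisons, the helper names, and the 10 GB threshold used in the example as assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class FreeSpaceSplit {
    /** True if the gap between the most and least free volume is below the threshold. */
    static boolean allWithinThreshold(long[] available, long threshold) {
        long least = Long.MAX_VALUE;
        long most = 0;
        for (long a : available) {
            least = Math.min(least, a);
            most = Math.max(most, a);
        }
        return most - least < threshold;
    }

    /** Indices of volumes whose free space is within the threshold of the least free volume. */
    static List<Integer> lowIndices(long[] available, long threshold) {
        long least = Long.MAX_VALUE;
        for (long a : available) {
            least = Math.min(least, a);
        }
        List<Integer> low = new ArrayList<>();
        for (int i = 0; i < available.length; i++) {
            if (available[i] <= least + threshold) {
                low.add(i);
            }
        }
        return low;
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        long threshold = 10 * gb;                   // assumed default balanced-space threshold
        long[] free = {50 * gb, 55 * gb, 400 * gb}; // two nearly full small disks, one big disk
        System.out.println(allWithinThreshold(free, threshold)); // false
        System.out.println(lowIndices(free, threshold));         // [0, 1]
    }
}
```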
7. The policy is implemented as follows:
  @Override
  public synchronized V chooseVolume(List<V> volumes,
      final long replicaSize) throws IOException {
    if (volumes.size() < 1) {
      throw new DiskOutOfSpaceException("No more available volumes");
    }

    AvailableSpaceVolumeList volumesWithSpaces =
        new AvailableSpaceVolumeList(volumes);

    if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
      // If they're actually not too far out of whack, fall back on pure round
      // robin.
      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
      if (LOG.isDebugEnabled()) {
        LOG.debug("All volumes are within the configured free space balance " +
            "threshold. Selecting " + volume + " for write of block size " +
            replicaSize);
      }
      return volume;
    } else {
      V volume = null;
      // If none of the volumes with low free space have enough space for the
      // replica, always try to choose a volume with a lot of free space.
      long mostAvailableAmongLowVolumes = volumesWithSpaces
          .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

      List<V> highAvailableVolumes = extractVolumesFromPairs(
          volumesWithSpaces.getVolumesWithHighAvailableSpace());
      List<V> lowAvailableVolumes = extractVolumesFromPairs(
          volumesWithSpaces.getVolumesWithLowAvailableSpace());

      float preferencePercentScaler =
          (highAvailableVolumes.size() * balancedPreferencePercent) +
          (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
      float scaledPreferencePercent =
          (highAvailableVolumes.size() * balancedPreferencePercent) /
          preferencePercentScaler;
      if (mostAvailableAmongLowVolumes < replicaSize ||
          RAND.nextFloat() < scaledPreferencePercent) {
        volume = roundRobinPolicyHighAvailable.chooseVolume(
            highAvailableVolumes,
            replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Volumes are imbalanced. Selecting " + volume +
              " from high available space volumes for write of block size "
              + replicaSize);
        }
      } else {
        volume = roundRobinPolicyLowAvailable.chooseVolume(
            lowAvailableVolumes,
            replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Volumes are imbalanced. Selecting " + volume +
              " from low available space volumes for write of block size "
              + replicaSize);
        }
      }
      return volume;
    }
  }
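When the volumes are imbalanced, the high-free-space group is picked with probability scaledPreferencePercent (and always, if no low volume can hold the replica). With h high volumes, l low volumes and preference fraction p, that probability is h*p / (h*p + l*(1-p)). The hypothetical helper below only restates that arithmetic from chooseVolume. For example, with one large disk, two nearly full small disks and p = 0.75 (assumed to be the default), the large disk receives 0.75 / (0.75 + 0.5) = 0.6 of new replicas, and round robin inside the low group leaves about 0.2 for each small disk.

```java
public class PreferenceMath {
    /** Probability that chooseVolume picks from the high-available-space group. */
    static float scaledPreferencePercent(int highCount, int lowCount, float balancedPreferencePercent) {
        float preferencePercentScaler =
                (highCount * balancedPreferencePercent) + (lowCount * (1 - balancedPreferencePercent));
        return (highCount * balancedPreferencePercent) / preferencePercentScaler;
    }

    public static void main(String[] args) {
        float p = scaledPreferencePercent(1, 2, 0.75f);
        // The low group's share (1 - p) is split by round robin between its two volumes.
        System.out.printf("high group: %.2f, each low volume: %.2f%n", p, (1 - p) / 2);
        // prints "high group: 0.60, each low volume: 0.20"
    }
}
```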
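That covers how each configured storage path is chosen and what the selection policy is. Sigh, exhausting~~
It took me almost three days; reading source code without running it is really tiring. Being able to step through it in a debugger would have helped a lot.
Related configuration properties: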
dfs.datanode.fsdataset.volume.choosing.policy
dfs.datanode.available-space-volume-choosing-policy.balanced-space-thr
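For reference, a minimal hdfs-site.xml fragment that switches the DataNode from the default round-robin policy to the available-space policy. It assumes your Hadoop/CDH build actually ships org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy (see the version note at the top); the preference fraction shown simply repeats the assumed default of 0.75. The DataNode has to be restarted to pick up the change.

```xml
<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
  <!-- Weight given to the volumes with more free space (assumed default: 0.75) -->
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
  <value>0.75</value>
</property>
```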