Understanding the CombineFileInputFormat Class
FileInputFormat, the default input format of an MR job, generates one split per small file. CombineFileInputFormat instead packs multiple "small files" into a single "split" (taking same-node and same-rack data locality into account while the split is being formed), so that each Mapper task processes more data and the MR job runs faster.
1). Three important properties:
- maxSplitSize: upper bound on the split size. Set via the property "mapreduce.input.fileinputformat.split.maxsize" or the protected CombineFileInputFormat.setMaxSplitSize() method. [If left unset, together with the two minimums below, the entire input collapses into a single split, i.e. a single map task.]
- minSplitSizeNode: minimum split size when blocks of one node are combined into a split. Set via the property "mapreduce.input.fileinputformat.split.minsize.per.node" or the protected CombineFileInputFormat.setMinSplitSizeNode() method.
- minSplitSizeRack: minimum split size when blocks of one rack are combined into a split. Set via the property "mapreduce.input.fileinputformat.split.minsize.per.rack" or the protected CombineFileInputFormat.setMinSplitSizeRack() method.
- Required ordering: maxSplitSize >= minSplitSizeNode >= minSplitSizeRack (getSplits() throws an IOException otherwise).
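A minimal driver-side sketch of how these knobs are typically wired up (the sizes and input path are illustrative assumptions; CombineTextInputFormat is Hadoop's concrete text-oriented subclass of CombineFileInputFormat):

Configuration conf = new Configuration();
// Upper bound of a combined split: 128 MB (illustrative value).
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
// Node-local leftovers of at least 64 MB may form their own split (illustrative).
conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64L * 1024 * 1024);
// Rack-local leftovers of at least 32 MB may form their own split (illustrative).
conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 32L * 1024 * 1024);

Job job = Job.getInstance(conf, "combine-small-files");
// CombineTextInputFormat = CombineFileInputFormat<LongWritable, Text> plus a
// ready-made record reader for plain text files.
job.setInputFormatClass(CombineTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/data/small-files")); // hypothetical path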
2). How splits are formed:
2.1. Iterate over the node list and form node-local splits, node by node (in units of blocks):
a. If maxSplitSize == 0, all of the node's blocks form a single split.
b. If maxSplitSize != 0, walk the node's blocks and accumulate their sizes; whenever the accumulated size reaches maxSplitSize (>=), emit those blocks as one split. Repeat until the accumulated size of the remaining blocks falls below maxSplitSize, then move to the next step.
c. If the remaining blocks' accumulated size >= minSplitSizeNode, emit them as one split; otherwise hand them back to the pool so that they can be combined with other blocks of the same rack in the next phase.
2.2. Iterate over the rack list and form rack-local splits, rack by rack (in units of blocks):
a. Walk the blocks on this rack's nodes (i.e. the blocks left over from the previous phase) and accumulate their sizes; whenever the accumulated size reaches maxSplitSize (>=), emit those blocks as one split. Repeat until the remainder falls below maxSplitSize, then move to the next step.
b. If the remaining blocks' accumulated size >= minSplitSizeRack, emit them as one split; otherwise keep these blocks in an overflow list for later.
2.3. Walk the overflow blocks of all racks and accumulate their sizes; whenever the accumulated size reaches maxSplitSize (>=), emit those blocks as one split. Repeat until the remainder falls below maxSplitSize, then move to the next step.
2.4. Emit whatever blocks remain as one final split.
Demo:
Given: maxSize=100 > minSizeNode=50 > minSizeRack=30
Input blocks: Rack01: {[30, 60, 70] [80, 110]} Rack02: {170} (two nodes on Rack01, one node on Rack02; the numbers are block sizes)
Caveat: for readability this walkthrough pretends a split can be carved off at exactly the threshold size, with the excess kept as a remainder; the real algorithm always keeps whole blocks together (see the note after the walkthrough).
Node phase vs. maxSize: 30+60+70 = 160 >= 100 → carve 100, keep 60; 80+110 = 190 >= 100 → carve 100, keep 90; 170 >= 100 → carve 100, keep 70
---> 3 splits, leftovers Rack01: {[60] [90]} Rack02: {70}
Node phase vs. minSizeNode: 60 >= 50 → carve 50, keep 10; 90 >= 50 → carve 50, keep 40; 70 >= 50 → carve 50, keep 20
---> 3+3 splits, leftovers Rack01: {[10] [40]} Rack02: {20}
Rack phase: Rack01: 10+40 = 50 < 100 → no maxSize carve, but 50 >= minSizeRack=30 → carve 30, keep 20 for overflow; Rack02: 20 < 100 and 20 < 30 → straight to overflow
---> 3+3+1 splits, overflow: {20, 20}
Overflow phase: 20+20 = 40 < 100 → no maxSize carve; the final remainder of 40 becomes the last split
---> 3+3+1+1 = 8 splits in total
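The carving fiction above overstates how many splits come out. The real createSplits() only groups whole blocks: a split is emitted as soon as the running total reaches a threshold, and it keeps every block it has accumulated. The toy simulation below (plain Java, no Hadoop dependency; the topology, names, and simplifications such as one replica per block and no per-node split balancing are assumptions made purely for illustration) applies whole-block grouping to the same numbers; the node phase alone already consumes everything, yielding just 3 splits:

import java.util.*;

public class CombineSplitDemo {
  public static void main(String[] args) {
    long maxSize = 100, minNode = 50, minRack = 30;

    // node -> its blocks; the "rack/node" keys encode the demo topology
    Map<String, List<Long>> nodeToBlocks = new LinkedHashMap<>();
    nodeToBlocks.put("Rack01/n1", new ArrayList<>(List.of(30L, 60L, 70L)));
    nodeToBlocks.put("Rack01/n2", new ArrayList<>(List.of(80L, 110L)));
    nodeToBlocks.put("Rack02/n3", new ArrayList<>(List.of(170L)));

    List<List<Long>> splits = new ArrayList<>();
    Map<String, List<Long>> rackLeftovers = new LinkedHashMap<>();

    // Phase 1: node-local splits (emit whenever the running total reaches
    // maxSize; a leftover of at least minNode becomes its own split).
    for (Map.Entry<String, List<Long>> e : nodeToBlocks.entrySet()) {
      List<Long> acc = new ArrayList<>();
      long cur = 0;
      for (long b : e.getValue()) {
        acc.add(b);
        cur += b;
        if (cur >= maxSize) { splits.add(new ArrayList<>(acc)); acc.clear(); cur = 0; }
      }
      if (!acc.isEmpty() && cur >= minNode) { splits.add(new ArrayList<>(acc)); acc.clear(); }
      String rack = e.getKey().split("/")[0];
      rackLeftovers.computeIfAbsent(rack, k -> new ArrayList<>()).addAll(acc);
    }

    // Phase 2: rack-local splits over the leftovers (same pattern vs. maxSize,
    // then vs. minRack; anything smaller goes to the overflow list).
    List<Long> overflow = new ArrayList<>();
    for (List<Long> blocks : rackLeftovers.values()) {
      List<Long> acc = new ArrayList<>();
      long cur = 0;
      for (long b : blocks) {
        acc.add(b);
        cur += b;
        if (cur >= maxSize) { splits.add(new ArrayList<>(acc)); acc.clear(); cur = 0; }
      }
      if (!acc.isEmpty() && cur >= minRack) splits.add(new ArrayList<>(acc));
      else overflow.addAll(acc);
    }

    // Phase 3: overflow vs. maxSize, then one final catch-all split.
    List<Long> acc = new ArrayList<>();
    long cur = 0;
    for (long b : overflow) {
      acc.add(b);
      cur += b;
      if (cur >= maxSize) { splits.add(new ArrayList<>(acc)); acc.clear(); cur = 0; }
    }
    if (!acc.isEmpty()) splits.add(acc);

    // Prints [[30, 60, 70], [80, 110], [170]]: whole-block grouping finishes
    // in the node phase with 3 splits, not the 8 of the idealized arithmetic.
    System.out.println(splits);
  }
}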
3). Source: getSplits()
@Override
public List<InputSplit> getSplits(JobContext job)
  throws IOException {
  long minSizeNode = 0;
  long minSizeRack = 0;
  long maxSize = 0;
  Configuration conf = job.getConfiguration();

  // Values set via the setXxxSplitSize() methods take precedence over
  // values read from the configuration.
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    // If maxSize is not configured, all the blocks on a node form a single split.
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  }
  if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
    throw new IOException("Minimum split size pernode " + minSizeNode +
                          " cannot be larger than maximum split size " +
                          maxSize);
  }
  if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
    throw new IOException("Minimum split size per rack " + minSizeRack +
                          " cannot be larger than maximum split size " +
                          maxSize);
  }
  if (minSizeRack != 0 && minSizeNode > minSizeRack) {
    throw new IOException("Minimum split size per node " + minSizeNode +
                          " cannot be larger than minimum split " +
                          "size per rack " + minSizeRack);
  }

  // Collect all files under the input paths.
  List<FileStatus> stats = listStatus(job);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  if (stats.size() == 0) {
    return splits;
  }

  // Generate splits for the files of each pool in turn. The blocks of one
  // split can only come from a single pool, though they may come from
  // different files within that pool.
  for (MultiPathFilter onepool : pools) {
    ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

    // Gather into myPaths every file accepted by the current pool instance onepool.
    for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
      FileStatus p = iter.next();
      if (onepool.accept(p.getPath())) {
        myPaths.add(p); // add it to my output set
        iter.remove();
      }
    }
    // Generate splits for the files in myPaths.
    getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
  }

  // Generate splits for the files that do not belong to any pool.
  getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

  // free up rackToNodes map
  rackToNodes.clear();
  return splits;
}
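Note that CombineFileInputFormat is abstract: getSplits() comes for free, but a concrete subclass still has to supply createRecordReader(). A minimal sketch (SmallFilesInputFormat and SingleFileRecordReader are hypothetical names; the delegation through Hadoop's CombineFileRecordReader is the usual pattern):

public class SmallFilesInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader iterates over the files packed into the
    // CombineFileSplit and instantiates the given per-file reader class
    // (which must expose a (CombineFileSplit, TaskAttemptContext, Integer)
    // constructor) for each chunk in turn.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, SingleFileRecordReader.class);
  }
}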
4). Source: getMoreSplits()
Whether it is the set of files accepted by some pool instance onepool, or the files that belong to no pool at all, either can loosely be viewed as "one batch of files"; getMoreSplits() generates the splits for such a batch.
/**
 * Return all the splits in the specified set of paths
 */
private void getMoreSplits(JobContext job, List<FileStatus> stats,
                           long maxSize, long minSizeNode, long minSizeRack,
                           List<InputSplit> splits)
  throws IOException {
  Configuration conf = job.getConfiguration();

  // OneFileInfo: represents one input file.
  OneFileInfo[] files;

  // rackToBlocks: which blocks reside on a given rack.
  HashMap<String, List<OneBlockInfo>> rackToBlocks =
    new HashMap<String, List<OneBlockInfo>>();

  // blockToNodes: on which nodes the replicas of a given block reside.
  HashMap<OneBlockInfo, String[]> blockToNodes =
    new HashMap<OneBlockInfo, String[]>();

  // nodeToBlocks: which blocks reside on a given node.
  HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
    new HashMap<String, Set<OneBlockInfo>>();

  files = new OneFileInfo[stats.size()];
  if (stats.size() == 0) {
    return;
  }

  /**
   * Iterate over this "batch of files", building a OneFileInfo object per file.
   * The OneFileInfo constructor populates the three maps above, so once the
   * loop finishes, the block/node/rack relationships are fully established
   * and the splits can be generated from them.
   * (Note: 'i' is never incremented, so files[0] is simply reassigned on each
   * pass; totLength still accumulates correctly because the element is read
   * back immediately after being written.)
   */
  long totLength = 0;
  int i = 0;
  for (FileStatus stat : stats) {
    files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                               rackToBlocks, blockToNodes, nodeToBlocks,
                               rackToNodes, maxSize);
    totLength += files[i].getLength();
  }

  // Form the splits.
  createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
               maxSize, minSizeNode, minSizeRack, splits);
}
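To make the three maps concrete, here is a hypothetical snapshot for four blocks b1..b4 with two replicas each, with nodes n1 and n2 on rack1 and node n3 on rack2 (a made-up layout, for illustration only):

// nodeToBlocks: { n1 -> {b1, b2, b4}, n2 -> {b1, b3}, n3 -> {b2, b3, b4} }
// blockToNodes: { b1 -> [n1, n2], b2 -> [n1, n3], b3 -> [n2, n3], b4 -> [n3, n1] }
// rackToBlocks: { rack1 -> [b1, b2, b3, b4], rack2 -> [b2, b3, b4] }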
5). Source: createSplits()
@VisibleForTesting
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                  Map<OneBlockInfo, String[]> blockToNodes,
                  Map<String, List<OneBlockInfo>> rackToBlocks,
                  long totLength,
                  long maxSize,
                  long minSizeNode,
                  long minSizeRack,
                  List<InputSplit> splits
                  ) {
  // Blocks belonging to the split currently being assembled.
  ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
  // Running size of the split currently being assembled.
  long curSplitSize = 0;

  int totalNodes = nodeToBlocks.size();
  long totalLength = totLength;

  Multiset<String> splitsPerNode = HashMultiset.create();
  Set<String> completedNodes = new HashSet<String>();

  while(true) {
    // it is allowed for maxSize to be 0. Disable smoothing load for such cases

    // Form node-local splits, node by node (in units of blocks).
    // process all nodes and create splits that are local to a node. Generate
    // one split per node iteration, and walk over nodes multiple times to
    // distribute the splits across nodes.
    for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
        .entrySet().iterator(); iter.hasNext();) {
      Map.Entry<String, Set<OneBlockInfo>> one = iter.next();

      String node = one.getKey();

      // Skip the node if it has previously been marked as completed.
      if (completedNodes.contains(node)) {
        continue;
      }

      Set<OneBlockInfo> blocksInCurrentNode = one.getValue();

      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in
      // two different splits.
      Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
      while (oneBlockIter.hasNext()) {
        OneBlockInfo oneblock = oneBlockIter.next();

        // Remove all blocks which may already have been assigned to other
        // splits.
        if(!blockToNodes.containsKey(oneblock)) {
          oneBlockIter.remove();
          continue;
        }

        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated split size exceeds the maximum, then
        // create this split.
        // Once the accumulated block size reaches maxSize (>=), emit a split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(splits, Collections.singleton(node), validBlocks);
          totalLength -= curSplitSize;
          curSplitSize = 0;

          splitsPerNode.add(node);

          // Remove entries from blocksInNode so that we don't walk these
          // again.
          blocksInCurrentNode.removeAll(validBlocks);
          validBlocks.clear();

          // Done creating a single split for this node. Move on to the next
          // node so that splits are distributed across nodes.
          break;
        }
      }
      if (validBlocks.size() != 0) {
        // This implies that the last few blocks (or all in case maxSize=0)
        // were not part of a split. The node is complete.

        // if there were any blocks left over and their combined size is
        // larger than minSplitNode, then combine them into one split.
        // Otherwise add them back to the unprocessed pool. It is likely
        // that they will be combined with other blocks from the
        // same rack later on.
        // This condition also kicks in when max split size is not set. All
        // blocks on a node will be grouped together into a single split.

        // If the leftover size is at least minSizeNode, these blocks form one
        // split; if it is smaller, they are handed back to blockToNodes so the
        // later "same rack" pass can pick them up.
        if (minSizeNode != 0 && curSplitSize >= minSizeNode
            && splitsPerNode.count(node) == 0) {
          // haven't created any split on this machine. so its ok to add a
          // smaller one for parallelism. Otherwise group it in the rack for
          // balanced size create an input split and add it to the splits
          // array
          addCreatedSplit(splits, Collections.singleton(node), validBlocks);
          totalLength -= curSplitSize;
          splitsPerNode.add(node);
          // Remove entries from blocksInNode so that we don't walk this again.
          blocksInCurrentNode.removeAll(validBlocks);
          // The node is done. This was the last set of blocks for this node.
        } else {
          // Put the unplaced blocks back into the pool for later rack-allocation.
          for (OneBlockInfo oneblock : validBlocks) {
            blockToNodes.put(oneblock, oneblock.hosts);
          }
        }
        validBlocks.clear();
        curSplitSize = 0;
        completedNodes.add(node);
      } else { // No in-flight blocks.
        if (blocksInCurrentNode.size() == 0) {
          // Node is done. All blocks were fit into node-local splits.
          completedNodes.add(node);
        } // else Run through the node again.
      }
    }

    // Check if node-local assignments are complete.
    if (completedNodes.size() == totalNodes || totalLength == 0) {
      // All nodes have been walked over and marked as completed or all blocks
      // have been assigned. The rest should be handled via rackLock assignment.
      LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
          + completedNodes.size() + ", size left: " + totalLength);
      break;
    }
  }

  // Form rack-local splits, rack by rack (in units of blocks).
  // if blocks in a rack are below the specified minimum size, then keep them
  // in 'overflow'. After the processing of all racks is complete, these
  // overflow blocks will be combined into splits.
  // overflowBlocks holds the blocks still left over after the "same rack" pass.
  ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
  Set<String> racks = new HashSet<String>();

  // Process all racks over and over again until there is no more work to do.
  while (blockToNodes.size() > 0) {

    // Create one split for this rack before moving over to the next rack.
    // Come back to this rack after creating a single split for each of the
    // remaining racks.

    // Process one rack location at a time, Combine all possible blocks that
    // reside on this rack as one split. (constrained by minimum and maximum
    // split size).

    // Handle one rack at a time.
    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {
      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      racks.add(one.getKey());
      List<OneBlockInfo> blocks = one.getValue();

      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in
      // two different splits.
      boolean createdSplit = false;
      // Walk this rack's blocks one by one.
      for (OneBlockInfo oneblock : blocks) {
        if (blockToNodes.containsKey(oneblock)) {
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split. (Once the accumulated size reaches maxSize,
          // emit a split.)
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, getHosts(racks), validBlocks);
            createdSplit = true;
            break;
          }
        }
      }

      // if we created a split, then just go to the next rack
      if (createdSplit) {
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
        continue;
      }

      if (!validBlocks.isEmpty()) {
        // If the leftover size is at least minSizeRack, these blocks form one split.
        if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
          // if there is a minimum size specified, then create a single split
          // otherwise, store these blocks into overflow data structure
          addCreatedSplit(splits, getHosts(racks), validBlocks);
        } else {
          // There were a few blocks in this rack that
          // remained to be processed. Keep them in 'overflow' block list.
          // These will be combined later.
          // Leftovers smaller than minSizeRack are added to overflowBlocks.
          overflowBlocks.addAll(validBlocks);
        }
      }
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
    }
  }

  assert blockToNodes.isEmpty();
  assert curSplitSize == 0;
  assert validBlocks.isEmpty();
  assert racks.isEmpty();

  // Walk and accumulate the remaining (overflow) blocks.
  for (OneBlockInfo oneblock : overflowBlocks) {
    validBlocks.add(oneblock);
    curSplitSize += oneblock.length;

    // This might cause an exiting rack location to be re-added,
    // but it should be ok.
    for (int i = 0; i < oneblock.racks.length; i++) {
      racks.add(oneblock.racks[i]);
    }

    // if the accumulated split size exceeds the maximum, then
    // create this split. (Once the accumulated size reaches maxSize, emit a split.)
    if (maxSize != 0 && curSplitSize >= maxSize) {
      // create an input split and add it to the splits array
      addCreatedSplit(splits, getHosts(racks), validBlocks);
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
    }
  }

  // Whatever remains becomes the final split.
  if (!validBlocks.isEmpty()) {
    addCreatedSplit(splits, getHosts(racks), validBlocks);
  }
}
Reposted from: https://www.cnblogs.com/skyl/p/4754999.html