Partitioner分割槽過程分析
Partition的中文意思就是分割槽,分片的意思,這個階段也是整個MapReduce過程的第三個階段,就在Map任務的後面,他的作用就是使key分到通過一定的分割槽演算法,分到固定的區域中,給不同的Reduce做處理,達到負載均衡的目的。他的執行過程其實就是發生在上篇文章提到的collect的過程階段,當輸入的key呼叫了使用者的map函式時,中間結果就會被分割槽了。雖說這個過程看似不是很重要,但是也有值得學習的地方。Hadoop預設的演算法是HashPartitioner,就是根據key的hashcode取摸運算,很簡單的。
但是這雖然能保證了key的隨機分佈,但不能保證全域性有序的實現,因為有些需求需要不同的分割槽呈現出階段性的分佈,第一個區所有key小於第二區間,同樣第二區間要小於第三區間,而你的隨機演算法只是區域性有序,在區間內時有序的,但是存在第一區間的key會大於第二區間的,因此,這裡出現了一個叫TotalOrderPartitioner的類,這也是本次學習的重點。先看看關係Partition的相關類結構。/** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
可見,TotalOrderPartitioner還是挺複雜的。
TotalOrderPartitioner的作用就是保證全域性有序,對於key的劃分,他劃分了幾個key的抽樣點,作為key的劃分點,比【2,4,6,8】,4個key抽樣點,把區間劃成了5份,如果某個key的值為5,他的區間為4-6,所以在第三區間,也就是說,這個類的作用就是圍繞給定的劃分點,尋找他的區間號,就代表任務的完成,至於你中間用的是二分搜尋,還是其他的什麼演算法,都由你說了算。
好的,首先第一步,從配置檔案中得到劃分點,他其實是存在於一個叫partition.file的檔案中,配置中只保留了路徑,
spiltPoints在後面會起著關鍵的作用。public void configure(JobConf job) { try { //獲得partition file String parts = getPartitionFile(job); final Path partFile = new Path(parts); final FileSystem fs = (DEFAULT_PATH.equals(parts)) ? FileSystem.getLocal(job) // assume in DistributedCache : partFile.getFileSystem(job); Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass(); //從partition中讀出Spilts分割槽點 K[] splitPoints = readPartitions(fs, partFile, keyClass, job); ....
然後開始關鍵的操作了,如果你的key值型別不是BinaryComparable二進位制比較型別的話,比如能直接比較值的數字型別,就直接用二分演算法,建立二分搜尋節點,傳入自己的比較器實現:
....
RawComparator<K> comparator =
(RawComparator<K>) job.getOutputKeyComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
job.getBoolean("total.order.partitioner.natural.order", true);
//判斷是否為BinaryComparable型別,如果是,建立Trie樹
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
job.getInt("total.order.partitioner.max.trie.depth", 2));
} else {
//如果是不是則建立構建BinarySearchNode,用二分查詢,用自己構建的比較器
partitions = new BinarySearchNode(splitPoints, comparator);
}
繼續往裡點,裡面的獲取分割槽號的演算法,直接用的是二分搜尋查詢:/**
* For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
* where disabled by <tt>total.order.partitioner.natural.order</tt>,
* search the partition keyset with a binary search.
*/
class BinarySearchNode implements Node<K> {
//比較的內容節點
private final K[] splitPoints;
//比較器
private final RawComparator<K> comparator;
BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
this.splitPoints = splitPoints;
this.comparator = comparator;
}
/**
* 通過自己傳入的比較器方法進行二分查詢
*/
public int findPartition(K key) {
final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
return (pos < 0) ? -pos : pos;
}
}
但是如果key的型別如果是BinaryComparable二進位制比較型別的呢(你可以就理解為字串型別),則要依賴TrieTree的建立了。裡面分為2種節點,InnerTrieNode和LeafTrieNode,都繼承了TrieNode , LeafTrieNode為葉子節點,最底層儲存的是分割槽點剛剛說過的splitPoints。InnerTrieNode就是在葉子節點上面的節點。這個TrieTree的原理就是從上往下掃描節點,最後到葉子節點,返回分割槽號。有種二分搜尋樹的感覺。每個inner節點保留255個位元組點,代表著255個字元
/**
* An inner trie node that contains 256 children based on the next
* character.
*/
class InnerTrieNode extends TrieNode {
private TrieNode[] child = new TrieNode[256];
InnerTrieNode(int level) {
super(level);
}
...
所以最後的圖線類似下面這樣,這裡只顯示出了A-Z 26個字母,其實應該有255個:
可以想象這個樹完全展開還是非常大的,所以這是標準的空間換時間的演算法實現,所以建立TrieTree的過程應該是遞迴的過程,直到到達最深的深度,此時應該建立的Leaf葉子節點,至此,樹建立完畢,看程式碼實現:
private TrieNode buildTrie(BinaryComparable[] splits, int lower,
int upper, byte[] prefix, int maxDepth) {
final int depth = prefix.length;
if (depth >= maxDepth || lower == upper) {
//深度抵達最大的時候,應建立葉子節點了
return new LeafTrieNode(depth, splits, lower, upper);
}
InnerTrieNode result = new InnerTrieNode(depth);
byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
// append an extra byte on to the prefix
int currentBound = lower;
//每個父節點擁有著255個子節點
for(int ch = 0; ch < 255; ++ch) {
trial[depth] = (byte) (ch + 1);
lower = currentBound;
while (currentBound < upper) {
if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
break;
}
currentBound += 1;
}
trial[depth] = (byte) ch;
//result.child為首節點,遞迴建立子節點
result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
maxDepth);
}
// pick up the rest
trial[depth] = 127;
result.child[255] = buildTrie(splits, currentBound, upper, trial,
maxDepth);
return result;
}
以上的步驟還只是初始化的過程,並非key查詢獲取partition分割槽的操作,構建過程的的流程圖如下:
接下來的步驟就是關鍵的輸入key,進而查詢分割槽的過程了,非二進位制比較型別的情況很簡單,直接通過自己的插入的比較器,二分搜尋即可知道結果。我們看看TrieTree實現的字串型別的查詢分割槽如何實現,從以上構建的過程,我們知道,他是一層層的逐層查詢過程,比如你要找,aad這個字元,你當然首先得第一個節點找a,然後再往這個節點的第一個子節點就是字元a在查詢,最後找到葉子節點,在葉子節點的查詢,Hadoop還是用了二分查詢,這時因為本身的劃分資料不是很多,不需要排序直接查詢即可。
下面看看程式碼的實現,首先是innner節點,但字元的查詢:
....
/**
* 非葉子的節點的查詢
*/
public int findPartition(BinaryComparable key) {
//獲取當前的深度
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
//從key在此位置對應的字元child開始繼續搜尋下一個,key.getBytes()[level]為第level位置的字元
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
如果抵達了最後一層的LeafTrieNode,呼叫的是他自己的方法:
....
//在葉子節點,進行二分查詢分割槽號
public int findPartition(BinaryComparable key) {
final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
return (pos < 0) ? -pos : pos;
}
最終返回的也是分割槽號。也就完成了這個分割槽演算法的最終實現了。標準的空間換時間演算法,但是要保證此演算法的高效性,對於劃分點的採集就顯得非常重要了,得要保證有一定的代表性。才能保證分割槽間的有序。在Hadoop中提供了3個採集的類:
SplitSampler:對前n個記錄進行取樣
RandomSampler:遍歷所有資料,隨機取樣
IntervalSampler:固定間隔取樣
小小的partition演算法也蘊藏著很多奇妙的演算法,MapReduce的程式碼真的是一份不可多得的好資料啊。