1. 程式人生 > >Partitioner分割槽過程分析

Partitioner分割槽過程分析

            Partition的中文意思就是分割槽,分片的意思,這個階段也是整個MapReduce過程的第三個階段,就在Map任務的後面,他的作用就是使key分到通過一定的分割槽演算法,分到固定的區域中,給不同的Reduce做處理,達到負載均衡的目的。他的執行過程其實就是發生在上篇文章提到的collect的過程階段,當輸入的key呼叫了使用者的map函式時,中間結果就會被分割槽了。雖說這個過程看似不是很重要,但是也有值得學習的地方。Hadoop預設的演算法是HashPartitioner,就是根據key的hashcode取摸運算,很簡單的。

/** Partition keys by their {@link Object#hashCode()}. 
 */
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
但是這雖然能保證了key的隨機分佈,但不能保證全域性有序的實現,因為有些需求需要不同的分割槽呈現出階段性的分佈,第一個區所有key小於第二區間,同樣第二區間要小於第三區間,而你的隨機演算法只是區域性有序,在區間內時有序的,但是存在第一區間的key會大於第二區間的,因此,這裡出現了一個叫TotalOrderPartitioner的類,這也是本次學習的重點。先看看關係Partition的相關類結構。


可見,TotalOrderPartitioner還是挺複雜的。

        TotalOrderPartitioner的作用就是保證全域性有序,對於key的劃分,他劃分了幾個key的抽樣點,作為key的劃分點,比【2,4,6,8】,4個key抽樣點,把區間劃成了5份,如果某個key的值為5,他的區間為4-6,所以在第三區間,也就是說,這個類的作用就是圍繞給定的劃分點,尋找他的區間號,就代表任務的完成,至於你中間用的是二分搜尋,還是其他的什麼演算法,都由你說了算。

      好的,首先第一步,從配置檔案中得到劃分點,他其實是存在於一個叫partition.file的檔案中,配置中只保留了路徑,

public void configure(JobConf job) {
    try {
      //獲得partition file
      String parts = getPartitionFile(job);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(job)     // assume in DistributedCache
        : partFile.getFileSystem(job);

      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      //從partition中讀出Spilts分割槽點
      K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
      ....
spiltPoints在後面會起著關鍵的作用。

       然後開始關鍵的操作了,如果你的key值型別不是BinaryComparable二進位制比較型別的話,比如能直接比較值的數字型別,就直接用二分演算法,建立二分搜尋節點,傳入自己的比較器實現:

....
      RawComparator<K> comparator =
        (RawComparator<K>) job.getOutputKeyComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        job.getBoolean("total.order.partitioner.natural.order", true);
      //判斷是否為BinaryComparable型別,如果是,建立Trie樹
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            job.getInt("total.order.partitioner.max.trie.depth", 2));
      } else {
    	//如果是不是則建立構建BinarySearchNode,用二分查詢,用自己構建的比較器
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
繼續往裡點,裡面的獲取分割槽號的演算法,直接用的是二分搜尋查詢:
/**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * where disabled by <tt>total.order.partitioner.natural.order</tt>,
   * search the partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
	//比較的內容節點
    private final K[] splitPoints;
    //比較器
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    
   /**
    * 通過自己傳入的比較器方法進行二分查詢
    */
    public int findPartition(K key) {
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }
       但是如果key的型別如果是BinaryComparable二進位制比較型別的呢(你可以就理解為字串型別),則要依賴TrieTree的建立了。裡面分為2種節點,InnerTrieNode和LeafTrieNode,都繼承了TrieNode , LeafTrieNode為葉子節點,最底層儲存的是分割槽點剛剛說過的splitPoints。InnerTrieNode就是在葉子節點上面的節點。這個TrieTree的原理就是從上往下掃描節點,最後到葉子節點,返回分割槽號
。有種二分搜尋樹的感覺。每個inner節點保留255個位元組點,代表著255個字元
/**
   * An inner trie node that contains 256 children based on the next
   * character.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    ...
所以最後的圖線類似下面這樣,這裡只顯示出了A-Z 26個字母,其實應該有255個:



可以想象這個樹完全展開還是非常大的,所以這是標準的空間換時間的演算法實現,所以建立TrieTree的過程應該是遞迴的過程,直到到達最深的深度,此時應該建立的Leaf葉子節點,至此,樹建立完畢,看程式碼實現:

private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    final int depth = prefix.length;
    if (depth >= maxDepth || lower == upper) {
      //深度抵達最大的時候,應建立葉子節點了
      return new LeafTrieNode(depth, splits, lower, upper);
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    // append an extra byte on to the prefix
    int currentBound = lower;
    //每個父節點擁有著255個子節點
    for(int ch = 0; ch < 255; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      //result.child為首節點,遞迴建立子節點
      result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
                                   maxDepth);
    }
    // pick up the rest
    trial[depth] = 127;
    result.child[255] = buildTrie(splits, currentBound, upper, trial,
                                  maxDepth);
    return result;
  }
以上的步驟還只是初始化的過程,並非key查詢獲取partition分割槽的操作,構建過程的的流程圖如下:


    接下來的步驟就是關鍵的輸入key,進而查詢分割槽的過程了,非二進位制比較型別的情況很簡單,直接通過自己的插入的比較器,二分搜尋即可知道結果。我們看看TrieTree實現的字串型別的查詢分割槽如何實現,從以上構建的過程,我們知道,他是一層層的逐層查詢過程,比如你要找,aad這個字元,你當然首先得第一個節點找a,然後再往這個節點的第一個子節點就是字元a在查詢,最後找到葉子節點,在葉子節點的查詢,Hadoop還是用了二分查詢,這時因為本身的劃分資料不是很多,不需要排序直接查詢即可。

下面看看程式碼的實現,首先是innner節點,但字元的查詢:

....
    /**
     * 非葉子的節點的查詢
     */
    public int findPartition(BinaryComparable key) {
      //獲取當前的深度
      int level = getLevel();
      
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      
      //從key在此位置對應的字元child開始繼續搜尋下一個,key.getBytes()[level]為第level位置的字元
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
如果抵達了最後一層的LeafTrieNode,呼叫的是他自己的方法:
....
    //在葉子節點,進行二分查詢分割槽號
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
最終返回的也是分割槽號。也就完成了這個分割槽演算法的最終實現了。標準的空間換時間演算法,但是要保證此演算法的高效性,對於劃分點的採集就顯得非常重要了,得要保證有一定的代表性。才能保證分割槽間的有序。在Hadoop中提供了3個採集的類:

SplitSampler對前n個記錄進行取樣
RandomSampler遍歷所有資料,隨機取樣
IntervalSampler:固定間隔取樣


小小的partition演算法也蘊藏著很多奇妙的演算法,MapReduce的程式碼真的是一份不可多得的好資料啊。