1. 程式人生 > >hadoop —— Reducer全排序

hadoop —— Reducer全排序

最大 數量 style @override ID 排序 函數類 樣本 AR

目錄      

一、關於Reducer全排序

  1.1、  什麽叫全排序

  1.2、  分區的標準是什麽

二、全排序的三種方式

  2.1、  一個Reducer

  2.2、  自定義分區函數

  2.3、  采樣


  一、關於Reducer全排序  

1.1、什麽叫全排序?

在所有的分區(Reducer)中,KEY都是有序的:

  • 正確舉例:如Reducer分區1中的KEY是1、3、4,分區2中的key是5、8、9
  • 錯誤舉例:如Reducer分區1中的KEY是1、3、4,分區2中的key是2、7、9

1.2、數據分區的標準是什麽?

默認的分區方式是根據mapper後的key的hash值,除以Reducer的分區數量,取其余數判定;例:

  • 某key的hash值是999,此時有3個分區(Reducer),則999 % 3 = 0;則該key和其對應value會分在第一個區(同理,當余數為1,2時會分在對應的另外兩個區)。

註意:若key的類型是Text類(或IntWritable等)的,則計算的是Text類型的key的hash值,而非通過Text獲取到的String(或int等)類型的hash值。

也可自定義分區的判定方式,見下2.2、自定義分區函數


  二、全排序的三種方式  

  • 一個Reduce
  • 自定義分區函數
  • 采樣

2.1、一個Reduce

只有一個Reduce分區,自然是全排序效果


2.2、自定義分區函數

  1. 創建一個繼承Partitioner的類,如:Partition
  2. 重寫其”getPartition“方法,作為判斷分區的依據
  3. 在main的job中將其加入:job.setPartitionerClass(Partition.class);

以隨機分區為例,偽代碼如下:

 1 public class Partition extends Partitioner <Text,IntWritable>{
 2 
 3     @Override
 4     public int getPartition(Text text, IntWritable intWritable, int
numPartitions) { 5 Random r = new Random(); 6 //根據分區的數量(numPartitions),獲取一個隨機值返回,返回的值作為Key判斷分區的依據 7 int i = r.nextInt(numPartitions); 8 return i; 9 } 10 } 11 12 public class RandomAPP { 13 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 14 ...... 15 16 //放判斷放入分區的方式(隨機放入) 17 job.setPartitionerClass(Partition.class); 18 19 ...... 20 21 //等待執行MapperReducer 22 job.waitForCompletion(true); 23 } 24 }


2.3、采樣:TotalOrderPartition

  • RandomSampler:隨機采樣 ,性能差,適合亂序數據
  • IntervalSampler:間隔采樣 ,性能較好,適合有序數據
  • SplitSampler:切片采樣 ,性能較好,適合有序數據

以隨機采樣為例,偽代碼如下:

註:以下需要放在App中設置配置文件的後面

 1         //在App中指定分區函數類
 2         job.setPartitionerClass(TotalOrderPartition.class);
 3 
 4         //設置文件的寫入路徑
 5         TotalOrderPartition.setPartitionFile(job.getConfiguration(),new Path("E:/par.dat"));
 6 
 7         /**
 8          * 初始化采樣器
 9          * RandomSampler    采用隨機采樣的方式
10          * freq             每個Key被選中的概率     freq x key > 分區數
11          * numSamples       需要的樣本數           numSamples  > 分區數
12          * maxSplitsSampled 文件最大切片數         maxSplitsSampled > 當前切片數
13          */
14         InputSampler.RandomSampler = new InputSampler.RandomSampler(freq, numsamples,maxsplitsSampled );
15 
16         //寫入采樣數據
17         InputSampler.writePartitionFile(job,sampler);


    Over    

hadoop —— Reducer全排序