1. 程式人生 > >Hadoop Partition函數應用(歸檔)

Hadoop Partition函數應用(歸檔)

1-1 sage true throw tasks omsa 如何 wid 選擇

一、實例描述

  在這個實例裏我們使用簡單的數據集,裏面包含多條數據,每條數據由姓名年齡性別成績組成。實例要求是按照如下規則歸檔用戶。

  1.找出年齡小於20歲中男生和女生的最大分數
  2.找出20歲到50歲男生和女生的最大分數
  3.找出50歲以上的男生和女生的最大分數

  樣例輸入:

  技術分享圖片

  樣例輸出:

  1.年齡小於20歲中男生和女生的最大分數

  技術分享圖片

  2.20歲到50歲男生和女生的最大分數

  技術分享圖片

  3.50歲以上的男生和女生的最大分數

  技術分享圖片

二、設計思路

  基於實例需求,我們通過以下幾步完成:第一步,編寫Mapper類,按需求將數據集解析為key=gender,value=name+age+score,然後輸出。第二步

,編寫Partitioner類,按年齡段,將結果指定給不同的Reduce執行。第三步,編寫Reduce類,分別統計出男女學生的最高分。

這裏簡單介紹一下Partition的概念和使用:

  得到map產生的記錄後,他們該分配給哪些reducer來處理呢?hadoop默認是根據散列值來派發,但是實際中,這並不能很高效或者按照我們要求的去執行任務。例如,經過partition處理後,一個節點的reducer分配到了20條記錄,另一個卻分配到了10W萬條,試想,這種情況效率如何。又或者,我們想要處理後得到的文件按照一定的規律進行輸出,假設有兩個reducer,我們想要最終結果中part-00000中存儲的是”h”開頭的記錄的結果,part-00001中存儲其他開頭的結果,這些默認的partitioner是做不到的。所以需要我們自己定制partition來選擇reducer。自定義partitioner很簡單,只要自定義一個類,並且繼承Partitioner類,重寫其getPartition方法就好了,在使用的時候通過調用Job的setPartitionerClass指定一下即可。

  MapReduce基於key的全排序的原理:

  如何使用mapreduce來做全排序?最簡單的方法就是使用一個partition,因為一個partition對應一個reduce的task,然而reduce的輸入本來就是對key有序的,所以很自然地就產生了一個全排序文件。但是這種方法在處理大型文件時效率極低,因為一臺機器必須處理所有輸出文件,從而完全喪失了mapreduce所提供的並行架構的優勢。

  如果是分多個partition呢,則只要確保partition是有序的就行了。首先創建一系列排好序的文件;其次,串聯這些文件(類似於歸並排序);最後得到一個全局有序的文件。比如有1000個1-10000的數據,跑10個ruduce任務,如果進行partition的時候,能夠將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據全部大於第n-1個reduce中的數據。這樣,每個reduce出來之後都是有序的了,我們只要concat所有的輸出文件,變成一個大的文件,就都是有序的了。

  這時候可能會有一個疑問,雖然各個reduce的數據是按照區間排列好的,但是每個reduce裏面的數據是亂序的啊?當然不會,不要忘了排序是MapReduce的天然特性 — 在數據達到reducer之前,mapreduce框架已經對這些數據按key排序了。

  但是這裏又有另外一個問題,就是在定義每個partition的邊界的時候,可能會導致每個partition上分配到的記錄數相差很大,這樣數據最多的partition就會拖慢整個系統。我們期望的是每個partition上分配的數據量基本相同,hadoop提供了采樣器幫我們預估整個邊界,以使數據的分配盡量平均。

  在Hadoop中,patition我們可以用TotalOrderPartitioner替換默認的分區,然後將采樣的結果傳給他,就可以實現我們想要的分區。在采樣時,可以使用hadoop的幾種采樣工具,如RandomSampler,InputSampler,IntervalSampler。

三、程序代碼

  程序代碼如下:

  1 import java.io.IOException;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.fs.Path;
  5 import org.apache.hadoop.io.LongWritable;
  6 import org.apache.hadoop.io.Text;
  7 import org.apache.hadoop.mapreduce.Job;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.Partitioner;
 10 import org.apache.hadoop.mapreduce.Reducer;
 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 13 import org.apache.hadoop.util.GenericOptionsParser;
 14 
 15 
 16 public class Gender {
 17     
 18     private static String spiltChar = "\t";    //  字段分隔符
 19     
 20     public static class GenderMapper extends Mapper<LongWritable, Text, Text, Text>{
 21         
 22         //  調用map解析一行數據,該行的數據存儲在value參數中,然後根據\t分隔符,解析出姓名,年齡,性別和成績
 23         @Override
 24         protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, Text>.Context context)
 25                 throws IOException, InterruptedException {
 26             //  super.map(key, value, context);
 27             String [] tokens = value.toString().split(spiltChar);
 28             String gender = tokens[2];
 29             String nameAgeScore = tokens[0]+spiltChar+tokens[1]+spiltChar+tokens[3];
 30             //  輸出 key=gender  value=name+age+score
 31             context.write(new Text(gender), new Text(nameAgeScore));
 32         }
 33     }
 34     
 35     //  合並 Mapper 輸出結果
 36     public static class GenderCombiner extends Reducer<Text, Text, Text, Text>{
 37         @Override
 38         protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)
 39                 throws IOException, InterruptedException {
 40             //  super.reduce(arg0, arg1, arg2);
 41             int maxScore = Integer.MIN_VALUE;
 42             int score = 0;
 43             String name = " ";
 44             String age = " ";
 45             for(Text val:values){
 46                 String [] valTokens = val.toString().split(spiltChar);
 47                 score = Integer.parseInt(valTokens[2]);
 48                 if(score>maxScore){
 49                     name = valTokens[0];
 50                     age = valTokens[1];
 51                     maxScore = score;
 52                 }
 53             }
 54             context.write(key, new Text(name + spiltChar + age + spiltChar + maxScore));
 55         }
 56     }
 57     
 58     //  根據age年齡段將map輸出結果均勻分布在reduce 上
 59     public static class GenderPartitioner extends Partitioner<Text, Text>{
 60         @Override
 61         public int getPartition(Text key, Text value, int numReduceTasks) {
 62             String [] nameAgeScore = value.toString().split(spiltChar);
 63             int age = Integer.parseInt(nameAgeScore[1]);
 64             
 65             //  默認指定分區0
 66             if (numReduceTasks == 0) {
 67                 return 0;
 68             }
 69             //  年齡小於等於20,指定分區0
 70             if (age <= 20) {
 71                 return 0;
 72             }else if (age <= 50) {          //  年齡大於20,小於等於50,指定分區1
 73                 return 1 % numReduceTasks;
 74             }else {                          //  剩余年齡指定分區2
 75                 return 2 % numReduceTasks;
 76             }
 77         }
 78     }
 79     
 80     //  統計出不同性別的最高分
 81     public static class GenderReducer extends Reducer<Text, Text, Text, Text>{
 82         @Override
 83         protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)
 84                 throws IOException, InterruptedException {
 85             //  super.reduce(arg0, arg1, arg2);
 86             int maxScore = Integer.MIN_VALUE;
 87             int score = 0;
 88             String name = " ";
 89             String age = " ";
 90             String gender = " ";
 91             
 92             //  根據key,叠代value集合,求出最高分
 93             for(Text val:values){
 94                 String[] valTokens = val.toString().split(spiltChar);
 95                 score = Integer.parseInt(valTokens[2]);
 96                 if (score > maxScore) {
 97                     name = valTokens[0];
 98                     age = valTokens[1];
 99                     gender = key.toString();
100                     maxScore = score;
101                 }
102             }
103             context.write(new Text(name), new Text("age:" + age + spiltChar + "gender:" + gender + spiltChar + "score:" + maxScore));
104         }
105     }
106     
107     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
108         Configuration conf = new Configuration();
109         String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
110         if(otherArgs.length!=2){
111             System.out.println("Usage:wordcount <in> <out>");
112             System.exit(2);
113         }
114         Job job = new Job(conf,"Gender");
115         job.setJarByClass(Gender.class);
116         
117         job.setMapperClass(GenderMapper.class);
118         job.setReducerClass(GenderReducer.class);
119         job.setMapOutputKeyClass(Text.class);
120         job.setMapOutputValueClass(Text.class);
121         job.setOutputKeyClass(Text.class);
122         job.setOutputValueClass(Text.class);
123         
124         job.setCombinerClass(GenderCombiner.class);
125         job.setPartitionerClass(GenderPartitioner.class);
126         job.setNumReduceTasks(3);            //  reduce個數設置為3
127         
128         FileInputFormat.addInputPath(job,new Path(args[0]));
129         FileOutputFormat.setOutputPath(job, new Path(args[1]));
130         System.exit(job.waitForCompletion(true)?0:1);
131     }
132 
133 }

Hadoop Partition函數應用(歸檔)