Hadoop Partition函數應用(歸檔)
一、實例描述
在這個實例裏我們使用簡單的數據集,裏面包含多條數據,每條數據由姓名、年齡、性別和成績組成。實例要求是按照如下規則歸檔用戶。
1.找出年齡小於20歲中男生和女生的最大分數
2.找出20歲到50歲男生和女生的最大分數
3.找出50歲以上的男生和女生的最大分數
樣例輸入:
樣例輸出:
1.年齡小於20歲中男生和女生的最大分數
2.20歲到50歲男生和女生的最大分數
3.50歲以上的男生和女生的最大分數
二、設計思路
基於實例需求,我們通過以下幾步完成:第一步,編寫Mapper類,按需求將數據集解析為key=gender,value=name+age+score,然後輸出。第二步
這裏簡單介紹一下Partition的概念和使用:
得到map產生的記錄後,他們該分配給哪些reducer來處理呢?hadoop默認是根據散列值來派發,但是實際中,這並不能很高效或者按照我們要求的去執行任務。例如,經過partition處理後,一個節點的reducer分配到了20條記錄,另一個卻分配到了10W萬條,試想,這種情況效率如何。又或者,我們想要處理後得到的文件按照一定的規律進行輸出,假設有兩個reducer,我們想要最終結果中part-00000中存儲的是”h”開頭的記錄的結果,part-00001中存儲其他開頭的結果,這些默認的partitioner是做不到的。所以需要我們自己定制partition來選擇reducer。自定義partitioner很簡單,只要自定義一個類,並且繼承Partitioner類,重寫其getPartition方法就好了,在使用的時候通過調用Job的setPartitionerClass指定一下即可。
MapReduce基於key的全排序的原理:
如何使用mapreduce來做全排序?最簡單的方法就是使用一個partition,因為一個partition對應一個reduce的task,然而reduce的輸入本來就是對key有序的,所以很自然地就產生了一個全排序文件。但是這種方法在處理大型文件時效率極低,因為一臺機器必須處理所有輸出文件,從而完全喪失了mapreduce所提供的並行架構的優勢。
如果是分多個partition呢,則只要確保partition是有序的就行了。首先創建一系列排好序的文件;其次,串聯這些文件(類似於歸並排序);最後得到一個全局有序的文件。比如有1000個1-10000的數據,跑10個ruduce任務,如果進行partition的時候,能夠將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據全部大於第n-1個reduce中的數據。這樣,每個reduce出來之後都是有序的了,我們只要concat所有的輸出文件,變成一個大的文件,就都是有序的了。
這時候可能會有一個疑問,雖然各個reduce的數據是按照區間排列好的,但是每個reduce裏面的數據是亂序的啊?當然不會,不要忘了排序是MapReduce的天然特性 — 在數據達到reducer之前,mapreduce框架已經對這些數據按key排序了。
但是這裏又有另外一個問題,就是在定義每個partition的邊界的時候,可能會導致每個partition上分配到的記錄數相差很大,這樣數據最多的partition就會拖慢整個系統。我們期望的是每個partition上分配的數據量基本相同,hadoop提供了采樣器幫我們預估整個邊界,以使數據的分配盡量平均。
在Hadoop中,patition我們可以用TotalOrderPartitioner替換默認的分區,然後將采樣的結果傳給他,就可以實現我們想要的分區。在采樣時,可以使用hadoop的幾種采樣工具,如RandomSampler,InputSampler,IntervalSampler。
三、程序代碼
程序代碼如下:
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Partitioner; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import org.apache.hadoop.util.GenericOptionsParser; 14 15 16 public class Gender { 17 18 private static String spiltChar = "\t"; // 字段分隔符 19 20 public static class GenderMapper extends Mapper<LongWritable, Text, Text, Text>{ 21 22 // 調用map解析一行數據,該行的數據存儲在value參數中,然後根據\t分隔符,解析出姓名,年齡,性別和成績 23 @Override 24 protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, Text>.Context context) 25 throws IOException, InterruptedException { 26 // super.map(key, value, context); 27 String [] tokens = value.toString().split(spiltChar); 28 String gender = tokens[2]; 29 String nameAgeScore = tokens[0]+spiltChar+tokens[1]+spiltChar+tokens[3]; 30 // 輸出 key=gender value=name+age+score 31 context.write(new Text(gender), new Text(nameAgeScore)); 32 } 33 } 34 35 // 合並 Mapper 輸出結果 36 public static class GenderCombiner extends Reducer<Text, Text, Text, Text>{ 37 @Override 38 protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context) 39 throws IOException, InterruptedException { 40 // super.reduce(arg0, arg1, arg2); 41 int maxScore = Integer.MIN_VALUE; 42 int score = 0; 43 String name = " "; 44 String age = " "; 45 for(Text val:values){ 46 String [] valTokens = val.toString().split(spiltChar); 47 score = Integer.parseInt(valTokens[2]); 48 if(score>maxScore){ 49 name = valTokens[0]; 50 age = valTokens[1]; 51 maxScore = score; 52 } 53 } 54 context.write(key, new Text(name + spiltChar + age + spiltChar + maxScore)); 55 } 56 } 57 58 // 根據age年齡段將map輸出結果均勻分布在reduce 上 59 public static class GenderPartitioner extends Partitioner<Text, Text>{ 60 @Override 61 public int getPartition(Text key, Text value, int numReduceTasks) { 62 String [] nameAgeScore = value.toString().split(spiltChar); 63 int age = Integer.parseInt(nameAgeScore[1]); 64 65 // 默認指定分區0 66 if (numReduceTasks == 0) { 67 return 0; 68 } 69 // 年齡小於等於20,指定分區0 70 if (age <= 20) { 71 return 0; 72 }else if (age <= 50) { // 年齡大於20,小於等於50,指定分區1 73 return 1 % numReduceTasks; 74 }else { // 剩余年齡指定分區2 75 return 2 % numReduceTasks; 76 } 77 } 78 } 79 80 // 統計出不同性別的最高分 81 public static class GenderReducer extends Reducer<Text, Text, Text, Text>{ 82 @Override 83 protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context) 84 throws IOException, InterruptedException { 85 // super.reduce(arg0, arg1, arg2); 86 int maxScore = Integer.MIN_VALUE; 87 int score = 0; 88 String name = " "; 89 String age = " "; 90 String gender = " "; 91 92 // 根據key,叠代value集合,求出最高分 93 for(Text val:values){ 94 String[] valTokens = val.toString().split(spiltChar); 95 score = Integer.parseInt(valTokens[2]); 96 if (score > maxScore) { 97 name = valTokens[0]; 98 age = valTokens[1]; 99 gender = key.toString(); 100 maxScore = score; 101 } 102 } 103 context.write(new Text(name), new Text("age:" + age + spiltChar + "gender:" + gender + spiltChar + "score:" + maxScore)); 104 } 105 } 106 107 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 108 Configuration conf = new Configuration(); 109 String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); 110 if(otherArgs.length!=2){ 111 System.out.println("Usage:wordcount <in> <out>"); 112 System.exit(2); 113 } 114 Job job = new Job(conf,"Gender"); 115 job.setJarByClass(Gender.class); 116 117 job.setMapperClass(GenderMapper.class); 118 job.setReducerClass(GenderReducer.class); 119 job.setMapOutputKeyClass(Text.class); 120 job.setMapOutputValueClass(Text.class); 121 job.setOutputKeyClass(Text.class); 122 job.setOutputValueClass(Text.class); 123 124 job.setCombinerClass(GenderCombiner.class); 125 job.setPartitionerClass(GenderPartitioner.class); 126 job.setNumReduceTasks(3); // reduce個數設置為3 127 128 FileInputFormat.addInputPath(job,new Path(args[0])); 129 FileOutputFormat.setOutputPath(job, new Path(args[1])); 130 System.exit(job.waitForCompletion(true)?0:1); 131 } 132 133 }
Hadoop Partition函數應用(歸檔)