Secondary Sort in Hadoop: A Worked Example
阿新 · Published 2018-12-17
To make sense of secondary sort in Hadoop, you first need a clear picture of Hadoop's shuffle process.
(Note: on the way into reduce, setSortComparatorClass controls how the keys are sorted, while setGroupingComparatorClass controls which keys are treated as equal when values are grouped into a single reduce() call.)
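Both hooks are registered on the Job in the driver of Step 5 below; these are the two relevant lines:

job.setSortComparatorClass(DefineCompparator.class);     // orders the composite keys
job.setGroupingComparatorClass(DefinedGroupSort.class);  // decides which keys share one reduce() call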
Below is a worked secondary-sort example. The task: write a MapReduce program that reads records of the form "word value", one pair per line, and emits one line per word with that word's values sorted in ascending order. For the data used in this post, input in arbitrary order such as

hadoop 32
hive 2345
spark 16
hadoop 23
hive 12
spark 349
hadoop 342
hive 42
spark 3
hive 204

becomes

hadoop 23,32,342
hive 12,42,204,2345
spark 3,16,349

The program is built in the following steps.
Step 1: define a composite-key bean.
package com.bigdata.demo15_two_class_paixu;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key, used during the sort sub-phase on the map side.
 */
public class CombinationKey implements WritableComparable<CombinationKey> {

    private String firstKey;
    private Integer secondKey;

    public String getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    public Integer getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(Integer secondKey) {
        this.secondKey = secondKey;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.firstKey);
        out.writeInt(this.secondKey);
    }

    public void readFields(DataInput in) throws IOException {
        this.firstKey = in.readUTF();
        this.secondKey = in.readInt();
    }

    public int compareTo(CombinationKey o) {
        // fallback ordering by firstKey only; the full ordering is supplied
        // by the sort comparator registered in the driver
        return this.firstKey.compareTo(o.getFirstKey());
    }
}

Step 2: define a custom comparator, used when the composite keys are sorted.
package com.bigdata.demo15_two_class_paixu;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort comparator: orders by firstKey first, then by secondKey.
 */
public class DefineCompparator extends WritableComparator {

    protected DefineCompparator() {
        super(CombinationKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey ck1 = (CombinationKey) a;
        CombinationKey ck2 = (CombinationKey) b;
        int cp1 = ck1.getFirstKey().compareTo(ck2.getFirstKey());
        if (cp1 != 0) {
            // the first keys differ, so the ordering is already decided
            return cp1;
        } else {
            // same firstKey: order by secondKey
            // (Integer.compare avoids the overflow risk of plain subtraction)
            return Integer.compare(ck1.getSecondKey(), ck2.getSecondKey());
        }
    }
}
Step 3: define a custom partitioner, used during the shuffle.
package com.bigdata.demo15_two_class_paixu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner: partitions on the first field of the composite key,
 * so all records with the same firstKey go to the same reducer.
 */
public class DefinedPartition extends Partitioner<CombinationKey, IntWritable> {

    /**
     * @param key           the map output key; partitioning uses its first field
     * @param value         the map output value
     * @param numPartitions total number of partitions, i.e. the number of reducers
     */
    @Override
    public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
        // mask the sign bit so the result is non-negative
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
Step 4: define a grouping comparator, used on the reduce side when values are grouped for each reduce() call.
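The driver in Step 5 registers this class as DefinedGroupSort. A minimal sketch of such a grouping comparator, assuming it compares only firstKey so that every record sharing a firstKey is fed into the same reduce() call:

package com.bigdata.demo15_two_class_paixu;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator (sketch): compares only the first field of the
 * composite key, so all values whose keys share a firstKey are grouped
 * into a single reduce() call.
 */
public class DefinedGroupSort extends WritableComparator {

    protected DefinedGroupSort() {
        // true: instantiate keys so compare() receives deserialized objects
        super(CombinationKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey ck1 = (CombinationKey) a;
        CombinationKey ck2 = (CombinationKey) b;
        // secondKey is deliberately ignored: it affects sort order, not grouping
        return ck1.getFirstKey().compareTo(ck2.getFirstKey());
    }
}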
Step 5: write the MapReduce program (driver, mapper, and reducer).
package com.bigdata.demo15_two_class_paixu;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondSortMapReduce {
/**
 * Mapper, defined as a static inner class.
 */
static class SecondSortMapper extends Mapper<LongWritable, Text, CombinationKey, IntWritable>{
String[] split=null;
CombinationKey kv=new CombinationKey();
IntWritable v=new IntWritable();
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
split = value.toString().split(" ");
kv.setFirstKey(split[0]);
int vv = Integer.parseInt(split[1]);
v.set(vv);
kv.setSecondKey(vv);
context.write(kv, v);
}
}
/**
 * Reducer, defined as a static inner class.
 */
static class SecondSortReducer extends Reducer<CombinationKey, IntWritable, Text, Text>{
Text k=new Text();
Text v=new Text();
@Override
protected void reduce(CombinationKey first_second, Iterable<IntWritable> seconds,
Context context)
throws IOException, InterruptedException {
StringBuilder sb=new StringBuilder();
for(IntWritable second:seconds) {
sb.append(second.get()+",");
}
k.set(first_second.getFirstKey());
v.set(sb.toString().substring(0, sb.toString().length()-1));
context.write(k, v);
}
}
/**
 * Driver entry point: configures and submits the job.
 * @param args input path, output path
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondSortMapReduce.class);
job.setMapperClass(SecondSortMapper.class);
job.setReducerClass(SecondSortReducer.class);
//set the partitioner and the number of reduce tasks
job.setPartitionerClass(DefinedPartition.class);
job.setNumReduceTasks(1);
//set the custom grouping comparator (reduce-side grouping)
job.setGroupingComparatorClass(DefinedGroupSort.class);
//set the custom sort comparator (orders the composite keys)
job.setSortComparatorClass(DefineCompparator.class);
job.setMapOutputKeyClass(CombinationKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
Step 6: run on the Hadoop cluster.
Package the program into a jar, ship it to the cluster, and run:
$ hadoop jar \
> ./jars/Review06_hdfs-0.0.1-SNAPSHOT.jar \
> com.bigdata.demo15_two_class_paixu.SecondSortMapReduce \
> /paixu/input01 \
> /paixu/output03
Check the sorted result:
$ hdfs dfs -cat /paixu/output03/part-r-00000
hadoop 23,32,342
hive 12,42,204,2345
spark 3,16,349