MapReduce案例3——求簡單資料去重
阿新 • 發佈:2019-01-01
資料去重源資料:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
最終結果:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
思路:將兩列資料作為key值,在reduce階段,每次輸出一個,就可以達到去重的目的
/**
 * @author: lpj
 * @date: 2018年3月16日 下午7:16:47
 * @Description: MapReduce job that de-duplicates "date value" records. Each
 *               whole record is emitted as the map-output key, so the shuffle
 *               groups identical records and the reducer writes each one once.
 */
package lpj.reduceWork;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctNumMR {

    /**
     * Configures and submits the deduplication job against hard-coded local
     * paths, then exits with 0 on success and 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.addResource("hdfs-site.xml"); // use a cluster configuration file
        // System.setProperty("HADOOP_USER_NAME", "hadoop"); // run against the cluster
        FileSystem fs = FileSystem.get(conf); // defaults to the local file system

        Job job = Job.getInstance(conf);
        job.setJarByClass(DistinctNumMR.class);
        job.setMapperClass(DistinctNumMR_Mapper.class);
        job.setReducerClass(DistinctNumMR_Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // String inputpath = args[0];
        // String outpath = args[1];
        Path inputPath = new Path("d:/a/homework3.txt");
        Path outputPath = new Path("d:/a/homework3");
        // BUG FIX: the original tested fs.exists(inputPath) before deleting the
        // OUTPUT path, so a stale output directory was not removed when the
        // input file was absent and the job then failed with
        // "output directory already exists". Check the path actually deleted.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isdone = job.waitForCompletion(true);
        System.exit(isdone ? 0 : 1);
    }

    /**
     * Emits every input record, re-joined as "date\tvalue", as the map-output
     * key with a NullWritable value so identical records collapse in the
     * shuffle/sort phase.
     */
    public static class DistinctNumMR_Mapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {

        private final Text kout = new Text(); // reused to avoid per-record allocation

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line, e.g. "2012-3-3 c". Split on any whitespace run so
            // tab- or multi-space-delimited input also works.
            String[] reads = value.toString().trim().split("\\s+");
            if (reads.length < 2) {
                return; // skip blank or malformed lines instead of throwing AIOOBE
            }
            kout.set(reads[0] + "\t" + reads[1]);
            context.write(kout, NullWritable.get());
        }
    }

    /**
     * Writes each distinct key exactly once; the grouped NullWritable values
     * are ignored, which is what performs the deduplication.
     */
    public static class DistinctNumMR_Reducer
            extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
執行結果:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d