Sorting in Hadoop
The trick is to let the framework do the work: MapReduce sorts map output by key during the shuffle phase, so the mapper simply parses each input line as an integer and emits it as the key. The reducer then receives the keys in ascending order and writes out each number prefixed with its rank.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Mapper: parse each line as an integer and emit it as the key, so the
// shuffle phase sorts the numbers for us. The value 1 is just a placeholder.
class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        int num = Integer.parseInt(value.toString().trim());
        context.write(new IntWritable(num), new IntWritable(1));
    }
}

// Reducer: keys arrive in ascending order; write each number with its rank.
// Duplicate numbers are written once per occurrence.
class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private int lineNum = 1;

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable val : values) {
            context.write(new IntWritable(lineNum), key);
            lineNum++;
        }
    }
}

public class Sort {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Sort <input path> <output path>");
            System.exit(-1);
        }
        Job job = Job.getInstance();  // new Job() is deprecated
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // A single reducer guarantees one globally sorted output file.
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
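To try the job, a minimal run might look like the following, assuming the class has been packaged as Sort.jar (a hypothetical name) and the input directory holds plain-text files with one integer per line:

$ hadoop jar Sort.jar Sort input output
$ hdfs dfs -cat output/part-r-00000

For an input containing 2, 32, 654, 32, 15, the output pairs each rank with its value (tab-separated, the default for TextOutputFormat):

1	2
2	15
3	32
4	32
5	654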