Hadoop MapReduce Development -- Extracting the Top N Records
阿新 · Published: 2019-01-12
Test data (each record has four comma-separated fields; the third field is the payment value to be ranked):
file1.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
file2.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39
105,2345,880,40
106,1234,700,40
Mapper code:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

public class TOPNMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // N, read from the job configuration in setup()
    int len;
    // Fixed-size buffer of length N + 1; after each sort, slot 0 holds the
    // current minimum and top[1..N] hold the N largest payments seen so far.
    int[] top;
    // Every mapper emits under the same key, so all candidates reach one reduce() call.
    private static final Text KEY = new Text("K");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        String[] arr = line.split(",");
        // Only well-formed records with exactly four fields are considered;
        // the third field is the payment value being ranked.
        if (arr.length == 4) {
            int payment = Integer.valueOf(arr[2]);
            add(payment);
        }
    }

    // Overwrite the current minimum (slot 0) and re-sort ascending,
    // which keeps the N largest payments in top[1..N].
    private void add(int payment) {
        top[0] = payment;
        Arrays.sort(top);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit only this mapper's local top N, not every input record.
        for (int i = 1; i < top.length; i++) {
            context.write(KEY, new IntWritable(top[i]));
        }
    }
}
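To make the add() trick easier to follow, here is a minimal standalone sketch of the same idea outside MapReduce (the class name and sample numbers are illustrative only, and the trick assumes non-negative values, as in the test data):

import java.util.Arrays;

// Keep an int array of length N + 1; slot 0 always holds the current minimum
// after sorting, so overwriting it and re-sorting retains the N largest values.
public class TopArrayDemo {
    public static void main(String[] args) {
        int n = 3;
        int[] top = new int[n + 1];
        int[] payments = {100, 2000, 1234, 10, 9000};
        for (int p : payments) {
            top[0] = p;        // overwrite the smallest slot
            Arrays.sort(top);  // ascending sort pushes larger values to the right
        }
        // top[1..n] now holds the three largest payments: 1234, 2000, 9000
        System.out.println(Arrays.toString(top)); // [100, 1234, 2000, 9000]
    }
}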
Reducer code:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Arrays;

public class TOPNReducer extends Reducer<Text, IntWritable, IntWritable, IntWritable> {

    int len;
    int[] top;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // All mappers emitted the same key, so this single call sees every candidate.
        for (IntWritable val : values) {
            int payment = val.get();
            add(payment);
        }
    }

    // Same trick as in the mapper: replace the minimum and re-sort ascending.
    private void add(int payment) {
        top[0] = payment;
        Arrays.sort(top);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Walk the array from largest to smallest; len - i + 1 converts the
        // array index into a 1-based rank, giving output in descending order.
        for (int i = len; i >= 1; i--) {
            context.write(new IntWritable(len - i + 1), new IntWritable(top[i]));
        }
    }
}
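The cleanup loop is where the final ranking is produced. The following standalone sketch (class name illustrative only; the array contents follow from the two sample files with N = 5) shows how array indices map to ranks:

import java.util.Arrays;

// After the reducer has absorbed every mapper's local top 5, the ascending
// array is [880, 910, 1000, 1234, 2000, 9000]; slot 0 is never emitted.
public class RankDemo {
    public static void main(String[] args) {
        int len = 5;
        int[] top = {880, 910, 1000, 1234, 2000, 9000};
        for (int i = len; i >= 1; i--) {
            // len - i + 1 turns the array index into a 1-based rank:
            // index 5 -> rank 1 (largest), index 1 -> rank 5.
            System.out.println((len - i + 1) + "\t" + top[i]);
        }
    }
}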
Driver (job) code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * This example extracts the N largest values from the input data and
 * outputs them in descending order.
 */
public class JobMain {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: JobMain <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        // N = 5: keep the five largest payments.
        conf.setInt("N", 5);

        Job job = Job.getInstance(conf, "TOP N job");
        // Use the driver class itself so Hadoop can locate the containing jar.
        job.setJarByClass(JobMain.class);

        job.setMapperClass(TOPNMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // The job relies on the default single reduce task; a global top N
        // requires all mapper output to reach one reducer.
        job.setReducerClass(TOPNReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete the output directory if it already exists, so reruns do not fail.
        Path outDirPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outDirPath)) {
            fs.delete(outDirPath, true);
        }
        FileOutputFormat.setOutputPath(job, outDirPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
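One possible way to run the job after packaging it (the jar name and HDFS paths below are placeholders, not from the original post):

hadoop jar topn.jar JobMain /topn/input /topn/output

With N set to 5 in the driver, the output directory should contain the five largest payments, one per rank, as shown below.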
Result:
1 9000
2 2000
3 1234
4 1000
5 910