
Hadoop MapReduce Template

A reusable WordCount-style MapReduce skeleton in three classes: a Mapper, a Reducer, and a Driver that wires them into a Job.



Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by Administrator on 2017/7/23.
 */
// Step 1: Map Class
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text mapOutputKey = new Text();
    private final static IntWritable mapOutputValue = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into whitespace-delimited tokens and emit (word, 1) for each.
        String lineValue = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
        while (stringTokenizer.hasMoreTokens()) {
            String wordValue = stringTokenizer.nextToken();
            mapOutputKey.set(wordValue);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
    }
}
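
For a concrete sense of what this mapper emits, here is a minimal standalone sketch (a hypothetical demo class, plain Java, no Hadoop cluster needed) that mimics the tokenizing loop above on one sample line:

import java.util.StringTokenizer;

// Hypothetical demo class, not part of the template itself.
public class MapperDemo {
    public static void main(String[] args) {
        String lineValue = "hello world hello hadoop";
        StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
        while (stringTokenizer.hasMoreTokens()) {
            // The real mapper would call context.write(word, 1) here.
            System.out.println("(" + stringTokenizer.nextToken() + ", 1)");
        }
        // Prints: (hello, 1) (world, 1) (hello, 1) (hadoop, 1), one pair per line.
    }
}

The framework then shuffles these pairs so that all values for the same word arrive together at a single reduce call.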

Reducer

package com.scb.jason.reducer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/7/23.
 */
// Step 2: Reduce Class
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outputValue = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all counts emitted for this word, then write the total once.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outputValue.set(sum);
        context.write(key, outputValue);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
    }
}
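
The shuffle stage groups the mapper's (word, 1) pairs by key before reduce is called. A minimal plain-Java sketch of that group-then-sum behavior (hypothetical demo, no Hadoop runtime):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo: simulate shuffle + reduce over the mapper's output keys.
public class ShuffleReduceDemo {
    public static void main(String[] args) {
        List<String> mapOutputKeys = Arrays.asList("hello", "world", "hello", "hadoop");
        // Shuffle groups by key; reduce sums the 1s (merge does both at once here).
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : mapOutputKeys) {
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts);  // {hadoop=1, hello=2, world=1}
    }
}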

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.WordCountMapper;
import com.scb.jason.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by Administrator on 2017/7/17.
 */
public class WordCount extends Configured implements Tool {

    // Step 3: Driver
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use the configuration injected by ToolRunner so -D options are honored.
        Configuration configuration = getConf();
        FileSystem fs = FileSystem.get(configuration);

        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Path input = new Path(args[0]);
        FileInputFormat.addInputPath(job, input);

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Remove a pre-existing output directory so the job does not fail on startup.
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        boolean isSuccess = job.waitForCompletion(true);
        // Exit-code convention: 0 on success, non-zero on failure.
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }

}
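
Because the reducer's input and output types are identical (Text/IntWritable), the same class can also be plugged in as a combiner to pre-aggregate counts on the map side. A hedged one-line addition to the run() method above:

// Optional: reuse WordCountReducer as a combiner. This is valid here because
// a combiner's input and output types must both match the map output types.
job.setCombinerClass(WordCountReducer.class);

After packaging the three classes into a jar, the job launches with the standard command, e.g. hadoop jar wordcount.jar com.scb.jason.driver.WordCount <input-path> <output-path> (jar name and paths are placeholders).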
