Big Data Study Log — Implementing WordCount with Hadoop MapReduce in Java
阿新 • Published: 2019-03-24
package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class MyMapReduce {
    // 1. Our own Mapper class
    // Extends Mapper<input key, input value, output key, output value>
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Emit the value 1 for every word
        IntWritable i = new IntWritable(1);
        Text keyStr = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // TextInputFormat is Hadoop's default input format; it reads the input one line at a time
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                keyStr.set(itr.nextToken());
                context.write(keyStr, i);
            }
        }
    }

    // 2. Our own Reducer class
    // Extends Reducer<input key, input value, output key, output value>
    // The reducer's input is the mapper's output:
    // the pairs emitted by map() are grouped by key, and each key with its grouped values is passed to reduce()
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable countWritable = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            String keyStr = key.toString();
            // In map() every key was emitted with the value 1,
            // so the grouped values hold one element per occurrence of the key, each equal to 1.
            // Summing the values therefore gives the number of times the key appeared.
            int count = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                count += it.next().get();
            }
            countWritable.set(count);
            System.out.println(keyStr + "---" + count);
            context.write(key, countWritable);
        }
    }

    // 3. Driver
    public int run(String[] args) throws Exception {
        // Hadoop configuration
        Configuration conf = new Configuration();
        // If the config files are not on the classpath (e.g. under resources),
        // add them manually or set the parameters directly
        // conf.addResource("core-site.xml");
        // conf.addResource("hdfs-site.xml");
        // Build the job from the configuration and give it a name
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // Number of reduce tasks
        job.setNumReduceTasks(3);
        // Required: without this the job runs locally but fails on the cluster
        job.setJarByClass(MyMapReduce.class);

        // Input path: first command-line argument
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // Output path: second command-line argument
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Mapper settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Optional combiner: like the reducer, it extends Reducer.
        // A combiner pre-aggregates the output of each individual map task,
        // while the reducer aggregates the data from all map tasks.
        // job.setCombinerClass();
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // First argument: input path, second argument: output path
        System.out.println(args[0]);
        System.out.println(args[1]);
        MyMapReduce mr = new MyMapReduce();
        int success = mr.run(args);
        System.out.println(success);
    }
}
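The commented-out setCombinerClass call can actually be filled in for this job: because word counting is just a sum, the reducer can double as the combiner, so each map task pre-aggregates its own (word, 1) pairs before they are shuffled to the reducers. A minimal sketch, assuming the MyMapReduce class above:

        // In run(), after the reducer settings:
        // reuse MyReducer as the combiner; safe here because summing is associative and commutative,
        // and the combiner's input/output types (Text, IntWritable) match the map output types
        job.setCombinerClass(MyReducer.class);

To try it out, package the class into a jar and submit it with the hadoop command, for example: hadoop jar wordcount.jar mapreduce.MyMapReduce /user/hadoop/input /user/hadoop/output (the jar name and HDFS paths are placeholders; the output directory must not already exist, or the job will fail at startup).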