MapReduce: WordCount
Add the dependencies
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.6.0</version>
</dependency>
1. Running as a jar
package Hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    // The four type parameters are the input <key, value> pair followed by the output <key, value> pair.
    // LongWritable, IntWritable and Text are Hadoop's counterparts of Java's long, int and String.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        // Constant marking a single occurrence of a word; the literal 1 could be used instead.
        private final static IntWritable one = new IntWritable(1);
        // Reusable intermediate variable holding the current word.
        private Text word = new Text();

        // map method: key is the byte offset of the line, value is the line itself, context is the job context.
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into tokens.
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Process each token in turn.
            while (itr.hasMoreTokens()) {
                // nextToken() returns the substring up to the next delimiter.
                word.set(itr.nextToken());
                // e.g. context.write("hello", 1);
                context.write(word, one);
            }
        }
    }

    // Again, the four type parameters are the input pair followed by the output pair.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Holds the final count for a key.
        private IntWritable result = new IntWritable();

        // reduce method: key is a word such as "hello", values are all the counts emitted for that key.
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Running total for this key.
            int sum = 0;
            // Sum all occurrences of the key.
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Store the total in result.
            result.set(sum);
            // e.g. context.write("hello", 7);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        // Job configuration.
        Configuration conf = new Configuration();
        // The remaining arguments are the HDFS input path(s) and the output path.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        // Set the job name and the class containing main.
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        // Mapper class.
        job.setMapperClass(TokenizerMapper.class);
        // Combiner class.
        job.setCombinerClass(IntSumReducer.class);
        // Reducer class.
        job.setReducerClass(IntSumReducer.class);
        // Output key type.
        job.setOutputKeyClass(Text.class);
        // Output value type.
        job.setOutputValueClass(IntWritable.class);
        // Input path(s).
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        // Output path.
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
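Before the job can run, the /input directory must exist in HDFS and contain at least one text file; the simplest way is the HDFS shell (hdfs dfs -mkdir -p /input followed by hdfs dfs -put). As an alternative, here is a minimal sketch (not from the original post) that uploads a file through the FileSystem API; the NameNode address is the one used in part 2 below, and words.txt is only a placeholder file name.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: upload a local text file into the /input directory read by the job.
// The NameNode address and user match part 2; "words.txt" is a hypothetical file name.
public class UploadInput {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.255.128:9000");
        FileSystem fs = FileSystem.get(conf);
        // Copies the local file to /input/words.txt, creating /input if it does not exist.
        fs.copyFromLocalFile(new Path("words.txt"), new Path("/input/words.txt"));
        fs.close();
    }
}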
Build the jar, copy it to the /usr/local/hadoop directory, and run the following command:
hadoop jar /usr/local/hadoop/MavenMapReduceHelloWorld-1.0-SNAPSHOT.jar Hadoop.WordCount /input /output
Result:
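As an illustration of the output format (hypothetical data, not the original result): if /input contained only the line "hello world hello", the reducer output file /output/part-r-00000 would hold the tab-separated lines:

hello	2
world	1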
2. Remote execution from IDEA
package Hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount2 {

    // The four type parameters are the input <key, value> pair followed by the output <key, value> pair.
    // LongWritable, IntWritable and Text are Hadoop's counterparts of Java's long, int and String.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        // Constant marking a single occurrence of a word.
        private final static IntWritable one = new IntWritable(1);
        // Reusable intermediate variable holding the current word.
        private Text word = new Text();

        // map method: key is the byte offset of the line, value is the line itself, context is the job context.
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("Map key:" + key + ",value:" + value);
            // Split the line into tokens.
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Process each token in turn.
            while (itr.hasMoreTokens()) {
                // nextToken() returns the substring up to the next delimiter.
                word.set(itr.nextToken());
                // e.g. context.write("hello", 1);
                context.write(word, one);
            }
        }
    }

    // Again, the four type parameters are the input pair followed by the output pair.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Holds the final count for a key.
        private IntWritable result = new IntWritable();

        // reduce method: key is a word such as "hello", values are all the counts emitted for that key.
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            sb.append("Reduce key:" + key + ",value:");
            // Running total for this key.
            int sum = 0;
            // Sum all occurrences of the key.
            for (IntWritable val : values) {
                sb.append(val.get() + " ");
                sum += val.get();
            }
            System.out.println(sb.toString());
            // Store the total in result.
            result.set(sum);
            // e.g. context.write("hello", 7);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        // Job configuration: point the client at the remote HDFS.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.255.128:9000");
        System.setProperty("HADOOP_USER_NAME", "root");
        // The hadoop2.6 directory is the one unpacked from hadoop2.6.rar.
        System.setProperty("hadoop.home.dir", "E:/hadoop2.6");

        final String OUTPUT_PATH = "hdfs://192.168.255.128:9000/output";
        Path outpath = new Path(OUTPUT_PATH);
        // Remove any output left over from a previous run.
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }

        // Set the job name and the class containing main.
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount2.class);
        // Mapper class.
        job.setMapperClass(TokenizerMapper.class);
        // Combiner class.
        job.setCombinerClass(IntSumReducer.class);
        // Reducer class.
        job.setReducerClass(IntSumReducer.class);
        // Output key type.
        job.setOutputKeyClass(Text.class);
        // Output value type.
        job.setOutputValueClass(IntWritable.class);
        // Input path.
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.255.128:9000/input"));
        // Output path.
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Copy hadoop.dll from the hadoop2.6/bin folder to C:\Windows\System32.
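Once the job finishes, the output can also be checked directly from the IDE. The following is a minimal sketch (not part of the original post) that prints every part-r-* file under /output, assuming the same NameNode address and HADOOP_USER_NAME as above.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: print the reducer output files under /output.
public class PrintOutput {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.255.128:9000"), conf);
        // Each reducer writes one part-r-NNNNN file; this job uses the default single reducer.
        for (FileStatus status : fs.listStatus(new Path("/output"))) {
            if (!status.getPath().getName().startsWith("part-r-")) {
                continue; // skip _SUCCESS and other markers
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(status.getPath())))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // e.g. "hello	2"
                }
            }
        }
        fs.close();
    }
}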