Hadoop:mapreduce程式碼統計文字單詞
阿新 • • 發佈:2022-05-07
首先編寫 WordCountMap 類
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable> {/* * LongWritable:偏移量,表示該行在檔案中的位置,而不是行號 * Text map階段的輸入資料,一行文字資訊,字串型別String * Text map階段的資料字串型別String * IntWritable map階段輸出的value型別,對應Java中的int型別,表示行號 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//讀取每行文字 String line = value.toString(); //splite拆分 String[] words= line.split(" "); //取出每個單詞 for (String word:words){ //將單詞轉換為Text型別的 Text wordText = new Text(word); //將1轉變為IntWritablele IntWritable outValue = new IntWritable(1);//寫出單詞跟對應1 context.write(wordText,outValue); } } }
再編寫 WordCountReduce 類
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> { /* * Text:輸入的字串型別,序列化 * IntWritable:輸入一串1,序列化 * Text:輸出的字串型別,序列化 * IntWritable:輸出的求和陣列,序列化 * */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { /* * key:輸入的單詞的名字 * values:輸入一串1 * context:輸入的工具 * */ int sum=0; for(IntWritable number:values){ sum+=number.get(); } context.write(key,new IntWritable(sum)); } }
最後編寫wordcount類將前面的兩個類結合起來
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static void main(String[] args) throws Exception { // 建立本次mr程式的job例項 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定本次job執行的主類 job.setJarByClass(WordCount.class); // 指定本次job的具體mapper reducer實現類 job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); // 指定本次job map階段的輸出資料型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定本次job reduce階段的輸出資料型別 也就是整個mr任務的最終輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定本次job待處理資料的目錄 和程式執行完輸出結果存放的目錄 FileInputFormat.setInputPaths(job, "E:\\Demo\\hadoop\\input\\Wordcount.txt"); FileOutputFormat.setOutputPath(job, new Path("E:\\Demo\\hadoop\\output")); // 提交本次job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
(需要提前在Wordcount.txt中寫入文字)