Hadoop MapReduce: Outputting the Files That Contain Each Word (Inverted Index)
The input is four small text files. The goal is to build an inverted index: for every word that appears in any of the files, output the names of the files that contain it.
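To make the goal concrete, here is a small single-process Java sketch. The file names and contents are made-up stand-ins (the original post showed the real four files, which are not reproduced here); the sketch builds the same word-to-files mapping that the MapReduce job below computes in a distributed way:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class InvertedIndexSketch {
    public static void main(String[] args) {
        // Hypothetical stand-ins for the four input files
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "hello world");
        files.put("b.txt", "hello hadoop");
        files.put("c.txt", "world of hadoop");
        files.put("d.txt", "hello hadoop world");

        // word -> set of file names that contain it
        Map<String, Set<String>> index = new TreeMap<>();
        for (Map.Entry<String, String> file : files.entrySet()) {
            for (String word : file.getValue().split(" ")) {
                index.computeIfAbsent(word, k -> new TreeSet<>()).add(file.getKey());
            }
        }

        // Prints lines such as:  hello    a.txt b.txt d.txt
        for (Map.Entry<String, Set<String>> entry : index.entrySet()) {
            System.out.println(entry.getKey() + "\t" + String.join(" ", entry.getValue()));
        }
    }
}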
Code implementation:
Mapper:
package cn.tedu.invert;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the name of the file this input split comes from
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String pathName = fileSplit.getPath().getName();

        // Split the line into words
        String[] words = value.toString().split(" ");

        // Pair every word with the name of the file it came from
        for (String word : words) {
            context.write(new Text(word), new Text(pathName));
        }
    }
}
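Two details are worth noting. First, the cast to FileSplit assumes the default file-based input format; with a combining input format such as CombineTextInputFormat, getInputSplit() does not return a FileSplit and the cast would fail. Second, the mapper emits duplicate (word, fileName) pairs whenever a word occurs more than once in a file. A minimal, Hadoop-free sketch of one map() call on a hypothetical line shows the duplicates:

public class MapEmitSketch {
    public static void main(String[] args) {
        String fileName = "a.txt";          // stand-in for fileSplit.getPath().getName()
        String line = "hello world hello";  // stand-in for one line of input

        // Same loop as InvertMapper.map(): one pair per token
        for (String word : line.split(" ")) {
            System.out.println("(" + word + ", " + fileName + ")");
        }
        // Output: (hello, a.txt) (world, a.txt) (hello, a.txt)
        // The duplicate (hello, a.txt) is why the reducer deduplicates with a HashSet.
    }
}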
Reducer:
package cn.tedu.invert;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // A HashSet stores no duplicates, so repeated file names collapse to one
        HashSet<String> set = new HashSet<>();
        for (Text text : values) {
            set.add(text.toString());
        }

        // Join the distinct file names into one space-separated string
        StringBuilder sb = new StringBuilder();
        for (String str : set) {
            sb.append(str).append(" ");
        }
        // trim() drops the trailing space left by the loop above
        context.write(key, new Text(sb.toString().trim()));
    }
}
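One consequence of using HashSet is that its iteration order is undefined, so the file names within each output line can appear in a different order from run to run. If deterministic, sorted output is preferred, TreeSet is a drop-in replacement. A sketch of that variant (not part of the original post):

package cn.tedu.invert;

import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortedInvertReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // TreeSet removes duplicates and keeps the file names sorted
        Set<String> fileNames = new TreeSet<>();
        for (Text value : values) {
            fileNames.add(value.toString());
        }
        context.write(key, new Text(String.join(" ", fileNames)));
    }
}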
Driver:
package cn.tedu.invert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "InvertIndex");

        job.setJarByClass(InvertDriver.class);
        job.setMapperClass(InvertMapper.class);
        job.setReducerClass(InvertReducer.class);

        // Both the map output and the final output are (Text, Text) pairs
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input directory with the source files; the output directory must not exist yet
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.74.129:9000/text/invert"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.74.129:9000/result/invert_result"));

        if (!job.waitForCompletion(true)) {
            return;
        }
    }
}
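The NameNode address and paths are hard-coded above, which ties the jar to one cluster. A common variation (a sketch, assuming the paths are supplied on the command line) reads them from args and exits with a proper status code:

package cn.tedu.invert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertDriverArgs {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: InvertDriverArgs <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "InvertIndex");

        job.setJarByClass(InvertDriverArgs.class);
        job.setMapperClass(InvertMapper.class);
        job.setReducerClass(InvertReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit 0 on success, 1 on failure, so shell scripts can check the result
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged into a jar (the jar name here is hypothetical), this variant would be launched with hadoop jar invert.jar cn.tedu.invert.InvertDriverArgs followed by the input and output paths.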
Result: each line of the job's output pairs a word with the space-separated list of the files that contain it.