Implementing a file inverted index with a custom combiner
Posted by 阿新 on 2019-01-17
package com.zuoyan.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author root
 * 1: The input records do not carry the name of the file they came from, so the mapper
 *    inserts the file name itself while reading the data. Each line is split on spaces,
 *    and the mapper outputs <key, value> = <"word filename", 1>; the same key may be
 *    emitted several times, i.e. <"word filename", (1, 1)>.
 * 2: The combiner, an important component that sits between mapper and reducer, performs
 *    a first round of aggregation and separates the word from the file name, so its
 *    output is <key, value> = <word, "filename:1"> or <word, "filename:2">.
 *    Note: the combiner runs as part of the map-side stage.
 * 3: The reducer performs the final merge.
 */
public class CombinerTest {

    // main: job configuration and submission
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CombinerTest.class);
        // 1: mapper
        job.setMapperClass(LastSearchMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 2: combiner
        job.setCombinerClass(LastSearchComb.class);
        // 3: reducer
        job.setReducerClass(LastSearchReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean x = job.waitForCompletion(true);
        System.out.println(x);
    }

    // mapper: emit <"word filename", "1"> for every word in the line
    // (the classes must be static so Hadoop can instantiate them by reflection)
    public static class LastSearchMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            InputSplit input = context.getInputSplit();
            // name of the file this split belongs to
            String pathname = ((FileSplit) input).getPath().getName();
            for (String word : words) {
                String word1 = word + " " + pathname;
                context.write(new Text(word1), new Text("1"));
            }
        }
    }

    // combiner: sum the counts for each <"word filename"> key and move the file name into the value
    public static class LastSearchComb extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            String[] parts = key.toString().split(" ");
            // parts[0] = word, parts[1] = file name
            context.write(new Text(parts[0]), new Text(parts[1] + ":" + sum));
        }
    }

    // reducer: concatenate the per-file counts for each word
    public static class LastSearchReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder newword = new StringBuilder();
            for (Text value : values) {
                newword.append(value.toString()).append(" ");
            }
            context.write(key, new Text(newword.toString()));
        }
    }
}
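To make the data flow concrete, here is a small worked trace; the file names, contents, and jar name are only illustrative. Suppose the input directory contains two files:

    a.txt:  hello world hello
    b.txt:  hello hadoop

The mapper emits pairs such as <"hello a.txt", 1>, the combiner folds them into <"hello", "a.txt:2">, and the reducer concatenates the per-file entries, so the final output looks roughly like:

    hadoop  b.txt:1
    hello   a.txt:2 b.txt:1
    world   a.txt:1

The job is packaged into a jar and submitted in the usual way, for example:

    hadoop jar combiner-test.jar com.zuoyan.hadoop.CombinerTest /input /output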
In the pom, it is enough to add the hadoop-client dependency.
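A minimal sketch of that dependency is shown below; the version number is an assumption and should match the Hadoop version on your cluster.

    <!-- hadoop-client pulls in the MapReduce and HDFS client APIs used above -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <!-- version is an assumption; use the one matching your cluster -->
        <version>2.7.3</version>
    </dependency>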