0011.MapReduce程式設計案例2
阿新 • 發佈:2020-10-23
目錄
- 05-26-實現自連線的MapReduce程式
- 05-27-分析倒排索引的過程
- 05-28-使用MapReduce實現倒排索引1
- 05-29-使用MapReduce實現倒排索引2
- 05-30-使用MRUnit
- 05-31-第一個階段小結
05-26-實現自連線的MapReduce程式
05-27-分析倒排索引的過程
倒排索引資料處理的過程.png
05-28-使用MapReduce實現倒排索引1
05-29-使用MapReduce實現倒排索引2
使用MapReduce實現倒排索引
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { //資料:/indexdata/data01.txt //得到對應檔名 String path = ((FileSplit)context.getInputSplit()).getPath().toString(); //解析出檔名 //得到最後一個斜線的位置 int index = path.lastIndexOf("/"); String fileName = path.substring(index+1); //資料:I love Beijing and love Shanghai String data = value1.toString(); String[] words = data.split(" "); //輸出 for(String word:words){ context.write(new Text(word+":"+fileName), new Text("1")); } } }
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for the inverted index: merges all per-file "filename:count"
 * values of a word into one string of "(filename:count)" entries,
 * prepending each new entry (matching the original accumulation order).
 */
public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        StringBuilder merged = new StringBuilder();
        for (Text entry : v3) {
            // Prepend, exactly as the original string concatenation did.
            merged.insert(0, "(" + entry.toString() + ")");
        }
        context.write(k3, new Text(merged.toString()));
    }
}
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text k21, Iterable<Text> v21, Context context) throws IOException, InterruptedException { // 求和:對同一個檔案中的單詞進行求和 int total = 0; for(Text v:v21){ total = total + Integer.parseInt(v.toString()); } //k21是:love:data01.txt String data = k21.toString(); //找到:冒號的位置 int index = data.indexOf(":"); String word = data.substring(0, index); //單詞 String fileName = data.substring(index + 1); //檔名 //輸出: context.write(new Text(word), new Text(fileName+":"+total)); } }
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the inverted-index MapReduce job.
 *
 * <p>Wires the mapper, combiner and reducer together, then submits the job
 * and exits with a non-zero status if it fails (the original version
 * discarded the result of {@code waitForCompletion}, so failed jobs still
 * exited 0 — invisible to shells and schedulers).
 */
public class RevertedIndexMain {
    /**
     * Configures and runs the job.
     *
     * @param args args[0] = HDFS input path, args[1] = HDFS output path
     *             (the output directory must not already exist)
     */
    public static void main(String[] args) throws Exception {
        // Guard: the original indexed args[0]/args[1] unconditionally and
        // crashed with ArrayIndexOutOfBoundsException on missing arguments.
        if (args.length < 2) {
            System.err.println("Usage: RevertedIndexMain <input path> <output path>");
            System.exit(2);
        }

        // 1. Create the job and mark the driver class (for jar resolution).
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(RevertedIndexMain.class);

        // 2. Mapper and its output types (k2 = word:file, v2 = "1").
        job.setMapperClass(RevertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Combiner pre-aggregates per-file counts on the map side.
        job.setCombinerClass(RevertedIndexCombiner.class);

        // 3. Reducer and the final output types (k4 = word, v4 = postings).
        job.setReducerClass(RevertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 4. Input and output paths from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run and propagate success/failure as the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}