Implementing a Full-Text Document Search Engine with MapReduce
By indexing documents by their content, we can count how many times each word occurs in each document, which is the core of a full-text search (inverted index) feature.
Files to prepare: a few plain-text files uploaded to HDFS to serve as the job's input.
The Hadoop job is split into three steps:
1. The mapper makes a first pass over each document and emits every word together with the URI of the file it came from, with the count of each occurrence initially set to 1.
   Output format: word||document-uri	1
2. The combiner does a first aggregation: for each word it sums the 1s coming from the same document, i.e. the term frequency of that word in that file, and passes the result on to the reducer.
   Output format: word	uri-------frequency
3. After the shuffle, the reducer collects all per-document entries of a word and concatenates them into the final index file.
   Output format: word	uri-------frequency;uri-------frequency;
A small worked example of these three phases is given below.
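For instance, suppose there are two input files (the file names and URIs are made up purely for illustration): a.txt containing "hello world hello" and b.txt containing "hello hadoop". The data would then flow through the three phases roughly like this:

Mapper output (one pair per word occurrence):
hello||hdfs://ns/a.txt	1
hello||hdfs://ns/a.txt	1
world||hdfs://ns/a.txt	1
hello||hdfs://ns/b.txt	1
hadoop||hdfs://ns/b.txt	1

Combiner output (per-file term frequency):
hello	hdfs://ns/a.txt-------2
world	hdfs://ns/a.txt-------1
hello	hdfs://ns/b.txt-------1
hadoop	hdfs://ns/b.txt-------1

Reducer output (final inverted index, sorted by word):
hadoop	hdfs://ns/b.txt-------1;
hello	hdfs://ns/a.txt-------2;hdfs://ns/b.txt-------1;
world	hdfs://ns/a.txt-------1;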
The code:
package demo01.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertedIndex extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("args error!");
            return -1;
        }
        Path src = new Path(args[0]);
        Path desc = new Path(args[1]);
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        // delete the output directory if it already exists
        if (fs.exists(desc)) {
            fs.delete(desc, true);
        }
        Job job = Job.getInstance(conf, "inverted index");
        job.setJarByClass(getClass());

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(MyCombiner.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, src);
        FileOutputFormat.setOutputPath(job, desc);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new InvertedIndex(), args);
        System.exit(code);
    }

    /**
     * Mapper: for every word, emit key = word||document-uri, value = "1".
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        String uri;
        Text key2 = new Text();
        Text value2 = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // remember the URI of the file this input split belongs to
            FileSplit split = (FileSplit) context.getInputSplit();
            this.uri = split.getPath().toString();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split each line on whitespace and emit (word||uri, 1) per occurrence
            String[] strs = value.toString().split("\\s+");
            for (String str : strs) {
                key2.set(str + "||" + uri);
                value2.set("1");
                context.write(key2, value2);
            }
        }
    }

    /**
     * Combiner: sum the counts of each word within one document (the term
     * frequency per file). Output: key = word, value = uri-------frequency.
     * Note that this design relies on the combiner actually running, which
     * Hadoop treats as an optimization rather than a guarantee.
     */
    public static class MyCombiner extends Reducer<Text, Text, Text, Text> {
        Text key4 = new Text();
        Text value4 = new Text();

        @Override
        public void reduce(Text key3, Iterable<Text> value3, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text v3 : value3) {
                sum += Integer.parseInt(v3.toString());
            }
            // split the composite key "word||uri" back into word and uri
            String word = key3.toString().substring(0, key3.toString().indexOf("||"));
            key4.set(word);
            int pos = key3.toString().length();
            String uri = key3.toString().substring(key3.toString().indexOf("||") + 2, pos);
            value4.set(uri + "-------" + sum);
            context.write(key4, value4);
        }
    }

    /**
     * Reducer: after the shuffle, concatenate all per-document entries of a word.
     * Output: key = word, value = uri-------frequency;uri-------frequency;
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text key6 = new Text();
        Text value6 = new Text();

        @Override
        public void reduce(Text key5, Iterable<Text> value5, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text v5 : value5) {
                sb.append(v5 + ";");
            }
            key6.set(key5.toString());
            value6.set(sb.toString());
            context.write(key6, value6);
        }
    }
}
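To run it, package the class into a jar and submit it with the standard hadoop jar command. The jar name and HDFS paths below are only placeholders; args[0] is the input directory and args[1] the output directory, which run() deletes first if it already exists:

hadoop jar inverted-index.jar demo01.hadoop.InvertedIndex /input/docs /output/index

The final index is then written to the part-r-00000 file under /output/index, one word per line followed by its list of uri-------frequency entries. Because the driver goes through ToolRunner, generic Hadoop options such as -D properties can also be passed on the command line before the input and output paths.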
Comments and suggestions are welcome.