A Hadoop-Based Inverted Document Index with Term-Frequency Payloads
An inverted index is a data structure that almost every full-text search engine depends on today. Given a term, the index yields the list of documents that contain it. For example:
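Suppose, for illustration, that documents d1 and d2 both contain "fish" while only d1 contains "cat". The index then maps

fish → [d1, d2]
cat  → [d1]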
Simple inverted indexing stops being sufficient once we also consider attributes such as each term's frequency and positions within a document, or the URL of the corresponding web document. These per-posting attributes (frequency, positions, and so on) are called the payload.
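Continuing the illustration, with a term-frequency payload each posting pairs a document ID with a count. If "fish" occurs twice in d1 and once in d2, its posting list becomes

fish → [<d1, 2>, <d2, 1>]

which is exactly the <docid, frequency> posting format the pseudocode below emits.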
The Map and Reduce implementations in pseudocode:
class Mapper
    procedure Map(docid n, doc d)
        H ← new AssociativeArray
        for all term t ∈ doc d do
            H{t} ← H{t} + 1        // accumulate the term-frequency payload
        for all term t ∈ H do
            Emit(term t, posting <n, H{t}>)

class Reducer
    procedure Reduce(term t, postings [<n1, f1>, <n2, f2>, …])
        P ← new List
        for all posting <a, f> ∈ postings do
            Append(P, <a, f>)
        Sort(P)
        Emit(term t, postings P)
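To make the data flow concrete, here is a hand trace on the two illustrative documents from above (docids 1 and 2, contents invented for the example):

d1 = "fish fish cat"    d2 = "fish dog"

Map(1, d1): H = {fish: 2, cat: 1}, emits (fish, <1, 2>) and (cat, <1, 1>)
Map(2, d2): H = {fish: 1, dog: 1}, emits (fish, <2, 1>) and (dog, <2, 1>)
Reduce(fish, [<1, 2>, <2, 1>]): sorts and emits (fish, [<1, 2>, <2, 1>])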
The Java implementation of this MapReduce job is below.
import java.io.IOException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        String word;
        IntWritable frequence = new IntWritable();
        int one = 1;
        // Use String, not Text, as the hash key: reusing mutable Text
        // objects as keys gives wrong lookups.
        Hashtable<String, Integer> hashmap = new Hashtable<String, Integer>();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word = itr.nextToken();
            if (hashmap.containsKey(word)) {
                // The map input key is the byte offset of a single line, so
                // this only counts repeated words within one line; the
                // combiner below merges those per-line counts per file.
                hashmap.put(word, hashmap.get(word) + 1);
            } else {
                hashmap.put(word, one);
            }
        }
        for (Iterator<String> it = hashmap.keySet().iterator(); it.hasNext();) {
            word = it.next();
            frequence = new IntWritable(hashmap.get(word));
            Text fileName_frequence = new Text(fileName + "@" + frequence.toString());
            // Output records of the form ("fish", "[email protected]").
            context.write(new Text(word), fileName_frequence);
        }
    }
}
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Merge the mapper's per-line outputs. Within one map task every
        // value for a key names the same file, so summing yields the
        // per-file frequency.
        String fileName = "";
        int sum = 0;
        String num;
        String s;
        for (Text val : values) {
            s = val.toString();
            fileName = s.substring(0, s.indexOf("@"));
            // Extract the frequency after the '@' in "[email protected]".
            num = s.substring(s.indexOf("@") + 1);
            sum += Integer.parseInt(num);
        }
        IntWritable frequence = new IntWritable(sum);
        context.write(key, new Text(fileName + "@" + frequence.toString()));
    }
}
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join every "file@frequency" entry for this term with ';'.
        // (Unlike the pseudocode, the postings are not re-sorted here.)
        Iterator<Text> it = values.iterator();
        StringBuilder all = new StringBuilder();
        if (it.hasNext()) all.append(it.next().toString());
        while (it.hasNext()) {
            all.append(";");
            all.append(it.next().toString());
        }
        context.write(key, new Text(all.toString()));
        // Example of a final key/value pair:
        // ("fish", "[email protected];[email protected];[email protected];[email protected]")
    }
}
public static void main(String[] args) {
    try {
        Configuration conf = new Configuration();
        // Let GenericOptionsParser consume generic Hadoop options first,
        // then validate the remaining application arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: InvertedIndex <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "invertedindex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
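One caveat on the design: Hadoop treats a combiner as an optional optimization that may run zero, one, or several times, so the reducer above quietly relies on InvertedIndexCombiner having already merged all same-file counts. Below is a minimal sketch of a defensive reducer that re-aggregates per file itself, making the combiner a pure optimization. The class name MergingReducer and the HashMap-based merging are my own illustration, not part of the original code; it additionally needs java.util.HashMap and java.util.Map imports.

public static class MergingReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Re-sum the counts per file, so correctness does not depend on
        // whether (or how many times) the combiner ran.
        HashMap<String, Integer> perFile = new HashMap<String, Integer>();
        for (Text val : values) {
            String s = val.toString();
            int at = s.indexOf('@');
            String fileName = s.substring(0, at);
            int freq = Integer.parseInt(s.substring(at + 1));
            Integer old = perFile.get(fileName);
            perFile.put(fileName, old == null ? freq : old + freq);
        }
        // Join the merged postings in the same "file@frequency;..." format.
        StringBuilder all = new StringBuilder();
        for (Map.Entry<String, Integer> e : perFile.entrySet()) {
            if (all.length() > 0) all.append(";");
            all.append(e.getKey()).append("@").append(e.getValue());
        }
        context.write(key, new Text(all.toString()));
    }
}

Swapping it in only takes job.setReducerClass(MergingReducer.class) in main; the combiner can then stay purely as a way to cut shuffle traffic.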
Reference: Jimmy Lin and Chris Dyer, Data-Intensive Text Processing with MapReduce.