使用Hadoop 實現文件倒排索引
文件倒排索引主要是統計每個單詞在各個文件中出現的頻數,因此要以單詞為key,value為文件以及該單詞在此文件頻數,即輸出資料的格式形如:
< word1,[doc1,3] [doc2,4] ... > :表示word1這個單詞在doc1文件中出現了3次,在doc2文件中出現了4次。
整個程式的輸入是一系列檔案,比如file01.txt, file02.txt, file03.txt ....,首先要將這些檔案上傳到hadoop hdfs中作為程式的輸入。上傳過程以及Java類的編譯等可以參考這篇部落格:執行Hadoop示例程式WordCount,這裡不再詳細介紹。本程式的原始碼在文章最後面。
一、程式執行的大體思路
由於文件倒排索引考察的是一個單詞和文件的關係,而系統預設的LineRecordReader是按照每行的偏移量作為map輸入時的key值,每行的內容作為map的value值,這裡的key值(即行偏移量對我們的意義不大),我們這裡考慮將一個文件的名字作為關鍵字,而每一行的值作為value,這樣處理起來比較方便,(即:map的輸入形式為<fileName, a line>,主要是通過一個自定義的RecordReader類來實現,下面會有介紹)。整個程式資料處理流程如下面所示:
map類的主要作用是處理程式的輸入,這裡的輸入形式是<fileName,a line>,即輸入的關鍵字key是檔名如file01.txt,值value為一行資料,map的任務是將這一行資料進行分詞,並以圖中第一部分的形式進行輸出。
combine類的主要作用是將map輸出的相同的key的value進行合併(相加),這樣有利於減少資料傳輸,combine是在本節點進行的。
partition的主要作用是對combine的輸出進行分割槽,分割槽的目的是使key值相同的資料被分到同一個節點,這樣在進行reduce操作的時候僅需要本地的資料就足夠,不需要通過網路向其他節點尋找資料。上圖中的 "partitionby word1 rather than word1#doc1" 意思是將word1作為分割槽時的關鍵字,而不是word1#doc1,因為我們在之前的輸出的關鍵字的形式是word1#doc1
reduce的操作主要是將結果進行求和整理,並使結果符合我們所要的形式。
2、程式和各個類的設計說明
這部分按照程式執行的順序依次介紹每個類的設計和作用,有些子類繼承了父類,但是並沒有重新實現父類的方法,這裡不詳細介紹這些方法。
2.1、FileNameRecordReader類
FileNameRecordReader類繼承自RecordReader,是RecordReader類的自定義實現,主要作用是將記錄所在的檔名作為key,而不是記錄行所在檔案的偏移,獲取檔名所用的語句為:
fileName = ((FileSplit) arg0).getPath().getName();
2.2、FileNameInputFormat類
因為我們重寫了RecordReader類,這裡要重寫FileInputFormat類來使用我們的自定義FileNameRecordReader,這個類的主要作用就是返回一個FileNameRecordReader類的例項。
2.3、InvertedIndexMapper類
這個類繼承自Mapper,主要方法有setup和map方法,setup方法的主要作用是在執行map前初始化一個stopwords的list,主要在map處理輸入的單詞時,如果該單詞在stopwords的list中,則跳過該單詞,不進行處理。stopwords剛開始是以一個文字檔案的形式存放在hdfs中,程式在剛開始執行的時候通過Hadoop Configuration將這個文字檔案設定為CacheFile供各個節點共享,並在執行map前,初始化一個stopwords列表。
InvertedIndexMapper的主要操作是map,這個方法將讀入的一行資料進行分詞操作,並以<key: word1#doc1 value: 1>的鍵值對形式,向外寫資料,在map方法中,寫出的value都是1。InvertedIndexMapper類的類圖如下圖2所示。
2.4、SumCombiner類
這個類主要是將前面InvertedIndexMapper類的輸出結果進行合併,如果一個單詞在一個文件中出現了多次,則將value的值設定為出現的次數和。
2.5、NewPartitioner類
2.5、InvertedIndexReducer類
InvertedIndexReducerreduce的輸入形式為:<key: word1#doc1 value: 2> <key: word1#doc2 value: 1> <key: word2#doc1 value: 1>,如第一個圖中所示可見同一個單詞會作為多次輸入,傳遞給reduce,而最終的結果要求只輸出一次單詞,而不同的文件如doc1,doc2要作為這個單詞的value輸出,我們的reduce在實現此功能時,設定兩個變數CurrentItem和postingList,其中CurrentItem儲存每次每次讀入的key,初始值為空,postingList是一個列表,表示這個key對於的出現的文件以及在此文件中出現的次數。因為同一個key可能被讀入多次,每次在讀入key時,同上一個CurrentItem進行比較,如果跟上一個CurrentItem相同,表示讀入的是同一個key,進而將新讀入的key的文件追加到postingList中;如果根上一個CurrentItem不同,表示相同的單詞以及讀完了,這時候我們要統計上一個CurrentItem出現的總次數,以及含有此item的總的文章數,這些資訊我們之前都存放在postingList中,只要遍歷此時的postingList就能得到上述資訊,並在得到資訊之後重置CurrentItem和postingList。具體見程式碼實現。其類圖如上圖所示。
3、執行結果截圖
我編譯以及執行使用的命令如下,大家可以根據自己目錄情況適當調整
javac -classpath ~/hadoop-1.2.1/hadoop-core-1.2.1.jar -d ./ InvertedIndexer.java
jar -cfv inverted.jar -C ./* .
hadoop jar ./inverted.jar InvertedIndexer input output
#執行結束後顯示
hadoop fs -cat output/part-r-00000
結果截圖:
4、源程式
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.ArrayList;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndexer {
public static class FileNameInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,
InterruptedException {
FileNameRecordReader fnrr = new FileNameRecordReader();
fnrr.initialize(split, context);
return fnrr;
}
}
public static class FileNameRecordReader extends RecordReader<Text, Text> {
String fileName;
LineRecordReader lrr = new LineRecordReader();
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return new Text(fileName);
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return lrr.getCurrentValue();
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
lrr.initialize(arg0, arg1);
fileName = ((FileSplit) arg0).getPath().getName();
}
public void close() throws IOException {
lrr.close();
}
public boolean nextKeyValue() throws IOException, InterruptedException {
return lrr.nextKeyValue();
}
public float getProgress() throws IOException, InterruptedException {
return lrr.getProgress();
}
}
public static class InvertedIndexMapper extends Mapper<Text, Text, Text, IntWritable> {
private Set<String> stopwords;
private Path[] localFiles;
private String pattern = "[^\\w]";
public void setup(Context context) throws IOException,InterruptedException {
stopwords = new TreeSet<String>();
Configuration conf = context.getConfiguration();
localFiles = DistributedCache.getLocalCacheFiles(conf);
for (int i = 0; i < localFiles.length; i++) {
String line;
BufferedReader br = new BufferedReader(new FileReader(localFiles[i].toString()));
while ((line = br.readLine()) != null) {
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
stopwords.add(itr.nextToken());
}
}
br.close();
}
}
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String temp = new String();
String line = value.toString().toLowerCase();
line = line.replaceAll(pattern, " ");
StringTokenizer itr = new StringTokenizer(line);
for (; itr.hasMoreTokens();) {
temp = itr.nextToken();
if (!stopwords.contains(temp)) {
Text word = new Text();
word.set(temp + "#" + key);
context.write(word, new IntWritable(1));
}
}
}
}
public static class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class NewPartitioner extends HashPartitioner<Text, IntWritable> {
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String term = new String();
term = key.toString().split("#")[0]; // <term#docid>=>term
return super.getPartition(new Text(term), value, numReduceTasks);
}
}
public static class InvertedIndexReducer extends Reducer<Text, IntWritable, Text, Text> {
private Text word1 = new Text();
private Text word2 = new Text();
String temp = new String();
static Text CurrentItem = new Text(" ");
static List<String> postingList = new ArrayList<String>();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
String keyWord = key.toString().split("#")[0];
int needBlank = 15-keyWord.length();
for(int i=0;i<needBlank;i++){
keyWord += " ";
}
word1.set(keyWord);
temp = key.toString().split("#")[1]; //key的形式為word1#doc1,所以temp為doc1
for (IntWritable val : values) { //得到某個單詞在一個檔案中的總數
sum += val.get();
}
word2.set("[" + temp + "," + sum + "]"); //word2的格式為:[doc1,3]
if (!CurrentItem.equals(word1) && !CurrentItem.equals(" ")) {
StringBuilder out = new StringBuilder();
long count = 0;
double fileCount = 0;
for (String p : postingList) {
out.append(p);
out.append(" ");
count = count + Long.parseLong(p.substring(p.indexOf(",") + 1,p.indexOf("]")));
fileCount++;
}
out.append("[total," + count + "] ");
double average = count/fileCount;
out.append("[average,"+String.format("%.3f", average)+"].");
if (count > 0)
context.write(CurrentItem, new Text(out.toString()));
postingList = new ArrayList<String>();
}
CurrentItem = new Text(word1);
postingList.add(word2.toString());
}
public void cleanup(Context context) throws IOException,InterruptedException {
StringBuilder out = new StringBuilder();
long count = 0;
for (String p : postingList) {
out.append(p);
out.append(" ");
count = count + Long.parseLong(p.substring(p.indexOf(",") + 1,p.indexOf("]")));
}
out.append("[total," + count + "].");
if (count > 0)
context.write(CurrentItem, new Text(out.toString()));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("hdfs://namenode:9000/user/hadoop/stop_word/stop_word.txt"),conf);
Job job = new Job(conf, "inverted index");
job.setJarByClass(InvertedIndexer.class);
job.setInputFormatClass(FileNameInputFormat.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setPartitionerClass(NewPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4、參考文獻
《深入理解大資料 大資料處理與程式設計實戰》主編:黃宜華老師(南京大學)