mapreduce演算法之倒排索引
阿新 • 發佈:2018-12-30
package mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class indexSearch {
public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{
private Text keyInfo = new Text(); // 儲存單詞和URI的組合
private Text valueInfo = new Text(); //儲存詞頻
private FileSplit split; // 儲存split物件。
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//獲得<key,value>對所屬的FileSplit物件。
split = (FileSplit) context.getInputSplit();
System.out.println("偏移量"+key);
System.out.println("值"+value);
//StringTokenizer是用來把字串擷取成一個個標記或單詞的,預設是空格或多個空格(\t\n\r等等)擷取
StringTokenizer itr = new StringTokenizer( value.toString());
while( itr.hasMoreTokens() ){
// key值由單詞和URI組成。
keyInfo.set( itr.nextToken()+":"+split.getPath().getName().toString());
//詞頻初始為1
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
System.out.println("key"+keyInfo);
System.out.println("value"+valueInfo);
}
}
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
private Text info = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString() );
}
int splitIndex = key.toString().indexOf(":");
//重新設定value值由URI和詞頻組成
info.set( key.toString().substring( splitIndex + 1) +":"+sum );
//重新設定key值為單詞
key.set( key.toString().substring(0,splitIndex));
context.write(key, info);
System.out.println("key"+key);
System.out.println("value"+info);
}
}
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//生成文件列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString()+";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"InvertedIndex");
job.setJarByClass(indexSearch.class);
//實現map函式,根據輸入的<key,value>對生成中間結果。
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//我把那兩個檔案上傳到這個index目錄下了
FileInputFormat.addInputPath(job, new Path("hdfs://192.168.120.128:9000/input/"));
//把結果輸出到out_index+時間戳的目錄下
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.120.128:9000/out_index"+System.currentTimeMillis()+"/"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class indexSearch {
public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{
private Text keyInfo = new Text(); // 儲存單詞和URI的組合
private Text valueInfo = new Text(); //儲存詞頻
private FileSplit split; // 儲存split物件。
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//獲得<key,value>對所屬的FileSplit物件。
split = (FileSplit) context.getInputSplit();
System.out.println("偏移量"+key);
System.out.println("值"+value);
//StringTokenizer是用來把字串擷取成一個個標記或單詞的,預設是空格或多個空格(\t\n\r等等)擷取
StringTokenizer itr = new StringTokenizer( value.toString());
while( itr.hasMoreTokens() ){
// key值由單詞和URI組成。
keyInfo.set( itr.nextToken()+":"+split.getPath().getName().toString());
//詞頻初始為1
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
System.out.println("key"+keyInfo);
System.out.println("value"+valueInfo);
}
}
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
private Text info = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//統計詞頻
for (Text value : values) {
sum += Integer.parseInt(value.toString() );
}
int splitIndex = key.toString().indexOf(":");
//重新設定value值由URI和詞頻組成
info.set( key.toString().substring( splitIndex + 1) +":"+sum );
//重新設定key值為單詞
key.set( key.toString().substring(0,splitIndex));
context.write(key, info);
System.out.println("key"+key);
System.out.println("value"+info);
}
}
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//生成文件列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString()+";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"InvertedIndex");
job.setJarByClass(indexSearch.class);
//實現map函式,根據輸入的<key,value>對生成中間結果。
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//我把那兩個檔案上傳到這個index目錄下了
FileInputFormat.addInputPath(job, new Path("hdfs://192.168.120.128:9000/input/"));
//把結果輸出到out_index+時間戳的目錄下
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.120.128:9000/out_index"+System.currentTimeMillis()+"/"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}