1. 程式人生 > Hadoop中倒排索引

Hadoop中倒排索引

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.StringTokenizer;
/**
 * Map stage of the inverted index.
 *
 * For every whitespace-delimited token of the input line this mapper emits
 * the pair ("token:fileLabel", "1"), where fileLabel identifies the file the
 * current input split was read from.
 */
class Map extends Mapper < Object, Text, Text, Text>{
    /** Marker searched for in the split's path; the label is the path from this marker onwards. */
    private static final String PATH_MARKER = "Inver";

    private final Text keyinfo = new Text();
    /** Every occurrence counts once, so the emitted value never changes. */
    private final Text valueinfo = new Text("1");

    /**
     * Tokenizes one input line and emits one record per token.
     *
     * @param key     input key supplied by the input format; not used here
     * @param value   one line of input text
     * @param context task context; must be reading from a {@link FileSplit}
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void map ( Object key, Text value, Context context)
        throws IOException, InterruptedException{
        FileSplit split = ( FileSplit ) context.getInputSplit();

        // The label depends only on the split, so build it once per call
        // instead of once per token.
        String fileName = split.getPath().toString();
        int splitIndex = fileName.indexOf(PATH_MARKER);
        // indexOf returns -1 when the marker is absent; substring(-1) would
        // throw, so fall back to the file's base name in that case.
        String fileLabel = splitIndex >= 0
            ? fileName.substring(splitIndex)
            : split.getPath().getName();

        StringTokenizer str = new StringTokenizer( value.toString() );
        while ( str.hasMoreTokens() ){
            keyinfo.set( str.nextToken() + ":" + fileLabel );
            context.write(keyinfo, valueinfo);
        }
    }
}
 
/**
 * Combiner of the inverted index.
 *
 * Takes the mapper's ("word:file", "1") records, sums the counts per key and
 * re-emits them as ("word", "file:count").
 *
 * Hadoop may invoke a combiner zero, one or several times on the same data,
 * so this class also accepts its own output: values that already have the
 * "file:count" shape are forwarded untouched instead of being parsed as
 * integers.
 *
 * NOTE(review): the framework partitions and sorts on the mapper's
 * "word:file" key before this class strips the file suffix; with more than
 * one reduce task that may route the same word to different reducers -
 * confirm against the job configuration.
 */
class Combine extends Reducer < Text, Text, Text, Text >{
    private final Text info = new Text();

    /**
     * Sums raw "1" counts for one "word:file" key, or forwards values that
     * were already combined.
     *
     * @param key     either "word:file" (mapper output) or "word" (output of
     *                an earlier combiner pass)
     * @param values  either decimal counts or "file:count" strings
     * @param context task context that receives the combined records
     * @throws IOException           if writing to the context fails
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if a value without ':' is not a decimal integer
     */
    public void reduce ( Text key, Iterable < Text > values, Context context )
        throws IOException, InterruptedException{
        String composite = key.toString();
        // The file label is appended last by the mapper, so split on the LAST
        // colon; tokens such as "12:30" keep their own colons intact.
        int splitindex = composite.lastIndexOf(':');

        int sum = 0;
        boolean sawRawCount = false;
        for ( Text value : values ){
            String text = value.toString();
            // A ':' in the value means it is already "file:count"; a key
            // without ':' cannot be split. Either way pass the record through.
            if ( splitindex < 0 || text.indexOf(':') >= 0 ){
                context.write(key, value);
                continue;
            }
            sum += Integer.parseInt(text);
            sawRawCount = true;
        }

        if ( sawRawCount ){
            info.set(composite.substring(splitindex + 1) + ":" + sum);
            key.set(composite.substring(0, splitindex));
            context.write(key, info);
        }
    }
}
 
/**
 * Reduce stage of the inverted index.
 *
 * Collects every posting received for a key and writes them as a single
 * ';'-terminated list, e.g. "a.txt:3;b.txt:1;".
 */
class Reduce extends Reducer < Text, Text, Text, Text >{
    /**
     * Builds the posting list for one key.
     *
     * Values shaped "file:count" (count being a decimal integer) are summed
     * per file, so a file that reaches this reducer in several pieces is
     * listed once with its total. Files appear in the order they were first
     * seen. Any value that does not have that shape is kept verbatim and
     * appended after the merged entries.
     *
     * @param key     the key the postings belong to
     * @param values  postings for that key
     * @param context task context that receives the (key, list) record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void reduce ( Text key, Iterable < Text > values, Context context )
        throws IOException, InterruptedException{
        // Fully qualified: this file declares its own top-level class named
        // Map, so java.util.Map cannot be imported by simple name.
        java.util.LinkedHashMap<String, Integer> perFile =
            new java.util.LinkedHashMap<String, Integer>();
        StringBuilder verbatim = new StringBuilder();

        for ( Text value : values ){
            String posting = value.toString();
            int sep = posting.lastIndexOf(':');
            Integer count = sep > 0 ? parseCount(posting.substring(sep + 1)) : null;
            if ( count == null ){
                verbatim.append(posting).append(';');
                continue;
            }
            String file = posting.substring(0, sep);
            Integer soFar = perFile.get(file);
            perFile.put(file, soFar == null ? count : Integer.valueOf(soFar.intValue() + count.intValue()));
        }

        // StringBuilder avoids re-copying the whole list for every posting.
        StringBuilder fileList = new StringBuilder();
        for ( java.util.Map.Entry<String, Integer> entry : perFile.entrySet() ){
            fileList.append(entry.getKey()).append(':').append(entry.getValue()).append(';');
        }
        fileList.append(verbatim);

        context.write(key, new Text( fileList.toString() ));
    }

    /** Returns the decimal integer in {@code text}, or null if it is not one. */
    private static Integer parseCount ( String text ){
        try {
            return Integer.valueOf(text);
        } catch ( NumberFormatException notACount ){
            return null;
        }
    }
}
 
/**
 * Driver that configures and submits the inverted-index MapReduce job.
 */
public class InvertedIndex {
    /**
     * Wires {@code Map}, {@code Combine} and {@code Reduce} into one job, runs
     * it and exits with 0 on success or 1 on failure. Exits with 2 and prints
     * a usage line when fewer than two arguments are given.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main ( String[] args ) throws Exception {
        // Fail with a readable message instead of an
        // ArrayIndexOutOfBoundsException when paths are missing.
        if ( args.length < 2 ){
            System.err.println("Usage: InvertedIndex <input path> <output path>");
            System.exit(2);
        }

        Job job = new Job();
        job.setJarByClass(InvertedIndex.class);
        job.setJobName("InvertedIndex");

        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);

        // No separate map-output classes are set, so these Text/Text types
        // are what every stage of the job is configured with.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}