1. 程式人生 > >Hadoop自帶的一些程式示例

Hadoop自帶的一些程式示例

一、PiEstimator.java

位置:E:\Hadoop\hadoop-0.20.1\src\examples\org\apache\hadoop\examples

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A Map-reduce program to estimate the value of Pi
 * using quasi-Monte Carlo method.
 *
 * Mapper:
 *   Generate points in a unit square
 *   and then count points inside/outside of the inscribed circle of the square.
 *
 * Reducer:
 *   Accumulate points inside/outside results from the mappers.
 *
 * Let numTotal = numInside + numOutside.
 * The fraction numInside/numTotal is a rational approximation of
 * the value (Area of the circle)/(Area of the square),
 * where the area of the inscribed circle is Pi/4
 * and the area of unit square is 1.
 * Then, Pi is estimated value to be 4(numInside/numTotal).  
 */
public class PiEstimator extends Configured implements Tool {
  /** tmp directory for input/output */
  static private final Path TMP_DIR = new Path(
      PiEstimator.class.getSimpleName() + "_TMP_3_141592654");
  
  /** 2-dimensional Halton sequence {H(i)},
   * where H(i) is a 2-dimensional point and i >= 1 is the index.
   * Halton sequence is used to generate sample points for Pi estimation. 
   */
  private static class HaltonSequence {
    /** Bases */
    static final int[] P = {2, 3}; 
    /** Maximum number of digits allowed */
    static final int[] K = {63, 40}; 

    private long index;
    private double[] x;
    private double[][] q;
    private int[][] d;

    /** Initialize to H(startindex),
     * so the sequence begins with H(startindex+1).
     */
    HaltonSequence(long startindex) {
      index = startindex;
      x = new double[K.length];
      q = new double[K.length][];
      d = new int[K.length][];
      for(int i = 0; i < K.length; i++) {
        q[i] = new double[K[i]];
        d[i] = new int[K[i]];
      }

      for(int i = 0; i < K.length; i++) {
        long k = index;
        x[i] = 0;
        
        for(int j = 0; j < K[i]; j++) {
          q[i][j] = (j == 0? 1.0: q[i][j-1])/P[i];
          d[i][j] = (int)(k % P[i]);
          k = (k - d[i][j])/P[i];
          x[i] += d[i][j] * q[i][j];
        }
      }
    }

    /** Compute next point.
     * Assume the current point is H(index).
     * Compute H(index+1).
     * 
     * @return a 2-dimensional point with coordinates in [0,1)^2
     */
    double[] nextPoint() {
      index++;
      for(int i = 0; i < K.length; i++) {
        for(int j = 0; j < K[i]; j++) {
          d[i][j]++;
          x[i] += q[i][j];
          if (d[i][j] < P[i]) {
            break;
          }
          d[i][j] = 0;
          x[i] -= (j == 0? 1.0: q[i][j-1]);
        }
      }
      return x;
    }
  }

  /**
   * Mapper class for Pi estimation.
   * Generate points in a unit square
   * and then count points inside/outside of the inscribed circle of the square.
   */
  public static class PiMapper extends MapReduceBase
    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {

    /** Map method.
     * @param offset samples starting from the (offset+1)th sample.
     * @param size the number of samples for this map
     * @param out output {ture->numInside, false->numOutside}
     * @param reporter
     */
    public void map(LongWritable offset,
                    LongWritable size,
                    OutputCollector<BooleanWritable, LongWritable> out,
                    Reporter reporter) throws IOException {

      final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
      long numInside = 0L;
      long numOutside = 0L;

      for(long i = 0; i < size.get(); ) {
        //generate points in a unit square
        final double[] point = haltonsequence.nextPoint();

        //count points inside/outside of the inscribed circle of the square
        final double x = point[0] - 0.5;
        final double y = point[1] - 0.5;
        if (x*x + y*y > 0.25) {
          numOutside++;
        } else {
          numInside++;
        }

        //report status
        i++;
        if (i % 1000 == 0) {
          reporter.setStatus("Generated " + i + " samples.");
        }
      }

      //output map results
      out.collect(new BooleanWritable(true), new LongWritable(numInside));
      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
    }
  }

  /**
   * Reducer class for Pi estimation.
   * Accumulate points inside/outside results from the mappers.
   */
  public static class PiReducer extends MapReduceBase
    implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
    
    private long numInside = 0;
    private long numOutside = 0;
    private JobConf conf; //configuration for accessing the file system
      
    /** Store job configuration. */
    @Override
    public void configure(JobConf job) {
      conf = job;
    }

    /**
     * Accumulate number of points inside/outside results from the mappers.
     * @param isInside Is the points inside? 
     * @param values An iterator to a list of point counts
     * @param output dummy, not used here.
     * @param reporter
     */
    public void reduce(BooleanWritable isInside,
                       Iterator<LongWritable> values,
                       OutputCollector<WritableComparable<?>, Writable> output,
                       Reporter reporter) throws IOException {
      if (isInside.get()) {
        for(; values.hasNext(); numInside += values.next().get());
      } else {
        for(; values.hasNext(); numOutside += values.next().get());
      }
    }

    /**
     * Reduce task done, write output to a file.
     */
    @Override
    public void close() throws IOException {
      //write output to a file
      Path outDir = new Path(TMP_DIR, "out");
      Path outFile = new Path(outDir, "reduce-out");
      FileSystem fileSys = FileSystem.get(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
          outFile, LongWritable.class, LongWritable.class, 
          CompressionType.NONE);
      writer.append(new LongWritable(numInside), new LongWritable(numOutside));
      writer.close();
    }
  }

  /**
   * Run a map/reduce job for estimating Pi.
   *
   * @return the estimated value of Pi
   */
  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
      ) throws IOException {
    //setup job conf
    jobConf.setJobName(PiEstimator.class.getSimpleName());

    jobConf.setInputFormat(SequenceFileInputFormat.class);

    jobConf.setOutputKeyClass(BooleanWritable.class);
    jobConf.setOutputValueClass(LongWritable.class);
    jobConf.setOutputFormat(SequenceFileOutputFormat.class);

    jobConf.setMapperClass(PiMapper.class);
    jobConf.setNumMapTasks(numMaps);

    jobConf.setReducerClass(PiReducer.class);
    jobConf.setNumReduceTasks(1);

    // turn off speculative execution, because DFS doesn't handle
    // multiple writers to the same file.
    jobConf.setSpeculativeExecution(false);

    //setup input/output directories
    final Path inDir = new Path(TMP_DIR, "in");
    final Path outDir = new Path(TMP_DIR, "out");
    FileInputFormat.setInputPaths(jobConf, inDir);
    FileOutputFormat.setOutputPath(jobConf, outDir);

    final FileSystem fs = FileSystem.get(jobConf);
    if (fs.exists(TMP_DIR)) {
      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
          + " already exists.  Please remove it first.");
    }
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Cannot create input directory " + inDir);
    }

    try {
      //generate an input file for each map task
      for(int i=0; i < numMaps; ++i) {
        final Path file = new Path(inDir, "part"+i);
        final LongWritable offset = new LongWritable(i * numPoints);
        final LongWritable size = new LongWritable(numPoints);
        final SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, jobConf, file,
            LongWritable.class, LongWritable.class, CompressionType.NONE);
        try {
          writer.append(offset, size);
        } finally {
          writer.close();
        }
        System.out.println("Wrote input for Map #"+i);
      }
  
      //start a map/reduce job
      System.out.println("Starting Job");
      final long startTime = System.currentTimeMillis();
      JobClient.runJob(jobConf);
      final double duration = (System.currentTimeMillis() - startTime)/1000.0;
      System.out.println("Job Finished in " + duration + " seconds");

      //read outputs
      Path inFile = new Path(outDir, "reduce-out");
      LongWritable numInside = new LongWritable();
      LongWritable numOutside = new LongWritable();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
      try {
        reader.next(numInside, numOutside);
      } finally {
        reader.close();
      }

      //compute estimated value
      return BigDecimal.valueOf(4).setScale(20)
          .multiply(BigDecimal.valueOf(numInside.get()))
          .divide(BigDecimal.valueOf(numMaps))
          .divide(BigDecimal.valueOf(numPoints));
    } finally {
      fs.delete(TMP_DIR, true);
    }
  }

  /**
   * Parse arguments and then runs a map/reduce job.
   * Print output in standard out.
   * 
   * @return a non-zero if there is an error.  Otherwise, return 0.  
   */
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    
    final int nMaps = Integer.parseInt(args[0]);
    final long nSamples = Long.parseLong(args[1]);
        
    System.out.println("Number of Maps  = " + nMaps);
    System.out.println("Samples per Map = " + nSamples);
        
    final JobConf jobConf = new JobConf(getConf(), getClass());
    System.out.println("Estimated value of Pi is "
        + estimate(nMaps, nSamples, jobConf));
    return 0;
  }

  /**
   * main method for running it as a stand alone command. 
   */
  public static void main(String[] argv) throws Exception {
    System.exit(ToolRunner.run(null, new PiEstimator(), argv));
  }
}

二、MultiFileWordCount.java

位置:E:\Hadoop\hadoop-0.20.1\src\examples\org\apache\hadoop\examples

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MultiFileInputFormat;
import org.apache.hadoop.mapred.MultiFileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * MultiFileWordCount is an example to demonstrate the usage of 
 * MultiFileInputFormat. This examples counts the occurrences of
 * words in the text files under the given input directory.
 */
public class MultiFileWordCount extends Configured implements Tool {

  /**
   * This record keeps <filename,offset> pairs.
   */
  public static class WordOffset implements WritableComparable {

    private long offset;
    private String fileName;

    public void readFields(DataInput in) throws IOException {
      this.offset = in.readLong();
      this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
      out.writeLong(offset);
      Text.writeString(out, fileName);
    }

    public int compareTo(Object o) {
      WordOffset that = (WordOffset)o;

      int f = this.fileName.compareTo(that.fileName);
      if(f == 0) {
        return (int)Math.signum((double)(this.offset - that.offset));
      }
      return f;
    }
    @Override
    public boolean equals(Object obj) {
      if(obj instanceof WordOffset)
        return this.compareTo(obj) == 0;
      return false;
    }
    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; //an arbitrary constant
    }
  }


  /**
   * To use {@link MultiFileInputFormat}, one should extend it, to return a 
   * (custom) {@link RecordReader}. MultiFileInputFormat uses 
   * {@link MultiFileSplit}s. 
   */
  public static class MyInputFormat 
    extends MultiFileInputFormat<WordOffset, Text>  {

    @Override
    public RecordReader<WordOffset,Text> getRecordReader(InputSplit split
        , JobConf job, Reporter reporter) throws IOException {
      return new MultiFileLineRecordReader(job, (MultiFileSplit)split);
    }
  }

  /**
   * RecordReader is responsible from extracting records from the InputSplit. 
   * This record reader accepts a {@link MultiFileSplit}, which encapsulates several 
   * files, and no file is divided.
   */
  public static class MultiFileLineRecordReader 
    implements RecordReader<WordOffset, Text> {

    private MultiFileSplit split;
    private long offset; //total offset read so far;
    private long totLength;
    private FileSystem fs;
    private int count = 0;
    private Path[] paths;
    
    private FSDataInputStream currentStream;
    private BufferedReader currentReader;
    
    public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
      throws IOException {
      
      this.split = split;
      fs = FileSystem.get(conf);
      this.paths = split.getPaths();
      this.totLength = split.getLength();
      this.offset = 0;
      
      //open the first file
      Path file = paths[count];
      currentStream = fs.open(file);
      currentReader = new BufferedReader(new InputStreamReader(currentStream));
    }

    public void close() throws IOException { }

    public long getPos() throws IOException {
      long currentOffset = currentStream == null ? 0 : currentStream.getPos();
      return offset + currentOffset;
    }

    public float getProgress() throws IOException {
      return ((float)getPos()) / totLength;
    }

    public boolean next(WordOffset key, Text value) throws IOException {
      if(count >= split.getNumPaths())
        return false;

      /* Read from file, fill in key and value, if we reach the end of file,
       * then open the next file and continue from there until all files are
       * consumed.  
       */
      String line;
      do {
        line = currentReader.readLine();
        if(line == null) {
          //close the file
          currentReader.close();
          offset += split.getLength(count);
          
          if(++count >= split.getNumPaths()) //if we are done
            return false;
          
          //open a new file
          Path file = paths[count];
          currentStream = fs.open(file);
          currentReader=new BufferedReader(new InputStreamReader(currentStream));
          key.fileName = file.getName();
        }
      } while(line == null);
      //update the key and value
      key.offset = currentStream.getPos();
      value.set(line);
      
      return true;
    }

    public WordOffset createKey() {
      WordOffset wo = new WordOffset();
      wo.fileName = paths[0].toString(); //set as the first file
      return wo;
    }

    public Text createValue() {
      return new Text();
    }
  }

  /**
   * This Mapper is similar to the one in {@link WordCount.MapClass}.
   */
  public static class MapClass extends MapReduceBase
    implements Mapper<WordOffset, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(WordOffset key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }
  
  
  private void printUsage() {
    System.out.println("Usage : multifilewc <input_dir> <output>" );
  }

  public int run(String[] args) throws Exception {

    if(args.length < 2) {
      printUsage();
      return 1;
    }

    JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
    job.setJobName("MultiFileWordCount");

    //set the InputFormat of the job to our InputFormat
    job.setInputFormat(MyInputFormat.class);
    
    // the keys are words (strings)
    job.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    job.setOutputValueClass(IntWritable.class);

    //use the defined mapper
    job.setMapperClass(MapClass.class);
    //use the WordCount Reducer
    job.setCombinerClass(LongSumReducer.class);
    job.setReducerClass(LongSumReducer.class);

    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobClient.runJob(job);
    
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MultiFileWordCount(), args);
    System.exit(ret);
  }

}

三、Python版WordCount.py

位置:E:\Hadoop\hadoop-0.20.1\src\examples\python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred import *

import sys
import getopt

class WordCountMap(Mapper, MapReduceBase):
    one = IntWritable(1)
    def map(self, key, value, output, reporter):
        for w in value.toString().split():
            output.collect(Text(w), self.one)

class Summer(Reducer, MapReduceBase):
    def reduce(self, key, values, output, reporter):
        sum = 0
        while values.hasNext():
            sum += values.next().get()
        output.collect(key, IntWritable(sum))

def printUsage(code):
    print "wordcount [-m <maps>] [-r <reduces>] <input> <output>"
    sys.exit(code)

def main(args):
    conf = JobConf(WordCountMap);
    conf.setJobName("wordcount");
 
    conf.setOutputKeyClass(Text);
    conf.setOutputValueClass(IntWritable);
    
    conf.setMapperClass(WordCountMap);        
    conf.setCombinerClass(Summer);
    conf.setReducerClass(Summer);
    try:
        flags, other_args = getopt.getopt(args[1:], "m:r:")
    except getopt.GetoptError:
        printUsage(1)
    if len(other_args) != 2:
        printUsage(1)
    
    for f,v in flags:
        if f == "-m":
            conf.setNumMapTasks(int(v))
        elif f == "-r":
            conf.setNumReduceTasks(int(v))
    conf.setInputPath(Path(other_args[0]))
    conf.setOutputPath(Path(other_args[1]))
    JobClient.runJob(conf);

if __name__ == "__main__":
    main(sys.argv)