Hadoop自帶的一些程式示例
阿新 • • 發佈:2019-02-17
一、PiEstimator.java
位置:E:\Hadoop\hadoop-0.20.1\src\examples\org\apache\hadoop\examples
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.examples; import java.io.IOException; import java.math.BigDecimal; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * A Map-reduce program to estimate the value of Pi * using quasi-Monte Carlo method. * * Mapper: * Generate points in a unit square * and then count points inside/outside of the inscribed circle of the square. * * Reducer: * Accumulate points inside/outside results from the mappers. * * Let numTotal = numInside + numOutside. * The fraction numInside/numTotal is a rational approximation of * the value (Area of the circle)/(Area of the square), * where the area of the inscribed circle is Pi/4 * and the area of unit square is 1. * Then, Pi is estimated value to be 4(numInside/numTotal). */ public class PiEstimator extends Configured implements Tool { /** tmp directory for input/output */ static private final Path TMP_DIR = new Path( PiEstimator.class.getSimpleName() + "_TMP_3_141592654"); /** 2-dimensional Halton sequence {H(i)}, * where H(i) is a 2-dimensional point and i >= 1 is the index. * Halton sequence is used to generate sample points for Pi estimation. */ private static class HaltonSequence { /** Bases */ static final int[] P = {2, 3}; /** Maximum number of digits allowed */ static final int[] K = {63, 40}; private long index; private double[] x; private double[][] q; private int[][] d; /** Initialize to H(startindex), * so the sequence begins with H(startindex+1). */ HaltonSequence(long startindex) { index = startindex; x = new double[K.length]; q = new double[K.length][]; d = new int[K.length][]; for(int i = 0; i < K.length; i++) { q[i] = new double[K[i]]; d[i] = new int[K[i]]; } for(int i = 0; i < K.length; i++) { long k = index; x[i] = 0; for(int j = 0; j < K[i]; j++) { q[i][j] = (j == 0? 1.0: q[i][j-1])/P[i]; d[i][j] = (int)(k % P[i]); k = (k - d[i][j])/P[i]; x[i] += d[i][j] * q[i][j]; } } } /** Compute next point. * Assume the current point is H(index). * Compute H(index+1). * * @return a 2-dimensional point with coordinates in [0,1)^2 */ double[] nextPoint() { index++; for(int i = 0; i < K.length; i++) { for(int j = 0; j < K[i]; j++) { d[i][j]++; x[i] += q[i][j]; if (d[i][j] < P[i]) { break; } d[i][j] = 0; x[i] -= (j == 0? 1.0: q[i][j-1]); } } return x; } } /** * Mapper class for Pi estimation. * Generate points in a unit square * and then count points inside/outside of the inscribed circle of the square. */ public static class PiMapper extends MapReduceBase implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> { /** Map method. * @param offset samples starting from the (offset+1)th sample. * @param size the number of samples for this map * @param out output {ture->numInside, false->numOutside} * @param reporter */ public void map(LongWritable offset, LongWritable size, OutputCollector<BooleanWritable, LongWritable> out, Reporter reporter) throws IOException { final HaltonSequence haltonsequence = new HaltonSequence(offset.get()); long numInside = 0L; long numOutside = 0L; for(long i = 0; i < size.get(); ) { //generate points in a unit square final double[] point = haltonsequence.nextPoint(); //count points inside/outside of the inscribed circle of the square final double x = point[0] - 0.5; final double y = point[1] - 0.5; if (x*x + y*y > 0.25) { numOutside++; } else { numInside++; } //report status i++; if (i % 1000 == 0) { reporter.setStatus("Generated " + i + " samples."); } } //output map results out.collect(new BooleanWritable(true), new LongWritable(numInside)); out.collect(new BooleanWritable(false), new LongWritable(numOutside)); } } /** * Reducer class for Pi estimation. * Accumulate points inside/outside results from the mappers. */ public static class PiReducer extends MapReduceBase implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> { private long numInside = 0; private long numOutside = 0; private JobConf conf; //configuration for accessing the file system /** Store job configuration. */ @Override public void configure(JobConf job) { conf = job; } /** * Accumulate number of points inside/outside results from the mappers. * @param isInside Is the points inside? * @param values An iterator to a list of point counts * @param output dummy, not used here. * @param reporter */ public void reduce(BooleanWritable isInside, Iterator<LongWritable> values, OutputCollector<WritableComparable<?>, Writable> output, Reporter reporter) throws IOException { if (isInside.get()) { for(; values.hasNext(); numInside += values.next().get()); } else { for(; values.hasNext(); numOutside += values.next().get()); } } /** * Reduce task done, write output to a file. */ @Override public void close() throws IOException { //write output to a file Path outDir = new Path(TMP_DIR, "out"); Path outFile = new Path(outDir, "reduce-out"); FileSystem fileSys = FileSystem.get(conf); SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, LongWritable.class, LongWritable.class, CompressionType.NONE); writer.append(new LongWritable(numInside), new LongWritable(numOutside)); writer.close(); } } /** * Run a map/reduce job for estimating Pi. * * @return the estimated value of Pi */ public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf ) throws IOException { //setup job conf jobConf.setJobName(PiEstimator.class.getSimpleName()); jobConf.setInputFormat(SequenceFileInputFormat.class); jobConf.setOutputKeyClass(BooleanWritable.class); jobConf.setOutputValueClass(LongWritable.class); jobConf.setOutputFormat(SequenceFileOutputFormat.class); jobConf.setMapperClass(PiMapper.class); jobConf.setNumMapTasks(numMaps); jobConf.setReducerClass(PiReducer.class); jobConf.setNumReduceTasks(1); // turn off speculative execution, because DFS doesn't handle // multiple writers to the same file. jobConf.setSpeculativeExecution(false); //setup input/output directories final Path inDir = new Path(TMP_DIR, "in"); final Path outDir = new Path(TMP_DIR, "out"); FileInputFormat.setInputPaths(jobConf, inDir); FileOutputFormat.setOutputPath(jobConf, outDir); final FileSystem fs = FileSystem.get(jobConf); if (fs.exists(TMP_DIR)) { throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR) + " already exists. Please remove it first."); } if (!fs.mkdirs(inDir)) { throw new IOException("Cannot create input directory " + inDir); } try { //generate an input file for each map task for(int i=0; i < numMaps; ++i) { final Path file = new Path(inDir, "part"+i); final LongWritable offset = new LongWritable(i * numPoints); final LongWritable size = new LongWritable(numPoints); final SequenceFile.Writer writer = SequenceFile.createWriter( fs, jobConf, file, LongWritable.class, LongWritable.class, CompressionType.NONE); try { writer.append(offset, size); } finally { writer.close(); } System.out.println("Wrote input for Map #"+i); } //start a map/reduce job System.out.println("Starting Job"); final long startTime = System.currentTimeMillis(); JobClient.runJob(jobConf); final double duration = (System.currentTimeMillis() - startTime)/1000.0; System.out.println("Job Finished in " + duration + " seconds"); //read outputs Path inFile = new Path(outDir, "reduce-out"); LongWritable numInside = new LongWritable(); LongWritable numOutside = new LongWritable(); SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf); try { reader.next(numInside, numOutside); } finally { reader.close(); } //compute estimated value return BigDecimal.valueOf(4).setScale(20) .multiply(BigDecimal.valueOf(numInside.get())) .divide(BigDecimal.valueOf(numMaps)) .divide(BigDecimal.valueOf(numPoints)); } finally { fs.delete(TMP_DIR, true); } } /** * Parse arguments and then runs a map/reduce job. * Print output in standard out. * * @return a non-zero if there is an error. Otherwise, return 0. */ public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>"); ToolRunner.printGenericCommandUsage(System.err); return -1; } final int nMaps = Integer.parseInt(args[0]); final long nSamples = Long.parseLong(args[1]); System.out.println("Number of Maps = " + nMaps); System.out.println("Samples per Map = " + nSamples); final JobConf jobConf = new JobConf(getConf(), getClass()); System.out.println("Estimated value of Pi is " + estimate(nMaps, nSamples, jobConf)); return 0; } /** * main method for running it as a stand alone command. */ public static void main(String[] argv) throws Exception { System.exit(ToolRunner.run(null, new PiEstimator(), argv)); } }
二、MultiFileWordCount.java
位置:E:\Hadoop\hadoop-0.20.1\src\examples\org\apache\hadoop\examples
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.examples; import java.io.BufferedReader; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.InputStreamReader; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.MultiFileInputFormat; import org.apache.hadoop.mapred.MultiFileSplit; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.LongSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * MultiFileWordCount is an example to demonstrate the usage of * MultiFileInputFormat. This examples counts the occurrences of * words in the text files under the given input directory. */ public class MultiFileWordCount extends Configured implements Tool { /** * This record keeps <filename,offset> pairs. */ public static class WordOffset implements WritableComparable { private long offset; private String fileName; public void readFields(DataInput in) throws IOException { this.offset = in.readLong(); this.fileName = Text.readString(in); } public void write(DataOutput out) throws IOException { out.writeLong(offset); Text.writeString(out, fileName); } public int compareTo(Object o) { WordOffset that = (WordOffset)o; int f = this.fileName.compareTo(that.fileName); if(f == 0) { return (int)Math.signum((double)(this.offset - that.offset)); } return f; } @Override public boolean equals(Object obj) { if(obj instanceof WordOffset) return this.compareTo(obj) == 0; return false; } @Override public int hashCode() { assert false : "hashCode not designed"; return 42; //an arbitrary constant } } /** * To use {@link MultiFileInputFormat}, one should extend it, to return a * (custom) {@link RecordReader}. MultiFileInputFormat uses * {@link MultiFileSplit}s. */ public static class MyInputFormat extends MultiFileInputFormat<WordOffset, Text> { @Override public RecordReader<WordOffset,Text> getRecordReader(InputSplit split , JobConf job, Reporter reporter) throws IOException { return new MultiFileLineRecordReader(job, (MultiFileSplit)split); } } /** * RecordReader is responsible from extracting records from the InputSplit. * This record reader accepts a {@link MultiFileSplit}, which encapsulates several * files, and no file is divided. */ public static class MultiFileLineRecordReader implements RecordReader<WordOffset, Text> { private MultiFileSplit split; private long offset; //total offset read so far; private long totLength; private FileSystem fs; private int count = 0; private Path[] paths; private FSDataInputStream currentStream; private BufferedReader currentReader; public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split) throws IOException { this.split = split; fs = FileSystem.get(conf); this.paths = split.getPaths(); this.totLength = split.getLength(); this.offset = 0; //open the first file Path file = paths[count]; currentStream = fs.open(file); currentReader = new BufferedReader(new InputStreamReader(currentStream)); } public void close() throws IOException { } public long getPos() throws IOException { long currentOffset = currentStream == null ? 0 : currentStream.getPos(); return offset + currentOffset; } public float getProgress() throws IOException { return ((float)getPos()) / totLength; } public boolean next(WordOffset key, Text value) throws IOException { if(count >= split.getNumPaths()) return false; /* Read from file, fill in key and value, if we reach the end of file, * then open the next file and continue from there until all files are * consumed. */ String line; do { line = currentReader.readLine(); if(line == null) { //close the file currentReader.close(); offset += split.getLength(count); if(++count >= split.getNumPaths()) //if we are done return false; //open a new file Path file = paths[count]; currentStream = fs.open(file); currentReader=new BufferedReader(new InputStreamReader(currentStream)); key.fileName = file.getName(); } } while(line == null); //update the key and value key.offset = currentStream.getPos(); value.set(line); return true; } public WordOffset createKey() { WordOffset wo = new WordOffset(); wo.fileName = paths[0].toString(); //set as the first file return wo; } public Text createValue() { return new Text(); } } /** * This Mapper is similar to the one in {@link WordCount.MapClass}. */ public static class MapClass extends MapReduceBase implements Mapper<WordOffset, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(WordOffset key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } private void printUsage() { System.out.println("Usage : multifilewc <input_dir> <output>" ); } public int run(String[] args) throws Exception { if(args.length < 2) { printUsage(); return 1; } JobConf job = new JobConf(getConf(), MultiFileWordCount.class); job.setJobName("MultiFileWordCount"); //set the InputFormat of the job to our InputFormat job.setInputFormat(MyInputFormat.class); // the keys are words (strings) job.setOutputKeyClass(Text.class); // the values are counts (ints) job.setOutputValueClass(IntWritable.class); //use the defined mapper job.setMapperClass(MapClass.class); //use the WordCount Reducer job.setCombinerClass(LongSumReducer.class); job.setReducerClass(LongSumReducer.class); FileInputFormat.addInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new MultiFileWordCount(), args); System.exit(ret); } }
三、Python版WordCount.py
位置:E:\Hadoop\hadoop-0.20.1\src\examples\python
# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from org.apache.hadoop.fs import Path from org.apache.hadoop.io import * from org.apache.hadoop.mapred import * import sys import getopt class WordCountMap(Mapper, MapReduceBase): one = IntWritable(1) def map(self, key, value, output, reporter): for w in value.toString().split(): output.collect(Text(w), self.one) class Summer(Reducer, MapReduceBase): def reduce(self, key, values, output, reporter): sum = 0 while values.hasNext(): sum += values.next().get() output.collect(key, IntWritable(sum)) def printUsage(code): print "wordcount [-m <maps>] [-r <reduces>] <input> <output>" sys.exit(code) def main(args): conf = JobConf(WordCountMap); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text); conf.setOutputValueClass(IntWritable); conf.setMapperClass(WordCountMap); conf.setCombinerClass(Summer); conf.setReducerClass(Summer); try: flags, other_args = getopt.getopt(args[1:], "m:r:") except getopt.GetoptError: printUsage(1) if len(other_args) != 2: printUsage(1) for f,v in flags: if f == "-m": conf.setNumMapTasks(int(v)) elif f == "-r": conf.setNumReduceTasks(int(v)) conf.setInputPath(Path(other_args[0])) conf.setOutputPath(Path(other_args[1])) JobClient.runJob(conf); if __name__ == "__main__": main(sys.argv)