
HDPCD Java Review Notes (14) - Lab

Java lab booklet

Lab: Using the Distributed Cache for a Map-Side Join

This application performs a map-side join in a single map-only job: the NYSE dividends file is shipped to every mapper through the distributed cache and loaded into an in-memory map in setup(), and each stock-price record is then joined against that map, emitting the dividend and closing price for every date on which the given stock symbol paid a dividend.

package mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapSideJoin extends Configured implements Tool {
	private static final String STOCK_SYMBOL = "stockSymbol";
	public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Stock, StockPrices> {
		private Map<Stock, Double> stocks = new HashMap<Stock, Double>();
		private String stockSymbol;
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			stockSymbol = context.getConfiguration().get(STOCK_SYMBOL);
			BufferedReader reader = new BufferedReader(new FileReader("NYSE_dividends_A.csv"));
			String currentLine = "";
			String[] words;
			while((currentLine = reader.readLine()) != null){
				words = StringUtils.split(currentLine, '\\', ',');
				if (words[1].equals(stockSymbol)) {
					stocks.put(new Stock(words[1], words[2]), Double.parseDouble(words[3]));
				}
			}
			reader.close();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] words = StringUtils.split(value.toString(), '\\', ',');
			if (words[1].equals(stockSymbol)) {
				Stock stock = new Stock(words[1], words[2]);
				if (stocks.containsKey(stock)) {
					StockPrices stockPrices = new StockPrices(stocks.get(stock), Double.parseDouble(words[6]));
					context.write(stock, stockPrices);
				}
			}
		}
	}


	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), "MapSideJoinJob");
		job.setJarByClass(getClass());
		//Distribute the dividends file to each mapper via the distributed cache.
		job.addCacheFile(new URI("dividends/NYSE_dividends_A.csv"));
		Configuration conf = job.getConfiguration();
		conf.set(STOCK_SYMBOL, args[0]);
		//Make the output a comma-delimited file (the constant is spelled SEPERATOR in Hadoop 2.x).
		conf.set(TextOutputFormat.SEPERATOR, ",");
		

		Path out = new Path("joinoutput");
		out.getFileSystem(conf).delete(out,true);
		FileInputFormat.setInputPaths(job, new Path("stocks"));
		FileOutputFormat.setOutputPath(job, out);
		
		
		job.setMapperClass(MapSideJoinMapper.class);
		job.setInputFormatClass(TextInputFormat.class);
		
		job.setOutputFormatClass(TextOutputFormat.class);
		//In this map-only job the map output classes below are what is written out;
		//TextOutputFormat renders each key and value using toString().
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Stock.class);
		job.setMapOutputValueClass(StockPrices.class);

		//Map-only job: the join happens entirely in the mapper.
		job.setNumReduceTasks(0);
		

		return job.waitForCompletion(true) ? 0 : 1;

	}


	public static void main(String[] args) {
		int result = 0;
		try {
			result = ToolRunner.run(new Configuration(), new MapSideJoin(), args);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.exit(result);

	}

}
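
Both mappers assume a fixed comma-delimited layout for the NYSE input files. A minimal standalone sketch (the sample record is hypothetical) showing the field indices the code above relies on:

import org.apache.hadoop.util.StringUtils;

public class FieldLayoutDemo {
    public static void main(String[] args) {
        //Hypothetical dividend record: exchange,symbol,date,dividend
        String line = "NYSE,AIT,2009-11-12,0.15";
        String[] words = StringUtils.split(line, '\\', ',');
        System.out.println(words[1]); //symbol   -> AIT
        System.out.println(words[2]); //date     -> 2009-11-12
        System.out.println(words[3]); //dividend -> 0.15
        //The stock-price records carry more columns; the mapper reads the
        //closing price from words[6].
    }
}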
package mapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Stock implements WritableComparable<Stock> {

	private String symbol;
	private String date;
	
	public Stock(String symbol, String date) {
		this.symbol = symbol;
		this.date = date;
	}
	
	public Stock() {}

	@Override
	public boolean equals(Object obj) {
		if(obj instanceof Stock) {
			Stock other = (Stock) obj;
			if(symbol.equals(other.symbol) && date.equals(other.date)) {
				return true;
			}
		} 
		return false;
	}

	@Override
	public int hashCode() {
		return (symbol + date).hashCode();
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		symbol = in.readUTF();
		date = in.readUTF();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(symbol);
		out.writeUTF(date);
	}

	@Override
	public int compareTo(Stock arg0) {
		int response = this.symbol.compareTo(arg0.symbol);
		if(response == 0) {
			response = this.date.compareTo(arg0.date);
		}
		return response;
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

	public String getDate() {
		return date;
	}

	public void setDate(String date) {
		this.date = date;
	}

	@Override
	public String toString() {
		return symbol + "," + date;
	}

}
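
Because Stock is the map output key, the framework serializes it with write() and reconstructs it with readFields() during the shuffle. A minimal round-trip sketch of that contract (not part of the lab; sample values are hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StockRoundTrip {
    public static void main(String[] args) throws IOException {
        Stock original = new Stock("AIT", "2009-11-12");

        //Serialize the key exactly as the MapReduce framework would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        //Deserialize into a fresh instance and compare.
        Stock copy = new Stock();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(original.equals(copy)); //true
    }
}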
package mapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StockPrices implements Writable {
	
	private double dividend;
	private double closingPrice;

	public StockPrices() {}
	
	public StockPrices(double dividend, double closingPrice) {
		this.dividend = dividend;
		this.closingPrice = closingPrice;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		dividend = in.readDouble();
		closingPrice = in.readDouble();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeDouble(dividend);
		out.writeDouble(closingPrice);
	}

	public double getDividend() {
		return dividend;
	}

	public void setDividend(double dividend) {
		this.dividend = dividend;
	}

	public double getClosingPrice() {
		return closingPrice;
	}

	public void setClosingPrice(double closingPrice) {
		this.closingPrice = closingPrice;
	}

	@Override
	public String toString() {
		return dividend + "," + closingPrice;
	}
}


Lab: Using a Bloom Filter

The goal is to output a stock's closing prices only on the dates on which a dividend was announced for that stock.

This application consists of two MapReduce jobs: the first job will create the Bloom filter and save it in a file named filters/dividendfilter. The second job inputs the stock prices along with their corresponding dividend prices, and outputs only those stock prices that have a dividend granted on the same date (which is about once every three months for most stocks).
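
Before walking through the two jobs, here is a minimal standalone sketch of the org.apache.hadoop.util.bloom.BloomFilter API they use; the 1000-bit vector, 20 hash functions, and MURMUR_HASH mirror the parameters in the lab code. The key property is that membershipTest() can return a false positive but never a false negative, which is why the second job still needs a reduce-side check.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterDemo {
    public static void main(String[] args) {
        //Same parameters as the lab: 1000-bit vector, 20 hash functions, murmur hash.
        BloomFilter filter = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
        filter.add(new Key("AIT,2009-11-12".getBytes()));

        //A key that was added always tests true.
        System.out.println(filter.membershipTest(new Key("AIT,2009-11-12".getBytes())));
        //A key that was never added usually tests false, but may occasionally
        //test true (a false positive).
        System.out.println(filter.membershipTest(new Key("XYZ,1999-01-01".getBytes())));
    }
}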

package bloom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class StockDividendFilter extends Configured implements Tool {
  private static final String FILTER_FILE = "filters/dividendfilter";

  private enum BloomCounters {
    FALSE_POSITIVES;  
  }

  private enum JoinData {
    DIVIDENDS, STOCKS;
  }

  public static class BloomMapper extends Mapper<LongWritable, Text, NullWritable, BloomFilter> {
    private String stockSymbol;
    private NullWritable outputKey = NullWritable.get();
    private BloomFilter outputValue;
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        stockSymbol = context.getConfiguration().get("stockSymbol");
        outputValue = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String [] words = StringUtils.split(value.toString(),'\\',',');
        if(words[1].equals(stockSymbol)) {
            Stock stock = new Stock(words[1], words[2]);
            Key stockKey = new Key(stock.toString().getBytes());
            outputValue.add(stockKey);
        }
    }


    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    	context.write(outputKey, outputValue);
    }   
  }

  public static class BloomReducer extends Reducer<NullWritable, BloomFilter, NullWritable, NullWritable> {

    private BloomFilter allValues;
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        allValues = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
    }

    @Override
    protected void reduce(NullWritable key, Iterable<BloomFilter> values, Context context)
            throws IOException, InterruptedException {          
    	for (BloomFilter bloomFilter : values) {
			allValues.or(bloomFilter);
		}
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
    	Configuration conf = context.getConfiguration();
    	Path path = new Path(FILTER_FILE);
    	FSDataOutputStream out = path.getFileSystem(conf).create(path);
    	allValues.write(out);
    	out.close();
    }
  }


  public static class StockFilterMapper extends Mapper<LongWritable, Text, StockTaggedKey, DoubleWritable> {
    private BloomFilter dividends;
    private String stockSymbol;
    private DoubleWritable outputValue = new DoubleWritable();
    private Stock outputKey = new Stock();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path filterFile = new Path(FILTER_FILE);
        stockSymbol = context.getConfiguration().get("stockSymbol");

        //Initialize the dividends field
        dividends = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream in = fs.open(filterFile);
        dividends.readFields(in);
        in.close();
    }
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String [] words = StringUtils.split(value.toString(),'\\',',');
        if(words[1].equals(stockSymbol)) {
            outputKey.setSymbol(words[1]);
            outputKey.setDate(words[2]);
            //Instantiate a Bloom Key using outputKey, then check for membership in the Bloom filter
            Key stockKey = new Key(outputKey.toString().getBytes());
            if(dividends.membershipTest(stockKey)){
            	outputValue.set(Double.parseDouble(words[6]));
            	context.write(new StockTaggedKey(JoinData.STOCKS.ordinal(), outputKey), outputValue);
            }
        }
    }
  }

  public static class DividendMapper extends Mapper<LongWritable, Text, StockTaggedKey, DoubleWritable> {
    private String stockSymbol;
    private DoubleWritable outputValue = new DoubleWritable();
    private Stock outputKey = new Stock();

    @Override
    protected void setup(Context context) throws IOException,
    InterruptedException {
      stockSymbol = context.getConfiguration().get("stockSymbol");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] words = StringUtils.split(value.toString(), '\\', ',');
      if (words[1].equals(stockSymbol)) {
        outputKey.setSymbol(words[1]);
        outputKey.setDate(words[2]);
        outputValue.set(Double.parseDouble(words[3]));
        context.write(new StockTaggedKey(JoinData.DIVIDENDS.ordinal(),
            outputKey), outputValue);
      }
    }
  }
  
  public static class StockFilterReducer extends
      Reducer<StockTaggedKey, DoubleWritable, Text, DoubleWritable> {
    private static final Logger LOG = LoggerFactory
        .getLogger(StockFilterReducer.class);
    private Text outputKey = new Text();

    @Override
    protected void reduce(StockTaggedKey key, Iterable<DoubleWritable> values,
        Context context) throws IOException, InterruptedException {
      DoubleWritable dividend = null;

      for (DoubleWritable value : values) {
        // The dividend record (if any) should appear first. Only output the
        // stock data if there's a matching dividend record. False positives
        // from the bloom filter could have caused some extra stock records to
        // be sent to the reducer
        if (key.getTag() == JoinData.DIVIDENDS.ordinal()) {
          // Copy the dividend so that the framework doesn't overwrite it the
          // next time through the loop
          dividend = new DoubleWritable(value.get());
        }
        else if (dividend != null) {
          outputKey.set(key.getKey().toString());
          context.write(outputKey, value);
        }
      }

      if (dividend == null) {
        LOG.warn("False positive detected for stock: {}", key.getKey()
            .toString());
        context.getCounter(BloomCounters.FALSE_POSITIVES).increment(1);
      }
    }

  }

  @Override
  public int run(String[] args) throws Exception {
    Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
    job1.setJarByClass(getClass());
    Configuration conf = job1.getConfiguration();
    conf.set("stockSymbol", args[0]);

    FileInputFormat.setInputPaths(job1, new Path("dividends"));

    job1.setMapperClass(BloomMapper.class);
    job1.setReducerClass(BloomReducer.class);
    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(NullOutputFormat.class);
    job1.setMapOutputKeyClass(NullWritable.class);
    job1.setMapOutputValueClass(BloomFilter.class);
    job1.setOutputKeyClass(NullWritable.class);
    job1.setOutputValueClass(NullWritable.class);
    job1.setNumReduceTasks(1);

    boolean job1success = job1.waitForCompletion(true);
    if (!job1success) {
      System.out.println("The CreateBloomFilter job failed!");
      return -1;
    }

    Job job2 = Job.getInstance(conf, "FilterStocksJob");
    job2.setJarByClass(getClass());
    conf = job2.getConfiguration();

    Path out = new Path("bloomoutput");
    out.getFileSystem(conf).delete(out, true);
    FileInputFormat.setInputPaths(job2, new Path("stocks"));
    FileOutputFormat.setOutputPath(job2, out);

    Path stocks = new Path("stocks");
    Path dividends = new Path("dividends");
    MultipleInputs.addInputPath(job2, stocks, TextInputFormat.class,
        StockFilterMapper.class);
    MultipleInputs.addInputPath(job2, dividends, TextInputFormat.class,
        DividendMapper.class);
    job2.setReducerClass(StockFilterReducer.class);

    job2.setOutputFormatClass(TextOutputFormat.class);
    job2.setMapOutputKeyClass(StockTaggedKey.class);
    job2.setMapOutputValueClass(DoubleWritable.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(DoubleWritable.class);

    job2.setPartitionerClass(TaggedKeyHashPartitioner.class);
    job2.setGroupingComparatorClass(StockTaggedKeyGroupingComparator.class);

    boolean job2success = job2.waitForCompletion(true);
    if (!job2success) {
      System.out.println("The FilterStocksJob failed!");
      return -1;
    }
    return 0; //both jobs succeeded; exit code 0 signals success
  }

  public static void main(String[] args) {
    int result = 0;
    try {
      result = ToolRunner.run(new Configuration(), new StockDividendFilter(),
          args);
    }
    catch (Exception e) {
      e.printStackTrace();
    }
    System.exit(result);

  }

}
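
The FALSE_POSITIVES counter incremented in StockFilterReducer becomes visible to the driver once job2 completes. As a sketch, a helper like the following could be added inside StockDividendFilter and called just before run() returns (BloomCounters and the needed imports already exist in the class; the method name is hypothetical):

  private static void reportFalsePositives(Job job) throws IOException {
    //Counters are aggregated across all tasks once the job has finished.
    long falsePositives = job.getCounters()
        .findCounter(BloomCounters.FALSE_POSITIVES).getValue();
    System.out.println("Bloom filter false positives: " + falsePositives);
  }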
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Stock implements WritableComparable<Stock> {

	private String symbol;
	private String date;
	private static final String COMMA = ",";
	
	public Stock() {}
	
	public Stock(String symbol, String date) {
		this.symbol = symbol;
		this.date = date;
	}

	@Override
	public boolean equals(Object obj) {
		if(obj instanceof Stock) {
			Stock other = (Stock) obj;
			if(symbol.equals(other.symbol) && date.equals(other.date)) {
				return true;
			}
		} 
		return false;
	}

	@Override
	public int hashCode() {
		return (symbol + date).hashCode();
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		symbol = in.readUTF();
		date = in.readUTF();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(symbol);
		out.writeUTF(date);
	}

	@Override
	public int compareTo(Stock arg0) {
		int response = this.symbol.compareTo(arg0.symbol);
		if(response == 0) {
			response = this.date.compareTo(arg0.date);
		}
		return response;
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

	public String getDate() {
		return date;
	}

	public void setDate(String date) {
		this.date = date;
	}

	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(symbol).append(COMMA).append(date);
		return sb.toString();
	}
}
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public abstract class TaggedKey<K extends WritableComparable<K>> implements
    WritableComparable<TaggedKey<K>> {
  private int tag = 0;

  public TaggedKey() {
  }

  public TaggedKey(int tag) {
    this.tag = tag;
  }

  public int getTag() {
    return tag;
  }

  public abstract K getKey();
   
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(tag);
    getKey().write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    tag = in.readInt();
    getKey().readFields(in);
  }
  
  @Override
  public int compareTo(TaggedKey<K> o) {
    int result = getKey().compareTo(o.getKey());
    return result != 0 ? result : (tag - o.tag);
  }

  @Override
  public String toString() {
    return String.format("%s: %s\n", tag, getKey());
  }

}
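
compareTo() orders first by the wrapped key and only then by the tag, which is what guarantees StockFilterReducer sees the dividend record (tag 0, JoinData.DIVIDENDS) before the stock-price records (tag 1, JoinData.STOCKS) for the same symbol and date. A minimal sketch using the StockTaggedKey subclass defined below (sample values are hypothetical):

public class TagOrderDemo {
    public static void main(String[] args) {
        Stock stock = new Stock("AIT", "2009-11-12");
        TaggedKey<Stock> dividend = new StockTaggedKey(0, stock); //JoinData.DIVIDENDS.ordinal()
        TaggedKey<Stock> price = new StockTaggedKey(1, stock);    //JoinData.STOCKS.ordinal()
        //For equal Stock keys the lower tag sorts first, so dividends precede prices.
        System.out.println(dividend.compareTo(price) < 0); //true
    }
}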
package bloom;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public abstract class TaggedKeyGroupingComparator extends WritableComparator {
  public TaggedKeyGroupingComparator(Class<? extends WritableComparable<?>> keyClass) {
    super(keyClass, true);
  }
  
  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    TaggedKey lhs = (TaggedKey) a;
    TaggedKey rhs = (TaggedKey) b;
    return lhs.getKey().compareTo(rhs.getKey());
  }

}
package bloom;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class TaggedKeyHashPartitioner<K extends WritableComparable<K>, V>
    extends Partitioner<TaggedKey<K>, V> {

  HashPartitioner<K, V> partitioner = new HashPartitioner<K, V>();

  @Override
  public int getPartition(TaggedKey<K> key, V value, int numPartitions) {
    return partitioner.getPartition(key.getKey(), value, numPartitions);
  }
}
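
The partitioner hashes only the wrapped key and ignores the tag, so a stock's dividend and price records always land on the same reducer; the TaggedKeyGroupingComparator then merges them into a single reduce group. A minimal sketch (sample values are hypothetical):

import org.apache.hadoop.io.DoubleWritable;

public class PartitionDemo {
    public static void main(String[] args) {
        TaggedKeyHashPartitioner<Stock, DoubleWritable> partitioner =
                new TaggedKeyHashPartitioner<Stock, DoubleWritable>();
        Stock stock = new Stock("AIT", "2009-11-12");
        int p1 = partitioner.getPartition(new StockTaggedKey(0, stock), new DoubleWritable(0.15), 10);
        int p2 = partitioner.getPartition(new StockTaggedKey(1, stock), new DoubleWritable(33.70), 10);
        //Both records map to the same partition because only Stock is hashed.
        System.out.println(p1 == p2); //true
    }
}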
package bloom;

public class StockTaggedKey extends TaggedKey<Stock> {

  protected Stock key;
  
  public StockTaggedKey() {
    key = new Stock();
  }
  
  public StockTaggedKey(int tag, Stock key) {
    super(tag);
    this.key = key;
  }
 
  @Override
  public Stock getKey() {
    return key;
  }
  
}
package bloom;

public class StockTaggedKeyGroupingComparator extends TaggedKeyGroupingComparator {

  public StockTaggedKeyGroupingComparator() {
    super(StockTaggedKey.class);
  }
}