HDPCD-Java-複習筆記(14)- lab
阿新 • 發佈:2018-12-26
Java lab booklet
package mapjoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MapSideJoin extends Configured implements Tool { private static final String STOCK_SYMBOL = "stockSymbol"; public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Stock, StockPrices> { private Map<Stock, Double> stocks = new HashMap<Stock, Double>(); private String stockSymbol; @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get(STOCK_SYMBOL); BufferedReader reader = new BufferedReader(new FileReader("NYSE_dividends_A.csv")); String currentLine = ""; String[] words; while((currentLine = reader.readLine()) != null){ words = StringUtils.split(currentLine, '\\', ','); if (words[1].equals(stockSymbol)) { stocks.put(new Stock(words[1], words[2]), Double.parseDouble(words[3])); } } reader.close(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = StringUtils.split(value.toString(), '\\', ','); if (words[1].equals(stockSymbol)) { Stock stock = new Stock(words[1], words[2]); if (stocks.containsKey(stock)) { StockPrices stockPrices = new 
StockPrices(stocks.get(stock), Double.parseDouble(words[6])); context.write(stock, stockPrices); } } } } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), "MapSideJoinJob"); job.setJarByClass(getClass()); //Distribute LocalResouces. job.addCacheFile(new URI("dividends/NYSE_dividends_A.csv")); Configuration conf = job.getConfiguration(); conf.set(STOCK_SYMBOL, args[0]); //The output to be a comma-delimited file. conf.set(TextOutputFormat.SEPERATOR, ","); Path out = new Path("joinoutput"); out.getFileSystem(conf).delete(out,true); FileInputFormat.setInputPaths(job, new Path("stocks")); FileOutputFormat.setOutputPath(job, out); job.setMapperClass(MapSideJoinMapper.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Stock.class); job.setMapOutputValueClass(StockPrices.class); job.setNumReduceTasks(0); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) { int result = 0; try { result = ToolRunner.run(new Configuration(), new MapSideJoin(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(result); } }
package mapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key: a stock symbol plus a trade date.
 *
 * <p>Serializes both fields as UTF strings and orders first by symbol,
 * then by date. equals/hashCode are consistent so instances can be used
 * as HashMap keys during the map-side join.
 */
public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;

    public Stock(String symbol, String date) {
        this.symbol = symbol;
        this.date = date;
    }

    /** No-arg constructor required by the Writable deserialization contract. */
    public Stock() {
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Stock)) {
            return false;
        }
        Stock that = (Stock) obj;
        return symbol.equals(that.symbol) && date.equals(that.date);
    }

    @Override
    public int hashCode() {
        // Kept identical to the original formula: consistent with equals()
        // and stable across the join's HashMap lookups.
        return (symbol + date).hashCode();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public int compareTo(Stock other) {
        int bySymbol = this.symbol.compareTo(other.symbol);
        if (bySymbol != 0) {
            return bySymbol;
        }
        return this.date.compareTo(other.date);
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return symbol + "," + date;
    }
}
package mapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value type for the join output: the dividend paid on a date together
 * with that date's closing price. Serialized as two raw doubles.
 */
public class StockPrices implements Writable {

    private double dividend;
    private double closingPrice;

    /** No-arg constructor required by the Writable deserialization contract. */
    public StockPrices() {
    }

    public StockPrices(double dividend, double closingPrice) {
        this.dividend = dividend;
        this.closingPrice = closingPrice;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Field order must mirror write(): dividend first, then closing price.
        dividend = in.readDouble();
        closingPrice = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(dividend);
        out.writeDouble(closingPrice);
    }

    public double getDividend() {
        return dividend;
    }

    public void setDividend(double dividend) {
        this.dividend = dividend;
    }

    public double getClosingPrice() {
        return closingPrice;
    }

    public void setClosingPrice(double closingPrice) {
        this.closingPrice = closingPrice;
    }

    @Override
    public String toString() {
        return dividend + "," + closingPrice;
    }
}
Lab: Using a Bloom Filter
This lab outputs a stock's closing prices only on the dates on which a dividend was announced for that stock.
This application consists of two MapReduce jobs: the first job will create the Bloom filter and save it in a file named filters/dividendfilter. The second job inputs the stock prices along with their
corresponding dividend prices, and outputs only those stock prices that have a dividend granted on the same date (which is about once every three months for most stocks).
package bloom;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Two-job Bloom-filter join: outputs a stock's closing prices only on dates
 * on which that stock announced a dividend.
 *
 * <p>Job 1 ("CreateBloomFilter") scans the dividends data, builds a Bloom
 * filter of (symbol, date) keys and persists it to {@link #FILTER_FILE}.
 * Job 2 ("FilterStocksJob") reads both stocks and dividends via
 * MultipleInputs; the stock mapper drops records that fail the filter's
 * membership test, and a tagged-key reduce-side join removes the filter's
 * false positives.
 *
 * <p>Usage: {@code StockDividendFilter <stockSymbol>}.
 */
public class StockDividendFilter extends Configured implements Tool {
    /** HDFS path where job 1 persists the serialized Bloom filter. */
    private static final String FILTER_FILE = "filters/dividendfilter";

    /** Counts stocks that passed the filter but had no matching dividend. */
    private enum BloomCounters {
        FALSE_POSITIVES;
    }

    /** Join tags; DIVIDENDS (ordinal 0) sorts before STOCKS within a key group. */
    private enum JoinData {
        DIVIDENDS, STOCKS;
    }

    /**
     * Builds a per-task Bloom filter of (symbol, date) keys for the chosen
     * stock and emits it once from cleanup(), keyed by NullWritable so a
     * single reducer can OR all partial filters together.
     */
    public static class BloomMapper extends Mapper<LongWritable, Text, NullWritable, BloomFilter> {
        private String stockSymbol;
        private NullWritable outputKey = NullWritable.get();
        private BloomFilter outputValue;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");
            // 1000-bit vector, 20 hash functions — must match the filters built
            // in BloomReducer and StockFilterMapper, or readFields() will misparse.
            outputValue = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                Stock stock = new Stock(words[1], words[2]);
                // The Bloom key is the "symbol,date" string form of Stock.
                Key stockKey = new Key(stock.toString().getBytes());
                outputValue.add(stockKey);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the whole filter once per task rather than per record.
            context.write(outputKey, outputValue);
        }
    }

    /**
     * ORs all partial Bloom filters into one and writes it to FILTER_FILE
     * in cleanup(). The job itself produces no reducer output.
     */
    public static class BloomReducer extends Reducer<NullWritable, BloomFilter, NullWritable, NullWritable> {
        private BloomFilter allValues;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            allValues = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
        }

        @Override
        protected void reduce(NullWritable key, Iterable<BloomFilter> values, Context context)
                throws IOException, InterruptedException {
            for (BloomFilter bloomFilter : values) {
                allValues.or(bloomFilter);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            Configuration conf = context.getConfiguration();
            Path path = new Path(FILTER_FILE);
            // try-with-resources: the stream is closed even if write() throws,
            // which the original's manual close() did not guarantee.
            try (FSDataOutputStream out = path.getFileSystem(conf).create(path)) {
                allValues.write(out);
            }
        }
    }

    /**
     * Maps stock-price records: drops any record whose (symbol, date) fails
     * the Bloom filter membership test, tagging survivors as STOCKS data.
     */
    public static class StockFilterMapper extends Mapper<LongWritable, Text, StockTaggedKey, DoubleWritable> {
        private BloomFilter dividends;
        private String stockSymbol;
        private DoubleWritable outputValue = new DoubleWritable();
        private Stock outputKey = new Stock();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path filterFile = new Path(FILTER_FILE);
            stockSymbol = context.getConfiguration().get("stockSymbol");
            // Initialize the dividends field with the same parameters used to
            // build the filter, then deserialize the persisted bit vector.
            dividends = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            try (FSDataInputStream in = fs.open(filterFile)) {
                dividends.readFields(in);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                outputKey.setSymbol(words[1]);
                outputKey.setDate(words[2]);
                // Instantiate a Bloom Key using outputKey, then check for membership.
                Key stockKey = new Key(outputKey.toString().getBytes());
                if (dividends.membershipTest(stockKey)) {
                    // words[6] is presumably the closing-price column — confirm against the data.
                    outputValue.set(Double.parseDouble(words[6]));
                    context.write(new StockTaggedKey(JoinData.STOCKS.ordinal(), outputKey), outputValue);
                }
            }
        }
    }

    /** Maps dividend records for the reduce-side join, tagged as DIVIDENDS data. */
    public static class DividendMapper extends Mapper<LongWritable, Text, StockTaggedKey, DoubleWritable> {
        private String stockSymbol;
        private DoubleWritable outputValue = new DoubleWritable();
        Stock outputKey = new Stock();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                outputKey.setSymbol(words[1]);
                outputKey.setDate(words[2]);
                // words[3] = dividend amount for this symbol/date.
                outputValue.set(Double.parseDouble(words[3]));
                context.write(new StockTaggedKey(JoinData.DIVIDENDS.ordinal(),
                        outputKey), outputValue);
            }
        }
    }

    /**
     * Joins dividend and stock records grouped under the same (symbol, date).
     * Emits closing prices only when a dividend record is present, and counts
     * Bloom-filter false positives otherwise.
     */
    public static class StockFilterReducer extends
            Reducer<StockTaggedKey, DoubleWritable, Text, DoubleWritable> {
        private static final Logger LOG = LoggerFactory
                .getLogger(StockFilterReducer.class);
        private Text outputKey = new Text();

        @Override
        protected void reduce(StockTaggedKey key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            DoubleWritable dividend = null;
            for (DoubleWritable value : values) {
                // The dividend record (if any) appears first thanks to the tag
                // ordering in TaggedKey.compareTo. Only output stock data when a
                // matching dividend record exists; false positives from the Bloom
                // filter can send extra stock records to the reducer.
                if (key.getTag() == JoinData.DIVIDENDS.ordinal()) {
                    // Copy the dividend so the framework doesn't overwrite it on
                    // the next iteration (Hadoop reuses the value object).
                    dividend = new DoubleWritable(value.get());
                }
                else if (dividend != null) {
                    outputKey.set(key.getKey().toString());
                    context.write(outputKey, value);
                }
            }
            if (dividend == null) {
                LOG.warn("False positive detected for stock: {}", key.getKey()
                        .toString());
                context.getCounter(BloomCounters.FALSE_POSITIVES).increment(1);
            }
        }
    }

    /**
     * Runs both jobs in sequence.
     *
     * @param args args[0] is the stock symbol to filter on
     * @return 0 when both jobs succeed, -1 when either fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
        job1.setJarByClass(getClass());
        Configuration conf = job1.getConfiguration();
        conf.set("stockSymbol", args[0]);
        FileInputFormat.setInputPaths(job1, new Path("dividends"));
        job1.setMapperClass(BloomMapper.class);
        job1.setReducerClass(BloomReducer.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(NullOutputFormat.class);
        job1.setMapOutputKeyClass(NullWritable.class);
        job1.setMapOutputValueClass(BloomFilter.class);
        job1.setOutputKeyClass(NullWritable.class);
        job1.setOutputValueClass(NullWritable.class);
        // Exactly one reducer so all partial filters are merged into one file.
        job1.setNumReduceTasks(1);
        boolean job1success = job1.waitForCompletion(true);
        if (!job1success) {
            System.out.println("The CreateBloomFilter job failed!");
            return -1;
        }
        Job job2 = Job.getInstance(conf, "FilterStocksJob");
        job2.setJarByClass(getClass());
        conf = job2.getConfiguration();
        Path out = new Path("bloomoutput");
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.setInputPaths(job2, new Path("stocks"));
        FileOutputFormat.setOutputPath(job2, out);
        Path stocks = new Path("stocks");
        Path dividends = new Path("dividends");
        MultipleInputs.addInputPath(job2, stocks, TextInputFormat.class,
                StockFilterMapper.class);
        MultipleInputs.addInputPath(job2, dividends, TextInputFormat.class,
                DividendMapper.class);
        job2.setReducerClass(StockFilterReducer.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setMapOutputKeyClass(StockTaggedKey.class);
        job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);
        // Partition/group by the untagged Stock key so DIVIDENDS and STOCKS
        // records for the same (symbol, date) meet in one reduce() call.
        job2.setPartitionerClass(TaggedKeyHashPartitioner.class);
        job2.setGroupingComparatorClass(StockTaggedKeyGroupingComparator.class);
        boolean job2success = job2.waitForCompletion(true);
        if (!job2success) {
            System.out.println("The FilterStocksJob failed!");
            return -1;
        }
        // Fix: the original returned 1 on success, making the process exit
        // with a non-zero (failure) status even when both jobs succeeded.
        return 0;
    }

    public static void main(String[] args) {
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new StockDividendFilter(),
                    args);
        }
        catch (Exception e) {
            e.printStackTrace();
            // Fix: the original exited with 0 even when the driver threw.
            result = 1;
        }
        System.exit(result);
    }
}
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key: a stock symbol plus a trade date.
 *
 * <p>Its {@link #toString()} form ("symbol,date") is also used as the byte
 * key inserted into and tested against the Bloom filter, so that format
 * must not change.
 */
public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;
    private static final String COMMA = ",";

    /** No-arg constructor required by the Writable deserialization contract. */
    public Stock() {
    }

    public Stock(String symbol, String date) {
        this.symbol = symbol;
        this.date = date;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Stock)) {
            return false;
        }
        Stock that = (Stock) obj;
        return symbol.equals(that.symbol) && date.equals(that.date);
    }

    @Override
    public int hashCode() {
        // Kept identical to the original formula: it drives hash partitioning.
        return (symbol + date).hashCode();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public int compareTo(Stock other) {
        int bySymbol = this.symbol.compareTo(other.symbol);
        if (bySymbol != 0) {
            return bySymbol;
        }
        return this.date.compareTo(other.date);
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return symbol + COMMA + date;
    }
}
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A WritableComparable key decorated with an integer tag identifying which
 * input (e.g. dividends vs. stocks) a record came from.
 *
 * <p>Ordering is by the natural key first and by tag second, so within one
 * reduce group (grouped on the natural key alone) lower-tagged records —
 * dividends — arrive before higher-tagged ones.
 *
 * @param <K> the natural key type; subclasses supply the instance via getKey()
 */
public abstract class TaggedKey<K extends WritableComparable<K>> implements
        WritableComparable<TaggedKey<K>> {

    private int tag = 0;

    public TaggedKey() {
    }

    public TaggedKey(int tag) {
        this.tag = tag;
    }

    public int getTag() {
        return tag;
    }

    /** The natural (untagged) key; never null — readFields() writes into it. */
    public abstract K getKey();

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(tag);
        getKey().write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        tag = in.readInt();
        getKey().readFields(in);
    }

    @Override
    public int compareTo(TaggedKey<K> o) {
        int result = getKey().compareTo(o.getKey());
        // Fix: Integer.compare avoids the overflow-prone (tag - o.tag)
        // subtraction idiom from the original.
        return result != 0 ? result : Integer.compare(tag, o.tag);
    }

    @Override
    public String toString() {
        return String.format("%s: %s\n", tag, getKey());
    }
}
package bloom;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator that ignores the tag and compares only the natural
 * key, so records from different tagged inputs sharing the same key land
 * in a single reduce() call.
 */
public abstract class TaggedKeyGroupingComparator extends WritableComparator {

    public TaggedKeyGroupingComparator(Class<? extends WritableComparable<?>> keyClass) {
        // true = create key instances so compare() receives deserialized objects.
        super(keyClass, true);
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TaggedKey first = (TaggedKey) a;
        TaggedKey second = (TaggedKey) b;
        return first.getKey().compareTo(second.getKey());
    }
}
package bloom;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Partitions tagged keys by the natural key only, delegating to the stock
 * HashPartitioner. This keeps all records for one (untagged) key — whatever
 * their tag — on the same reducer.
 */
public class TaggedKeyHashPartitioner<K extends WritableComparable<K>, V>
        extends Partitioner<TaggedKey<K>, V> {

    HashPartitioner<K, V> partitioner = new HashPartitioner<K, V>();

    @Override
    public int getPartition(TaggedKey<K> key, V value, int numPartitions) {
        // Hash only key.getKey(), never the tag.
        return partitioner.getPartition(key.getKey(), value, numPartitions);
    }
}
package bloom;

/**
 * Concrete TaggedKey whose natural key is a {@link Stock} (symbol + date).
 */
public class StockTaggedKey extends TaggedKey<Stock> {

    protected Stock key;

    /** No-arg constructor for Writable deserialization; pre-allocates the key. */
    public StockTaggedKey() {
        key = new Stock();
    }

    public StockTaggedKey(int tag, Stock key) {
        super(tag);
        this.key = key;
    }

    @Override
    public Stock getKey() {
        return key;
    }
}
package bloom;

/**
 * Grouping comparator binding {@link TaggedKeyGroupingComparator} to
 * {@link StockTaggedKey}: reduce groups are formed on the Stock alone,
 * ignoring the join tag.
 */
public class StockTaggedKeyGroupingComparator extends TaggedKeyGroupingComparator {

    public StockTaggedKeyGroupingComparator() {
        super(StockTaggedKey.class);
    }
}