HDPCD-Java --- Exam Test -- Map-Side Join
阿新 • • 發佈:2018-12-26
package task1; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Task1 extends Configured implements Tool{ //private static final String STATION_NAME = "StationName"; private static final String DESTINATION = "Destination"; public static class MapSideJoinMapper extends Mapper<LongWritable, Text, DateDelay, DelayWeather> { private Map<Datee, Weather> map = new HashMap<Datee, Weather>(); //private String stationName; private String destionation; @Override protected void setup( Mapper<LongWritable, Text, DateDelay, DelayWeather>.Context context) throws IOException, InterruptedException { //stationName = context.getConfiguration().get(STATION_NAME); destionation = context.getConfiguration().get(DESTINATION); BufferedReader reader = new BufferedReader(new FileReader("sfo_weather.csv")); String line; String[] wStr; Datee datee; Weather weather; while ((line = reader.readLine()) != null) { wStr = StringUtils.split(line, '\\', ','); if (wStr[1].equals("YEAR")) { continue; } datee = new Datee(Integer.parseInt(wStr[1]), Integer.parseInt(wStr[2]), Integer.parseInt(wStr[3])); weather = new 
Weather(Integer.parseInt(wStr[4]), Integer.parseInt(wStr[5]), Integer.parseInt(wStr[6])); map.put(datee, weather); } reader.close(); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DateDelay, DelayWeather>.Context context) throws IOException, InterruptedException { String[] delays = StringUtils.split(value.toString(), '\\', ','); DateDelay dateDelay; Datee date; if (delays[0].equals("Year")) { return; } if (delays[17].trim().equals(destionation)) { boolean xx = Utils.replaceNAWithZero(delays); if (xx) { return; } date = new Datee(Integer.parseInt(delays[0]), Integer.parseInt(delays[1]), Integer.parseInt(delays[2])); if (map.containsKey(date)) { dateDelay = new DateDelay(date, Integer.parseInt(delays[14])); FlightDelay flightDelay = new FlightDelay(Integer.parseInt(delays[4]), Integer.parseInt(delays[6]), delays[8], Integer.parseInt(delays[9]), Integer.parseInt(delays[11]), Integer.parseInt(delays[14]), Integer.parseInt(delays[15]), delays[16], delays[17]); DelayWeather delayWeather = new DelayWeather(); delayWeather.flightDelay = flightDelay; delayWeather.weather = map.get(date); context.write(dateDelay, delayWeather); } } } } public static final class MapSideJoinReducer extends Reducer<DateDelay, DelayWeather, DateDelay, DelayWeather>{ @Override protected void reduce( DateDelay key, Iterable<DelayWeather> values, Reducer<DateDelay, DelayWeather, DateDelay, DelayWeather>.Context context) throws IOException, InterruptedException { Iterator<DelayWeather> iterator = values.iterator(); while (iterator.hasNext()) { context.write(key, iterator.next()); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { int result = 0; try { result = ToolRunner.run(new Configuration(), new Task1(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(result); } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), 
"Task1"); job.setJarByClass(getClass()); Configuration conf = job.getConfiguration(); //conf.set(STATION_NAME, args[0]); conf.set(DESTINATION, args[0]); job.addCacheFile(new URI("/user/horton/weather/sfo_weather.csv")); Path out = new Path("task1"); out.getFileSystem(conf).delete(out, true); FileInputFormat.setInputPaths(job, new Path("/user/horton/flightdelays")); FileOutputFormat.setOutputPath(job, out); conf.set(TextOutputFormat.SEPERATOR, ","); job.setMapperClass(MapSideJoinMapper.class); //job.setGroupingComparatorClass(DateDelayComparator.class); job.setReducerClass(MapSideJoinReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(DelayFileOutputFormat.class); job.setMapOutputKeyClass(DateDelay.class); job.setMapOutputValueClass(DelayWeather.class); job.setOutputKeyClass(DateDelay.class); job.setOutputValueClass(DelayWeather.class); job.setNumReduceTasks(2); //job.setNumReduceTasks(0); return job.waitForCompletion(true) ? 0 : 1; } }
package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A calendar date (year, month, day) that can be serialized by Hadoop and used
 * as a sortable, hashable key.
 *
 * <p>Serialized form: three ints in the order year, month, day.
 */
public class Datee implements WritableComparable<Datee> {

    public int year;
    public int month;
    public int day;

    /** Creates a date with all fields zero; intended to be filled by {@link #readFields}. */
    public Datee() {
    }

    public Datee(int year, int month, int day) {
        this.year = year;
        this.month = month;
        this.day = day;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
        day = in.readInt();
    }

    /**
     * Orders dates chronologically: by year, then month, then day.
     *
     * @return a negative value, zero, or a positive value as this date is before,
     *         equal to, or after {@code date}
     */
    @Override
    public int compareTo(Datee date) {
        int response = compareInts(this.year, date.year);
        if (response == 0) {
            response = compareInts(this.month, date.month);
        }
        if (response == 0) {
            response = compareInts(this.day, date.day);
        }
        return response;
    }

    /**
     * Three-way int comparison that cannot overflow, unlike {@code left - right}.
     * Hand-written so the class does not depend on Java 7+ library methods.
     */
    private static int compareInts(int left, int right) {
        return left < right ? -1 : (left == right ? 0 : 1);
    }

    /**
     * Mixes the fields with a multiplier so that dates whose fields merely sum to
     * the same value (e.g. 2008-1-2 and 2008-2-1) do not share a hash.
     */
    @Override
    public int hashCode() {
        return 31 * (31 * year + month) + day;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Datee)) {
            return false;
        }
        Datee date = (Datee) obj;
        return year == date.year && month == date.month && day == date.day;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    /** Returns the date as {@code year,month,day} without padding. */
    @Override
    public String toString() {
        return this.year + "," + this.month + "," + this.day;
    }
}
package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One weather observation made of three int readings: {@code prcp},
 * {@code tMax} and {@code tMin}.
 *
 * <p>Serialized form: the three ints in that order.
 */
public class Weather implements Writable {

    private int prcp;
    private int tMax;
    private int tMin;

    /** Creates an observation with all readings zero; filled later by {@link #readFields}. */
    public Weather() {
    }

    public Weather(int prcp, int tMax, int tMin) {
        this.prcp = prcp;
        this.tMax = tMax;
        this.tMin = tMin;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.prcp);
        out.writeInt(this.tMax);
        out.writeInt(this.tMin);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        prcp = in.readInt();
        tMax = in.readInt();
        tMin = in.readInt();
    }

    /** Returns the readings as {@code prcp,tMax,tMin}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(prcp).append(",");
        text.append(tMax).append(",");
        text.append(tMin);
        return text.toString();
    }
}
package task1;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite key made of a flight date and that flight's arrival delay.
 *
 * <p>Keys sort by date ascending and, within one date, by arrival delay
 * descending. Serialized form: the date followed by the delay as an int.
 */
public class DateDelay implements WritableComparable<DateDelay> {

    public Datee date;
    public int arriveDelay;

    /** Creates an empty key; {@code date} stays null until {@link #readFields} is called. */
    public DateDelay() {
    }

    public DateDelay(Datee date, int arriveDelay) {
        this.date = date;
        this.arriveDelay = arriveDelay;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        date.write(out);
        out.writeInt(arriveDelay);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        date = new Datee();
        date.readFields(in);
        arriveDelay = in.readInt();
    }

    /**
     * Compares by date first; for equal dates the larger arrival delay sorts first.
     *
     * @return a negative value, zero, or a positive value as this key sorts before,
     *         equal to, or after {@code dateDelay}
     */
    @Override
    public int compareTo(DateDelay dateDelay) {
        int response = this.date.compareTo(dateDelay.date);
        if (response == 0) {
            // Arguments are swapped on purpose: delays are ordered descending.
            response = compareInts(dateDelay.arriveDelay, this.arriveDelay);
        }
        return response;
    }

    /** Three-way int comparison that cannot overflow, unlike {@code left - right}. */
    private static int compareInts(int left, int right) {
        return left < right ? -1 : (left == right ? 0 : 1);
    }

    /**
     * Value-based hash. Hadoop's default partitioner hashes the key, so the result
     * must be the same in every JVM; the inherited identity hash is not.
     */
    @Override
    public int hashCode() {
        int dateHash = (date == null) ? 0 : date.hashCode();
        return 31 * dateHash + arriveDelay;
    }

    /** Two keys are equal when both their dates and their arrival delays are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DateDelay)) {
            return false;
        }
        DateDelay other = (DateDelay) obj;
        boolean sameDate = (date == null) ? other.date == null : date.equals(other.date);
        return sameDate && arriveDelay == other.arriveDelay;
    }

    /** Returns the date's text followed by {@code ,} and the arrival delay. */
    @Override
    public String toString() {
        return this.date + "," + this.arriveDelay;
    }
}
package task1;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * The fields of one flight record that are carried through the join.
 *
 * <p>Serialized in declaration order: ints via {@code writeInt}, strings via
 * {@code writeUTF}.
 */
public class FlightDelay implements Writable {

    public int depTime;
    public int arrTime;
    public String uniqueCarrier;
    public int flightNum;
    public int actualElapsedTime;
    public int arrDelay;
    public int depDelay;
    public String origin;
    public String destionation;

    /** Creates an empty record; meant to be populated by {@link #readFields}. */
    public FlightDelay() {
    }

    public FlightDelay(int depTime, int arrTime, String uniqueCarrier, int flightNum,
            int actualElapsedTime, int arrDelay, int depDelay, String origin, String destionation) {
        this.depTime = depTime;
        this.arrTime = arrTime;
        this.uniqueCarrier = uniqueCarrier;
        this.flightNum = flightNum;
        this.actualElapsedTime = actualElapsedTime;
        this.arrDelay = arrDelay;
        this.depDelay = depDelay;
        this.origin = origin;
        this.destionation = destionation;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Times and carrier.
        out.writeInt(this.depTime);
        out.writeInt(this.arrTime);
        out.writeUTF(this.uniqueCarrier);
        // Flight number, duration and delays.
        out.writeInt(this.flightNum);
        out.writeInt(this.actualElapsedTime);
        out.writeInt(this.arrDelay);
        out.writeInt(this.depDelay);
        // Route.
        out.writeUTF(this.origin);
        out.writeUTF(this.destionation);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must mirror the field order used by write().
        depTime = in.readInt();
        arrTime = in.readInt();
        uniqueCarrier = in.readUTF();
        flightNum = in.readInt();
        actualElapsedTime = in.readInt();
        arrDelay = in.readInt();
        depDelay = in.readInt();
        origin = in.readUTF();
        destionation = in.readUTF();
    }

    /** Returns all nine fields, in declaration order, joined by commas. */
    @Override
    public String toString() {
        StringBuilder row = new StringBuilder();
        row.append(depTime).append(",");
        row.append(arrTime).append(",");
        row.append(uniqueCarrier).append(",");
        row.append(flightNum).append(",");
        row.append(actualElapsedTime).append(",");
        row.append(arrDelay).append(",");
        row.append(depDelay).append(",");
        row.append(origin).append(",");
        row.append(destionation);
        return row.toString();
    }
}
package task1;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * Joined record holding one flight's delay data together with a weather
 * observation.
 *
 * <p>Serialized form: the flight delay followed by the weather.
 */
public class DelayWeather implements Writable {

    public FlightDelay flightDelay;
    public Weather weather;

    @Override
    public void write(DataOutput out) throws IOException {
        this.flightDelay.write(out);
        this.weather.write(out);
    }

    /** Replaces both parts with fresh instances and fills them from {@code in}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        FlightDelay freshDelay = new FlightDelay();
        Weather freshWeather = new Weather();
        // Both fields are assigned before any bytes are consumed.
        this.flightDelay = freshDelay;
        this.weather = freshWeather;
        freshDelay.readFields(in);
        freshWeather.readFields(in);
    }

    /** Returns the flight delay's text, a comma, then the weather's text. */
    @Override
    public String toString() {
        StringBuilder joined = new StringBuilder();
        joined.append(flightDelay);
        joined.append(",");
        joined.append(weather);
        return joined.toString();
    }
}