1. 程式人生 > >HDPCD-Java --- Exam Test -- Map-Side Join

HDPCD-Java --- Exam Test -- Map-Side Join

package task1;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver and job classes for a map-side join of flight-delay records with a
 * weather file shipped to every task through the distributed cache.
 *
 * <p>The single command-line argument is the destination code; only flight
 * records whose destination column equals it are joined and emitted.
 */
public class Task1 extends Configured implements Tool{
	/** Configuration key under which the destination filter is handed to the mappers. */
	private static final String DESTINATION = "Destination";

	/**
	 * Mapper that loads the weather file into memory during {@link #setup} and
	 * joins each matching flight record against it by date.
	 */
	public static class MapSideJoinMapper extends Mapper<LongWritable, Text, DateDelay, DelayWeather> {
		/** Weather observations keyed by date, filled once per task in setup(). */
		private Map<Datee, Weather> map = new HashMap<Datee, Weather>();
		/** Destination filter read from the job configuration. */
		private String destionation;

		/**
		 * Reads the destination filter from the configuration and loads
		 * "sfo_weather.csv" from the task's working directory into the lookup map.
		 * The file name matches the cache file registered in {@code run}.
		 *
		 * @param context the task context supplying the job configuration
		 * @throws IOException if the weather file cannot be opened or read
		 * @throws InterruptedException declared by the overridden method
		 */
		@Override
		protected void setup(
				Mapper<LongWritable, Text, DateDelay, DelayWeather>.Context context)
				throws IOException, InterruptedException {
			destionation = context.getConfiguration().get(DESTINATION);
			BufferedReader reader = new BufferedReader(new FileReader("sfo_weather.csv"));
			try {
				String line;
				while ((line = reader.readLine()) != null) {
					// Comma-separated columns, with backslash as the escape character.
					String[] wStr = StringUtils.split(line, '\\', ',');
					// Skip the header row, identified by the literal "YEAR" in column 1.
					if (wStr[1].equals("YEAR")) {
						continue;
					}
					// Columns 1-3 are year, month, day; columns 4-6 are prcp, tMax, tMin.
					Datee datee = new Datee(Integer.parseInt(wStr[1]), Integer.parseInt(wStr[2]),
							Integer.parseInt(wStr[3]));
					Weather weather = new Weather(Integer.parseInt(wStr[4]), Integer.parseInt(wStr[5]),
							Integer.parseInt(wStr[6]));
					map.put(datee, weather);
				}
			} finally {
				// Close the reader even when a row fails to parse, so the handle is not leaked.
				reader.close();
			}
		}

		/**
		 * Emits one joined record for each flight whose destination matches the
		 * filter and whose date has a weather observation.
		 *
		 * @param key the input key supplied by the input format (unused)
		 * @param value one comma-separated flight-delay record
		 * @param context the task context used to emit the joined record
		 * @throws IOException if writing the output fails
		 * @throws InterruptedException if writing the output is interrupted
		 */
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, DateDelay, DelayWeather>.Context context)
				throws IOException, InterruptedException {
			String[] delays = StringUtils.split(value.toString(), '\\', ',');
			// Skip the header row, identified by the literal "Year" in column 0.
			if (delays[0].equals("Year")) {
				return;
			}
			// Column 17 is the destination; other destinations are ignored.
			if (!delays[17].trim().equals(destionation)) {
				return;
			}
			// NOTE(review): Utils is defined outside this file; a true result causes the
			// record to be dropped - confirm its exact meaning against Utils.
			if (Utils.replaceNAWithZero(delays)) {
				return;
			}
			Datee date = new Datee(Integer.parseInt(delays[0]), Integer.parseInt(delays[1]),
					Integer.parseInt(delays[2]));
			Weather weather = map.get(date);
			// No weather observation for this date: nothing to join against.
			if (weather == null) {
				return;
			}
			DateDelay dateDelay = new DateDelay(date, Integer.parseInt(delays[14]));
			FlightDelay flightDelay = new FlightDelay(Integer.parseInt(delays[4]), Integer.parseInt(delays[6]), delays[8],
					Integer.parseInt(delays[9]), Integer.parseInt(delays[11]), Integer.parseInt(delays[14]),
					Integer.parseInt(delays[15]), delays[16], delays[17]);
			DelayWeather delayWeather = new DelayWeather();
			delayWeather.flightDelay = flightDelay;
			delayWeather.weather = weather;
			context.write(dateDelay, delayWeather);
		}
	}

	/**
	 * Identity-style reducer: writes every value it receives under its key, unchanged.
	 */
	public static final class MapSideJoinReducer extends Reducer<DateDelay, DelayWeather, DateDelay, DelayWeather>{
		/**
		 * Writes each value of the group to the output with the group's key.
		 *
		 * @param key the key of the current group
		 * @param values the joined records grouped under the key
		 * @param context the task context used to emit output
		 * @throws IOException if writing the output fails
		 * @throws InterruptedException if writing the output is interrupted
		 */
		@Override
		protected void reduce(
				DateDelay key,
				Iterable<DelayWeather> values,
				Reducer<DateDelay, DelayWeather, DateDelay, DelayWeather>.Context context)
				throws IOException, InterruptedException {
			for (DelayWeather value : values) {
				context.write(key, value);
			}
		}
	}

	/**
	 * Runs the job through {@link ToolRunner} and exits with its status code.
	 * A failure that surfaces as an exception is reported and mapped to a
	 * non-zero exit status.
	 *
	 * @param args command-line arguments; the first one is the destination filter
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		int result;
		try {
			result = ToolRunner.run(new Configuration(), new Task1(), args);
		} catch (Exception e) {
			e.printStackTrace();
			// Report failure to the caller instead of exiting with status 0.
			result = 1;
		}
		System.exit(result);
	}

	/**
	 * Configures and submits the join job, then waits for it to finish.
	 *
	 * @param args command-line arguments; {@code args[0]} is the destination filter
	 * @return 0 if the job succeeded, 1 if it failed, 2 if the destination argument is missing
	 * @throws Exception if job setup or submission fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		if (args == null || args.length < 1) {
			System.err.println("Usage: Task1 <destination>");
			return 2;
		}
		Job job = Job.getInstance(getConf(), "Task1");
		job.setJarByClass(getClass());
		Configuration conf = job.getConfiguration();
		conf.set(DESTINATION, args[0]);
		// Ship the weather file to every task; the mapper opens it by its file name.
		job.addCacheFile(new URI("/user/horton/weather/sfo_weather.csv"));
		Path out = new Path("task1");
		// Remove any previous output so the job does not fail on an existing directory.
		out.getFileSystem(conf).delete(out, true);
		FileInputFormat.setInputPaths(job, new Path("/user/horton/flightdelays"));
		FileOutputFormat.setOutputPath(job, out);

		conf.set(TextOutputFormat.SEPERATOR, ",");

		job.setMapperClass(MapSideJoinMapper.class);
		job.setReducerClass(MapSideJoinReducer.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(DelayFileOutputFormat.class);
		job.setMapOutputKeyClass(DateDelay.class);
		job.setMapOutputValueClass(DelayWeather.class);
		job.setOutputKeyClass(DateDelay.class);
		job.setOutputValueClass(DelayWeather.class);
		job.setNumReduceTasks(2);
		return job.waitForCompletion(true) ? 0 : 1;
	}
}

package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A calendar date (year, month, day) that can be serialized as three ints
 * and ordered chronologically by year, then month, then day.
 */
public class Datee implements WritableComparable<Datee> {
	public int year;
	public int month;
	public int day;
	
	/** Creates an empty instance, to be populated by {@link #readFields}. */
	public Datee() {
	}
	
	/**
	 * @param year the year component
	 * @param month the month component
	 * @param day the day component
	 */
	public Datee(int year, int month, int day) {
		this.year = year;
		this.month = month;
		this.day = day;
	}
	
	/** Writes year, month and day as three ints, in that order. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(year);
		out.writeInt(month);
		out.writeInt(day);
	}

	/** Reads year, month and day in the order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		year = in.readInt();
		month = in.readInt();
		day = in.readInt();
	}

	/**
	 * Orders dates by year, then month, then day.
	 *
	 * @param date the date to compare against
	 * @return a negative, zero or positive value when this date is before,
	 *         equal to or after {@code date}
	 */
	@Override
	public int compareTo(Datee date) {
		// Explicit comparisons instead of subtraction, which can overflow and flip the sign.
		int response = compareInts(this.year, date.year);
		if (response == 0) {
			response = compareInts(this.month, date.month);
		}
		if (response == 0) {
			response = compareInts(this.day, date.day);
		}
		return response;
	}
	
	/**
	 * Combines the three fields with a multiplier so that dates whose fields
	 * merely add up to the same total do not share a hash code.
	 */
	@Override
	public int hashCode() {
		int hash = year;
		hash = 31 * hash + month;
		hash = 31 * hash + day;
		return hash;
	}
	
	/** Two dates are equal when year, month and day all match. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof Datee)) {
			return false;
		}
		Datee date = (Datee) obj;
		return year == date.year && month == date.month && day == date.day;
	}

	public int getYear() {
		return year;
	}

	public void setYear(int year) {
		this.year = year;
	}

	public int getMonth() {
		return month;
	}

	public void setMonth(int month) {
		this.month = month;
	}

	public int getDay() {
		return day;
	}

	public void setDay(int day) {
		this.day = day;
	}
	
	/** Returns the date as "year,month,day". */
	@Override
	public String toString() {
		return this.year + "," + this.month + "," + this.day;
	}

	/** Overflow-safe three-way comparison of two ints. */
	private static int compareInts(int a, int b) {
		return (a < b) ? -1 : ((a == b) ? 0 : 1);
	}

}

package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Serializable holder for one weather observation: three int readings
 * (prcp, tMax, tMin).
 */
public class Weather implements Writable {
	private int prcp;
	private int tMax;
	private int tMin;

	/** Creates an empty instance, to be populated by {@link #readFields}. */
	public Weather() {
	}

	/**
	 * @param prcp the prcp reading
	 * @param tMax the tMax reading
	 * @param tMin the tMin reading
	 */
	public Weather(int prcp, int tMax, int tMin){
		this.prcp = prcp;
		this.tMax = tMax;
		this.tMin = tMin;
	}

	/** Writes the three readings as ints in the order prcp, tMax, tMin. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(this.prcp);
		out.writeInt(this.tMax);
		out.writeInt(this.tMin);
	}

	/** Reads the three readings in the order produced by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		prcp = in.readInt();
		tMax = in.readInt();
		tMin = in.readInt();
	}

	/** Returns the readings as "prcp,tMax,tMin". */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder();
		text.append(prcp).append(",");
		text.append(tMax).append(",");
		text.append(tMin);
		return text.toString();
	}

}

package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key made of a date and an arrival delay. Keys sort by date
 * ascending and, within the same date, by arrival delay descending.
 *
 * <p>{@code equals} and {@code hashCode} are value-based so that equal keys
 * hash identically in every JVM; a hash-based partitioner relies on this to
 * route equal keys to the same reducer.
 */
public class DateDelay implements WritableComparable<DateDelay> {
	public Datee date;
	public int arriveDelay;
	
	/** Creates an empty instance, to be populated by {@link #readFields}. */
	public DateDelay() {
	}
	
	/**
	 * @param date the date part of the key
	 * @param arriveDelay the arrival delay part of the key
	 */
	public DateDelay(Datee date, int arriveDelay) {
		this.date = date;
		this.arriveDelay = arriveDelay;
	}
	
	/** Writes the date followed by the arrival delay. */
	@Override
	public void write(DataOutput out) throws IOException {
		date.write(out);
		out.writeInt(arriveDelay);
	}

	/** Reads the date and arrival delay in the order produced by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		date = new Datee();
		date.readFields(in);
		arriveDelay = in.readInt();
	}

	/**
	 * Orders by date ascending, then by arrival delay descending.
	 *
	 * @param dateDelay the key to compare against
	 * @return a negative, zero or positive value according to that ordering
	 */
	@Override
	public int compareTo(DateDelay dateDelay) {
		int response = this.date.compareTo(dateDelay.date);
		if (response == 0) {
			// Descending by delay; explicit comparison because subtraction can overflow.
			int mine = this.arriveDelay;
			int theirs = dateDelay.arriveDelay;
			response = (theirs < mine) ? -1 : ((theirs == mine) ? 0 : 1);
		}
		return response;
	}

	/** Two keys are equal when both the date and the arrival delay match. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof DateDelay)) {
			return false;
		}
		DateDelay other = (DateDelay) obj;
		if (arriveDelay != other.arriveDelay) {
			return false;
		}
		return (date == null) ? other.date == null : date.equals(other.date);
	}

	/** Value-based hash, consistent with {@link #equals}. */
	@Override
	public int hashCode() {
		int dateHash = (date == null) ? 0 : date.hashCode();
		return 31 * dateHash + arriveDelay;
	}

	/** Returns the key as the date's text, a comma, then the arrival delay. */
	@Override
	public String toString() {
		return this.date + "," + this.arriveDelay;
	}

}

package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Serializable record of one flight's timing and routing fields.
 * Fields are written and read in declaration order.
 */
public class FlightDelay implements Writable{
	public int depTime;
	public int arrTime;
	public String uniqueCarrier;
	public int flightNum;
	public int actualElapsedTime;
	public int arrDelay;
	public int depDelay;
	public String origin;
	public String destionation;

	/** Creates an empty instance, to be populated by {@link #readFields}. */
	public FlightDelay() {
	}

	/**
	 * Creates a record with every field supplied, in declaration order.
	 */
	public FlightDelay(int depTime, int arrTime, String uniqueCarrier, int flightNum, 
			int actualElapsedTime, int arrDelay, int depDelay, String origin, String destionation) {
		this.depTime = depTime;
		this.arrTime = arrTime;
		this.uniqueCarrier = uniqueCarrier;
		this.flightNum = flightNum;
		this.actualElapsedTime = actualElapsedTime;
		this.arrDelay = arrDelay;
		this.depDelay = depDelay;
		this.origin = origin;
		this.destionation = destionation;
	}

	/** Writes all fields in declaration order: ints via writeInt, strings via writeUTF. */
	@Override
	public void write(DataOutput out) throws IOException {
		// Leading times and carrier.
		out.writeInt(this.depTime);
		out.writeInt(this.arrTime);
		out.writeUTF(this.uniqueCarrier);
		// Flight number, elapsed time and the two delays.
		out.writeInt(this.flightNum);
		out.writeInt(this.actualElapsedTime);
		out.writeInt(this.arrDelay);
		out.writeInt(this.depDelay);
		// Route end points.
		out.writeUTF(this.origin);
		out.writeUTF(this.destionation);
	}

	/** Reads all fields in the order produced by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		depTime = in.readInt();
		arrTime = in.readInt();
		uniqueCarrier = in.readUTF();
		flightNum = in.readInt();
		actualElapsedTime = in.readInt();
		arrDelay = in.readInt();
		depDelay = in.readInt();
		origin = in.readUTF();
		destionation = in.readUTF();
	}

	/** Returns all fields in declaration order, separated by commas. */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder();
		text.append(depTime).append(",");
		text.append(arrTime).append(",");
		text.append(uniqueCarrier).append(",");
		text.append(flightNum).append(",");
		text.append(actualElapsedTime).append(",");
		text.append(arrDelay).append(",");
		text.append(depDelay).append(",");
		text.append(origin).append(",");
		text.append(destionation);
		return text.toString();
	}

}

package task1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Joined record pairing a flight's delay data with a weather observation.
 * The flight part is serialized first, then the weather part.
 */
public class DelayWeather implements Writable {
	public FlightDelay flightDelay;
	public Weather weather;

	/** Writes the flight record followed by the weather record. */
	@Override
	public void write(DataOutput out) throws IOException {
		this.flightDelay.write(out);
		this.weather.write(out);
	}

	/** Replaces both parts with fresh instances and reads them in write order. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.flightDelay = new FlightDelay();
		this.weather = new Weather();
		this.flightDelay.readFields(in);
		this.weather.readFields(in);
	}

	/** Returns the flight record's text, a comma, then the weather record's text. */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder();
		text.append(flightDelay);
		text.append(",");
		text.append(weather);
		return text.toString();
	}
}