1. 程式人生 > >17,reducejoin

17,reducejoin

需求:將訂單表中的商品id替換成商品表中對應的商品名稱

資料:
和上一節使用的訂單表 order.txt、商品表 pd.txt 相同

Mapper:

/**
 * Map side of a reduce-side join between the order table and the product table.
 *
 * <p>Each tab-separated input line is converted into an {@link OrBeen} and emitted
 * under the product id, so that order rows and the product row sharing an id end up
 * in the same reduce group. The source table is recorded in the bean's flag:
 * {@code 0} for a row read from {@code order.txt}, {@code 1} for a row read from any
 * other input file (treated as the product table).
 *
 * <p>Lines with too few fields, or order lines whose quantity is not an integer, are
 * skipped and counted in the {@code JoinMapper/MALFORMED_RECORDS} counter instead of
 * failing the whole task.
 */
public class JoinMapper extends Mapper<LongWritable, Text, Text,OrBeen >{

	/** Name of the input file holding order rows; every other file is read as product rows. */
	private static final String ORDER_FILE = "order.txt";
	/** Minimum number of tab-separated fields in an order row (fields 0..2 are read). */
	private static final int ORDER_FIELDS = 3;
	/** Minimum number of tab-separated fields in a product row (fields 0..1 are read). */
	private static final int PRODUCT_FIELDS = 2;
	/** Counter group / name used to report skipped input lines. */
	private static final String COUNTER_GROUP = "JoinMapper";
	private static final String COUNTER_MALFORMED = "MALFORMED_RECORDS";

	/**
	 * Converts one input line into a tagged {@link OrBeen} keyed by product id.
	 *
	 * @param key     input key supplied by the input format; not used
	 * @param value   one tab-separated line of either input file
	 * @param context task context used to read the current split and emit output
	 * @throws IOException          if emitting the record fails
	 * @throws InterruptedException if the task is interrupted while emitting
	 */
	@Override
	protected void map(LongWritable key,Text value, Context context)
			throws IOException, InterruptedException {
		FileSplit inputSplit = (FileSplit)context.getInputSplit();
		// Name of the file this split belongs to; decides which table the line comes from.
		String path = inputSplit.getPath().getName();
		String[] fields = value.toString().split("\t");
		boolean fromOrderTable = ORDER_FILE.equals(path);

		// A blank or truncated line would otherwise raise ArrayIndexOutOfBoundsException below.
		int requiredFields = fromOrderTable ? ORDER_FIELDS : PRODUCT_FIELDS;
		if (fields.length < requiredFields) {
			context.getCounter(COUNTER_GROUP, COUNTER_MALFORMED).increment(1);
			return;
		}

		OrBeen or = new OrBeen();
		Text k = new Text();
		if (fromOrderTable) {
			int quantity;
			try {
				quantity = Integer.parseInt(fields[2]);
			} catch (NumberFormatException e) {
				// Non-numeric quantity: skip this row but keep the job running.
				context.getCounter(COUNTER_GROUP, COUNTER_MALFORMED).increment(1);
				return;
			}
			or.setP_time(fields[0]);
			or.setP_id(fields[1]);
			or.setP_name("");
			or.setP_num(quantity);
			// 0: order table
			or.setP_flag(0);
			k.set(fields[1]);
		} else {
			or.setP_time("");
			or.setP_id(fields[0]);
			or.setP_name(fields[1]);
			or.setP_num(0);
			// 1: product table
			or.setP_flag(1);
			k.set(fields[0]);
		}
		context.write(k, or);
	}

}

在Map端,我們自訂一個型別OrBeen,它同時包含訂單表與商品表pd.txt的欄位,並通過flag來標記該筆記錄來自哪個表(0:訂單表,1:商品表)。
再傳遞給Reduce時,我們以k(商品id)作為key來分組,例如商品id為01的記錄會被分到同一組。

OrBeen:

/**
 * Value type carrying one row of either joined table through the shuffle.
 *
 * <p>The bean holds the union of the columns of both tables plus a flag telling which
 * table the row came from. Serialization order is
 * {@code p_time, p_id, p_num, p_name, p_flag}; {@link #readFields(DataInput)} reads
 * the fields back in exactly that order.
 */
public class OrBeen implements Writable{

	private String p_time ;
	private String p_id ;
	private int p_num;
	private String p_name;
	/** Source-table marker as set by the caller (the mapper in this example uses 0 = order, 1 = product). */
	private int p_flag;

	/**
	 * Creates a fully populated bean.
	 *
	 * @param p_time value for the time column
	 * @param p_id   product id
	 * @param p_num  quantity
	 * @param p_name product name
	 * @param p_flag source-table marker
	 */
	public OrBeen(String p_time, String p_id, int p_num, String p_name, int p_flag) {
		super();
		this.p_time = p_time;
		this.p_id = p_id;
		this.p_num = p_num;
		this.p_name = p_name;
		this.p_flag = p_flag;
	}

	/** Creates an empty bean; string fields are {@code null} and numeric fields are {@code 0} until set. */
	public OrBeen() {
		super();
	}

	/**
	 * Restores all fields from {@code in}, in the order used by {@link #write(DataOutput)}.
	 *
	 * @param in stream positioned at a record produced by {@link #write(DataOutput)}
	 * @throws IOException if reading from the stream fails
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		p_time = in.readUTF();
		p_id = in.readUTF();
		p_num = in.readInt();
		p_name = in.readUTF();
		p_flag = in.readInt();
	}

	/**
	 * Serializes all fields to {@code out}.
	 *
	 * <p>String fields that are still {@code null} are written as the empty string,
	 * because {@link DataOutput#writeUTF(String)} throws {@link NullPointerException}
	 * for a {@code null} argument.
	 *
	 * @param out destination stream
	 * @throws IOException if writing to the stream fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(nullToEmpty(p_time));
		out.writeUTF(nullToEmpty(p_id));
		out.writeInt(p_num);
		out.writeUTF(nullToEmpty(p_name));
		out.writeInt(p_flag);
	}

	/** Returns {@code s}, or the empty string when {@code s} is {@code null}. */
	private static String nullToEmpty(String s) {
		return s == null ? "" : s;
	}

	public String getP_time() {
		return p_time;
	}
	public void setP_time(String p_time) {
		this.p_time = p_time;
	}
	public String getP_id() {
		return p_id;
	}
	public void setP_id(String i) {
		this.p_id = i;
	}
	public int getP_num() {
		return p_num;
	}
	public void setP_num(int p_num) {
		this.p_num = p_num;
	}
	public String getP_name() {
		return p_name;
	}
	public void setP_name(String p_name) {
		this.p_name = p_name;
	}
	public int getP_flag() {
		return p_flag;
	}
	public void setP_flag(int p_flag) {
		this.p_flag = p_flag;
	}

	/**
	 * Renders the bean as {@code p_time<TAB>p_name<TAB>p_num}; the id and flag are
	 * intentionally not part of the text form.
	 */
	@Override
	public String toString() {
		return p_time + "\t" + p_name +"\t" +p_num;
	}

}

Reduce:

/**
 * Reduce side of the join: for one product id, fills the product name into every
 * order row of that id and emits the completed order rows.
 */
public class JoinReducer extends Reducer<Text, OrBeen, OrBeen, NullWritable>{

	/**
	 * Joins the order rows and the product row that share {@code key}.
	 *
	 * <p>Rows with flag {@code 0} are treated as order rows and buffered; any other row
	 * is treated as the product row and supplies the name. If several product rows
	 * arrive for the same key, the last one seen wins. If no product row arrives, the
	 * order rows are emitted with the name they already carry rather than with
	 * {@code null}.
	 *
	 * @param key     product id shared by all rows in this group
	 * @param value   the tagged order and product rows of this group
	 * @param context task context used to emit the joined rows
	 * @throws IOException          if emitting a row fails
	 * @throws InterruptedException if the task is interrupted while emitting
	 */
	@Override
	protected void reduce(Text key, Iterable<OrBeen> value, Context context)
			throws IOException, InterruptedException {
		// Buffered order rows of this group.
		ArrayList<OrBeen> orders = new ArrayList<OrBeen>();
		// Name taken from the product row; stays null when the group has no product row.
		String productName = null;

		for (OrBeen record : value) {
			if (record.getP_flag() == 0) {
				// The framework reuses the value object between iterations, so each
				// order row must be copied before it is stored. A plain constructor
				// copy replaces the reflective BeanUtils copy, whose failures were
				// previously swallowed and would have produced empty rows.
				orders.add(new OrBeen(
						record.getP_time(),
						record.getP_id(),
						record.getP_num(),
						record.getP_name(),
						record.getP_flag()));
			} else {
				// Strings are immutable, so holding the reference is safe across iterations.
				productName = record.getP_name();
			}
		}

		// Replace the placeholder name of every order row with the product name.
		for (OrBeen order : orders) {
			if (productName != null) {
				order.setP_name(productName);
			}
			context.write(order, NullWritable.get());
		}
	}

}

Job:

/**
 * Driver that configures and runs the reduce-side join job.
 */
public class JoinJobDrive {

	/** Input directory used when no path is given on the command line. */
	private static final String DEFAULT_INPUT = "B:/筆記/資料素材/in";
	/** Output directory used when no path is given on the command line. */
	private static final String DEFAULT_OUTPUT = "B:/筆記/資料素材/out";

	/**
	 * Configures the job, waits for it to finish and prints whether it succeeded.
	 *
	 * @param args optional paths: {@code args[0]} overrides the input directory and
	 *             {@code args[1]} overrides the output directory; missing entries fall
	 *             back to the built-in defaults
	 * @throws IOException            if the job cannot be set up or submitted
	 * @throws ClassNotFoundException if a configured job class cannot be loaded
	 * @throws InterruptedException   if waiting for the job is interrupted
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		String inputDir = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
		String outputDir = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(JoinJobDrive.class);

		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		// Types emitted by the mapper.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(OrBeen.class);

		// Types emitted by the reducer.
		job.setOutputKeyClass(OrBeen.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path(inputDir));
		FileOutputFormat.setOutputPath(job, new Path(outputDir));

		boolean b = job.waitForCompletion(true);
		System.out.println(b);
	}

}

以上就是reducejoin的案例