1. 程式人生 > 實用技巧 >MapReduce之ReduceJoin案例

MapReduce之ReduceJoin案例

@目錄

Reduce Join原理

  • Map端的主要工作:為來自不同表或檔案的key/value對,打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。
  • Reduce端的主要工作:在Reduce端以連線欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同檔案的記錄(在Map階段已經打標誌)分開,最後進行合併就ok了。
  • 該方法的缺點:這種方式的缺點很明顯就是會造成Map和Reduce端也就是shuffle階段出現大量的資料傳輸,效率很低

案例實操


需求分析

通過將關聯條件作為Map輸出的key,將兩表滿足Join條件的資料並攜帶資料所來源的檔案資訊,發往同一個ReduceTask,在Reduce中進行資料的串聯。

MR分析

替換的前提是: 相同pid的資料,需要分到同一個區

0號區: 1001 01 1
01 小米

1號區: 1002 02 2
02 華為

注意:

  • 分割槽時,以pid為條件進行分割槽!
  • 兩種不同的資料,經過同一個Mapper的map()處理,因此需要在map()中,判斷切片資料的來源,根據來源執行不同的封裝策略
  • 一個Mapper只能處理一種切片的資料,所以在Map階段無法完成join操作,需要在reduce中實現Join
  • 在Map階段,封裝資料。 自定義的Bean需要能夠封裝,兩個切片中的所有的資料
  • 在reduce輸出時,只需要將來自於order.txt中的資料,將pid替換為pname,而不需要輸出所有的key-value
  • 在Map階段對資料打標記,標記哪些key-value屬於order.txt,哪些屬於pd.txt

order.txt---->切片(orderId,pid,amount)----JoinMapper.map()------>JoinReducer
pd.txt----->切片(pid,pname)----JoinMapper.map()

MR實現

Mapper:
keyin-valuein:
map:
keyout=valueout:

Reducer:
keyin-valuein:
reduce:
keyout=valueout:

ReduceJoin

ReduceJoin需要在Reduce階段實現Join功能,一旦資料量過大,效率低!
後面有一種方法使用MapJoin解決ReduceJoin低效的問題!

程式碼實現

建立商品和訂合併後的Bean類,JoinBean.java

public class JoinBean implements Writable{
	
	private String orderId;
	private String pid;
	private String pname;
	private String amount;
	private String source;
	
	

	@Override
	public String toString() {
		return  orderId + "\t" +  pname + "\t" + amount ;
	}

	public String getOrderId() {
		return orderId;
	}

	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}

	public String getPid() {
		return pid;
	}

	public void setPid(String pid) {
		this.pid = pid;
	}

	public String getPname() {
		return pname;
	}

	public void setPname(String pname) {
		this.pname = pname;
	}

	public String getAmount() {
		return amount;
	}

	public void setAmount(String amount) {
		this.amount = amount;
	}

	public String getSource() {
		return source;
	}

	public void setSource(String source) {
		this.source = source;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(orderId);
		out.writeUTF(pid);
		out.writeUTF(pname);
		out.writeUTF(amount);
		out.writeUTF(source);
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		orderId=in.readUTF();
		pid=in.readUTF();
		pname=in.readUTF();
		amount=in.readUTF();
		source=in.readUTF();
		
	}

}

編寫Mapper類,ReduceJoinMapper.java

/*
 * Map階段無法完成Join,只能封裝資料,在Reduce階段完成Join
 * 
 * 1. order.txt: 1001	01	1
 * 	 pd.txt :  01 小米
 * 
 * 2. Bean必須能封裝所有的資料
 * 
 * 3. Reduce只需要輸出來自於order.txt的資料,需要在Mapper中對資料打標記,標記資料的來源
 * 
 * 4. 在Mapper中需要獲取當前切片的來源,根據來源執行不同的封裝邏輯
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, NullWritable, JoinBean>{
	
	private NullWritable out_key=NullWritable.get();
	private JoinBean out_value=new JoinBean();
	private String source;
	
	// setUp()在map()之前先執行,只執行一次
	@Override
	protected void setup(Mapper<LongWritable, Text, NullWritable, JoinBean>.Context context)
			throws IOException, InterruptedException {
		
		InputSplit inputSplit = context.getInputSplit();
		
		FileSplit split=(FileSplit) inputSplit;
		
		source=split.getPath().getName();
	}
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, JoinBean>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		//打標記
		out_value.setSource(source);
		
		if (source.equals("order.txt")) {
			out_value.setOrderId(words[0]);
			out_value.setPid(words[1]);
			out_value.setAmount(words[2]);
			// 保證所有的屬性不為null
			out_value.setPname("nodata");
		}else {
			out_value.setPid(words[0]);
			out_value.setPname(words[1]);
			// 保證所有的屬性不為null
			out_value.setOrderId("nodata");
			out_value.setAmount("nodata");			
		}
		
		context.write(out_key, out_value);
	}
}

自定義分割槽器,MyPartitioner.java

/*
 * 1.保證pid相同的key-value分到同一個區
 */
public class MyPartitioner extends Partitioner<NullWritable, JoinBean>{

	@Override
	public int getPartition(NullWritable key, JoinBean value, int numPartitions) {
		
		return (value.getPid().hashCode() & Integer.MAX_VALUE) % numPartitions;
	}

}

編寫Reducer類,JoinBeanReducer.java

/*
 *  order.txt: 1001	01	1
 * 	 pd.txt :  01 小米
 *          orderid,pid,amount,source,pname
 * 1. (null,1001,01,1,order.txt,nodata)
 * (null,nodata,01,nodata,pd.txt,小米)
 * 
 * 2. 在輸出之前,需要把資料按照source屬性分類
 * 		只能在reduce中分類
 */
public class JoinBeanReducer extends Reducer<NullWritable, JoinBean, NullWritable, JoinBean>{

	//分類的集合
	private List<JoinBean> orderDatas=new ArrayList<>();
	private Map<String, String> pdDatas=new HashMap<>();
	
	//根據source分類
	@Override
	protected void reduce(NullWritable key, Iterable<JoinBean> values,
			Reducer<NullWritable, JoinBean, NullWritable, JoinBean>.Context arg2)
			throws IOException, InterruptedException {
		
		for (JoinBean value : values) {
			
			if (value.getSource().equals("order.txt")) {
				
				// 將value物件的屬性資料取出,封裝到一個新的JoinBean中
				// 因為value至始至終都是同一個物件,只不過每次迭代,屬性會隨之變化
				JoinBean joinBean = new JoinBean();
				
				try {
					BeanUtils.copyProperties(joinBean, value);
				} catch (IllegalAccessException e) {
					e.printStackTrace();
				} catch (InvocationTargetException e) {
					e.printStackTrace();
				}
				
				orderDatas.add(joinBean);
				
			}else {
				
				//資料來源於pd.txt
				pdDatas.put(value.getPid(), value.getPname());
				
			}
			
		}
		
	}
	
	// Join資料,寫出
	@Override
	protected void cleanup(Reducer<NullWritable, JoinBean, NullWritable, JoinBean>.Context context)
			throws IOException, InterruptedException {
		
		//只輸出來自orderDatas的資料
		for (JoinBean joinBean : orderDatas) {	
			// 從Map中根據pid取出pname,設定到bean的pname屬性中
			joinBean.setPname(pdDatas.get(joinBean.getPid()));
			context.write(NullWritable.get(), joinBean);
		}
		
	}
}

編寫驅動類,CustomIFDriver.java

public class CustomIFDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/reducejoin");
		Path outputPath=new Path("e:/mroutput/reducejoin");
		

		//作為整個Job的配置
		Configuration conf = new Configuration();
		//保證輸出目錄不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		// ①建立Job
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(CustomIFDriver.class);
		
		
		// 為Job建立一個名字
		job.setJobName("wordcount");
		
		// ②設定Job
		// 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別
		job.setMapperClass(ReduceJoinMapper.class);
		job.setReducerClass(JoinBeanReducer.class);
		
		// Job需要根據Mapper和Reducer輸出的Key-value型別準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
		// 如果Mapper和Reducer輸出的Key-value型別一致,直接設定Job最終的輸出型別
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(JoinBean.class);
		
		// 設定輸入目錄和輸出目錄
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 設定分割槽器
		job.setPartitionerClass(MyPartitioner.class);
		
		//需要Join的資料量過大 order.txt 10億,pd.txt 100w,提高MR並行執行的效率
		// Map階段:  修改片大小,切的片多,MapTask執行就多
		// Reduce階段:  修改ReduceTask數量
		
		//可以設定ReduceTasks的數量,預設為1,將輸出在一個檔案。
		//在此案例中,如果是3,則分為三個檔案。如果超過三,其餘檔案則是空的。
		//job.setNumReduceTasks(3);
		
		// ③執行Job
		job.waitForCompletion(true);
		
	}
}

執行結果: