1. 程式人生 > 實用技巧 >MapReduce之MapJoin案例

MapReduce之MapJoin案例

@目錄

使用場景

Map Join 適用於一張表十分小、一張表很大的場景。

優點

思考:在Reduce 端處理過多的表,非常容易產生資料傾斜。怎麼辦?
在Map端快取多張表,提前處理業務邏輯,這樣增加Map 端業務,減少Reduce 端資料的壓力,儘可能的減少資料傾斜。

具體辦法:採用DistributedCache

(1)在Mapper的setup階段,將檔案讀取到快取集合中。
(2)在驅動函式中載入快取。

/快取普通檔案到Task執行節點。
job.addCacheFile(new URI("file://e:/cache/pd.txt");

案例

每個MapTask在map()中完成Join
注意:

  • 只需要將要Join的資料order.txt作為切片,讓MapTask讀取
  • pd.txt不以切片形式讀入,而直接在MapTask中使用HDFS下載此檔案,下載後,使用輸入流手動讀取其中的資料
  • 在map()之前通常是將大檔案以切片形式讀取,小檔案手動讀取!

order.txt---->切片(orderId,pid,amount)----JoinMapper.map()
pd.txt----->切片(pid,pname)----JoinMapper.map()

需求分析

MapJoin適用於關聯表中有小表的情形

程式碼實現

JoinBean.java

public class JoinBean {
	
	private String orderId;
	private String pid;
	private String pname;
	private String amount;
	
	@Override
	public String toString() {
		return  orderId + "\t" +  pname + "\t" + amount ;
	}

	public String getOrderId() {
		return orderId;
	}

	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}

	public String getPid() {
		return pid;
	}

	public void setPid(String pid) {
		this.pid = pid;
	}

	public String getPname() {
		return pname;
	}

	public void setPname(String pname) {
		this.pname = pname;
	}

	public String getAmount() {
		return amount;
	}

	public void setAmount(String amount) {
		this.amount = amount;
	}


}

MapJoinMapper.java

/*
 * 1. 在Hadoop中,hadoop為MR提供了分散式快取
 * 			①用來快取一些Job執行期間的需要的檔案(普通檔案,jar,歸檔檔案(har))
 * 			②通過在Job的Configuration中,使用uri代替要快取的檔案
 * 			③分散式快取會假設當前的檔案已經上傳到了HDFS,並且在叢集的任意一臺機器都可以訪問到這個URI所代表的檔案
 * 			④分散式快取會在每個節點的task執行之前,提前將檔案傳送到節點
 * 			⑤分散式快取的高效是由於每個Job只會複製一次檔案,且可以自動在從節點對歸檔檔案解歸檔
 * 
 * 		
 * 
 * 
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, JoinBean, NullWritable>{

	private JoinBean out_key=new JoinBean();
	private Map<String, String> pdDatas=new HashMap<String, String>();
	//在map之前手動讀取pd.txt中的內容
	
	@Override
	protected void setup(Mapper<LongWritable, Text, JoinBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		
		//從分散式快取中讀取資料
		URI[] files = context.getCacheFiles();
		
		for (URI uri : files) {
			
			BufferedReader reader = new BufferedReader(new FileReader(new File(uri)));
			
			String line="";
			
			//迴圈讀取pd.txt中的每一行
			while(StringUtils.isNotBlank(line=reader.readLine())) {
				
				String[] words = line.split("\t");
				
				pdDatas.put(words[0], words[1]);

			}
			
			reader.close();
			
		}
		
	}
	
	//對切片中order.txt的資料進行join,輸出
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, JoinBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		
		out_key.setOrderId(words[0]);
		out_key.setPname(pdDatas.get(words[1]));
		out_key.setAmount(words[2]);
		
		context.write(out_key, NullWritable.get());
			
	}
	
}

MapJoinDriver.java

public class MapJoinDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/mapjoin");
		Path outputPath=new Path("e:/mroutput/mapjoin");
		

		//作為整個Job的配置
		Configuration conf = new Configuration();
		//保證輸出目錄不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		// ①建立Job
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(MapJoinDriver.class);
		
		
		// 為Job建立一個名字
		job.setJobName("wordcount");
		
		// ②設定Job
		// 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別
		job.setMapperClass(MapJoinMapper.class);
		
		// 設定輸入目錄和輸出目錄
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 設定分散式快取
		job.addCacheFile(new URI("file:///e:/pd.txt"));
		
		//取消reduce階段
		job.setNumReduceTasks(0);

		// ③執行Job
		job.waitForCompletion(true);
		
	}

}