1. 程式人生 > >Map端的join -- 商品跟訂單合併

Map端的join -- 商品跟訂單合併

參考部落格:
https://my.oschina.net/leejun2005/blog/111963

需求:
之所以存在reduce join,是因為在map階段不能獲取所需要的join欄位,即同一個key對應的欄位可能位於不同的map中。但是Reduce side join 是非常低效的,因為shuffle階段要經過大量的資料傳輸。
解決辦法:
Map side join 是針對一下場景進行優化:兩個待連線的表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task 記憶體中存在一份(比如存放到hash table中)然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key 的記錄,如果有,則連線後輸出。為了支援檔案的複製,Hadoop提供了一個類DistributedCache。

DistributedCache是hadoop框架提供的一種機制,可以將job指定的檔案,在job執行前,先行分發到task執行的機器上,並有相關機制對cache檔案進行管理.

主要的注意事項有:

1.DistributedCache只能應用於分散式的情況,包括偽分散式,完全分散式.有些api在這2種情況下有移植性問題.

2.需要分發的檔案,必須提前放到hdfs上.預設的路徑字首是hdfs://的,不是file://

3.需要分發的檔案,最好在執行期間是隻讀的.

4.不建議分發較大的檔案,比如壓縮檔案,可能會影響task的啟動速度.

Hadoop DistributedCache詳解 :

http://dongxicheng.org/mapreduce-nextgen/hadoop-distributedcache-details/

package com.thp.bigdata.mapSideJoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 在Map端實現join,不需要在join端實現join,這個可以減少資料傾斜
 * 適用於關聯表中有小表的情況
 * 可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表資料進行join並輸出最終結果
 * 可以大大提高join操作的併發操作,加快處理速度
 * 
 * 【部落格連結】
 * 		http://dongxicheng.org/mapreduce-nextgen/hadoop-distributedcache-details/
 * 		https://blog.csdn.net/xiaolang85/article/details/11782539
 * 
 * @author 湯小萌
 *
 */
public class MapSideJoin {
	
	static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		// 使用一個HashMap來載入儲存產品資訊表
		Map<String, String> pdInfoMap = new HashMap<String, String>();
		
		Text k = new Text();
		
		// 在maptask處理資料之前呼叫一次,可以用來做一些初始化的操作
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			// 已經載入到當前 maptask的工作目錄下了,可以直接寫檔名讀取檔案
			// hadoop 所有讀取資料都是使用的utf-8
			BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt")));
			String line;
			while(StringUtils.isNotEmpty(line = br.readLine())) {
				String[] fields = line.split("\t");
				pdInfoMap.put(fields[0], fields[1]);
			}
			br.close();
		}
		
		// 由於已經持有完整的產品資訊表,所以可以直接在map方法中就能實現join的邏輯
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String orderLine = value.toString();
			String[] fields = orderLine.split("\t");
			String pdName = pdInfoMap.get(fields[1]);
			k.set(orderLine + "\t" + pdName);
			context.write(k, NullWritable.get());
		}
		
		
		public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
			Configuration conf = new Configuration();
			
			Job job = Job.getInstance(conf);
			
			
			job.setJarByClass(MapSideJoin.class);
			
			job.setMapperClass(MapSideJoinMapper.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(NullWritable.class);
			
			
			FileInputFormat.setInputPaths(job, new Path("F:/mapSideJoin/input"));
			FileOutputFormat.setOutputPath(job, new Path("F:/mapSideJoin/output"));
			
			// 指定需要快取一個檔案到所有的maptask執行節點工作目錄
			/* job.addArchiveToClassPath(archive); */// 快取jar包到task執行節點的classpath中
			/* job.addFileToClassPath(file); */// 快取普通檔案到task執行節點的classpath中
			/* job.addCacheArchive(uri); */// 快取壓縮包檔案到task執行節點的工作目錄
			/* job.addCacheFile(uri) */// 快取普通檔案到task執行節點的工作目錄
			
			// 將產品表文件快取到task工作節點的工作目錄中去
			job.addCacheFile(new URI("file:/F:/mapSideJoin/pdts.txt"));
			
			// map端的join的邏輯不需要reduce階段,設定reducetask數量為0,不設定是會出現問題的
			job.setNumReduceTasks(0);
			
			boolean res = job.waitForCompletion(true);
			
			System.exit(res ? 0 : 1);
			
		}
		
	}
	
	
}

我在執行的時候會出現這個問題:
hadoop在本地調試出現的錯誤——建立符號連結失敗

failed 1 with: CreateSymbolicLink error (1314): ???????????

網上有好幾種這個問題的解決辦法,我是在啟動eclipse的時候直接以管理員的身份啟動的,這個許可權問題就可以解決。

還有一個問題就是編碼的問題,我的pdts.txt檔案裡面出現了中文,但是我使用的是記事本編輯的,預設儲存的編碼不是uft-8,所以最終在輸出檔案的時候會出現亂碼,因為hadoop所有的檔案處理都是使用utf-8,我們在儲存檔案的時候,將編碼格式改為utf-8就可以解決亂碼問題。

程式碼地址:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/mapSideJoin