Map端的join -- 商品跟訂單合併
參考部落格:
https://my.oschina.net/leejun2005/blog/111963
需求:
之所以存在reduce join,是因為在map階段不能獲取所需要的join欄位,即同一個key對應的欄位可能位於不同的map中。但是Reduce side join 是非常低效的,因為shuffle階段要經過大量的資料傳輸。
解決辦法:
Map side join 是針對一下場景進行優化:兩個待連線的表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task 記憶體中存在一份(比如存放到hash table中)然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key 的記錄,如果有,則連線後輸出。為了支援檔案的複製,Hadoop提供了一個類DistributedCache。
DistributedCache是hadoop框架提供的一種機制,可以將job指定的檔案,在job執行前,先行分發到task執行的機器上,並有相關機制對cache檔案進行管理.
主要的注意事項有:
1.DistributedCache只能應用於分散式的情況,包括偽分散式,完全分散式.有些api在這2種情況下有移植性問題.
2.需要分發的檔案,必須提前放到hdfs上.預設的路徑字首是hdfs://的,不是file://
3.需要分發的檔案,最好在執行期間是隻讀的.
4.不建議分發較大的檔案,比如壓縮檔案,可能會影響task的啟動速度.
Hadoop DistributedCache詳解 :
package com.thp.bigdata.mapSideJoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 在Map端實現join,不需要在join端實現join,這個可以減少資料傾斜 * 適用於關聯表中有小表的情況 * 可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表資料進行join並輸出最終結果 * 可以大大提高join操作的併發操作,加快處理速度 * * 【部落格連結】 * http://dongxicheng.org/mapreduce-nextgen/hadoop-distributedcache-details/ * https://blog.csdn.net/xiaolang85/article/details/11782539 * * @author 湯小萌 * */ public class MapSideJoin { static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { // 使用一個HashMap來載入儲存產品資訊表 Map<String, String> pdInfoMap = new HashMap<String, String>(); Text k = new Text(); // 在maptask處理資料之前呼叫一次,可以用來做一些初始化的操作 @Override protected void setup(Context context) throws IOException, InterruptedException { // 已經載入到當前 maptask的工作目錄下了,可以直接寫檔名讀取檔案 // hadoop 所有讀取資料都是使用的utf-8 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt"))); String line; while(StringUtils.isNotEmpty(line = br.readLine())) { String[] fields = line.split("\t"); pdInfoMap.put(fields[0], fields[1]); } br.close(); } // 由於已經持有完整的產品資訊表,所以可以直接在map方法中就能實現join的邏輯 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String orderLine = value.toString(); String[] fields = orderLine.split("\t"); String pdName = pdInfoMap.get(fields[1]); k.set(orderLine + "\t" + pdName); context.write(k, NullWritable.get()); } public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapSideJoin.class); job.setMapperClass(MapSideJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("F:/mapSideJoin/input")); FileOutputFormat.setOutputPath(job, new Path("F:/mapSideJoin/output")); // 指定需要快取一個檔案到所有的maptask執行節點工作目錄 /* job.addArchiveToClassPath(archive); */// 快取jar包到task執行節點的classpath中 /* job.addFileToClassPath(file); */// 快取普通檔案到task執行節點的classpath中 /* job.addCacheArchive(uri); */// 快取壓縮包檔案到task執行節點的工作目錄 /* job.addCacheFile(uri) */// 快取普通檔案到task執行節點的工作目錄 // 將產品表文件快取到task工作節點的工作目錄中去 job.addCacheFile(new URI("file:/F:/mapSideJoin/pdts.txt")); // map端的join的邏輯不需要reduce階段,設定reducetask數量為0,不設定是會出現問題的 job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } } }
我在執行的時候會出現這個問題:
hadoop在本地調試出現的錯誤——建立符號連結失敗
failed 1 with: CreateSymbolicLink error (1314): ???????????
網上有好幾種這個問題的解決辦法,我是在啟動eclipse的時候直接以管理員的身份啟動的,這個許可權問題就可以解決。
還有一個問題就是編碼的問題,我的pdts.txt檔案裡面出現了中文,但是我使用的是記事本編輯的,預設儲存的編碼不是uft-8,所以最終在輸出檔案的時候會出現亂碼,因為hadoop所有的檔案處理都是使用utf-8,我們在儲存檔案的時候,將編碼格式改為utf-8就可以解決亂碼問題。
程式碼地址:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/mapSideJoin