MapReduce編程之Semi Join多種應用場景與使用
Map Join 實現方式一
● 使用場景:一個大表(整張表內存放不下,但表中的key內存放得下),一個超大表
● 實現方式:分布式緩存
● 用法:
SemiJoin就是所謂的半連接,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些數據,在網絡中只傳輸參與連接的數據不參與連接的數據不必在網絡中進行傳輸,從而減少了shuffle的網絡傳輸量,使整體效率得到提高,其他思想和reduce join是一模一樣的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來通過DistributedCach分發到相關節點,然後將其取出放到內存中(可以放到HashSet中),在map階段掃描連接表,將join key不在內存HashSet中的記錄過濾掉,讓那些參與join的記錄通過shuffle傳輸到reduce端進行join操作,其他的和reduce join都是一樣的。
代碼實現
package com.hadoop.reducejoin.test; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashSet; import java.util.List; importSemiJoinjava.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.bloom.Key; /* * 一個大表,一個小表(也很大,內存中放不下) * map 階段:Semi Join解決小表整個記錄內存放不下的場景,那麽就取出來一小部分關鍵字段放入內存,過濾大表 * 提前過濾,提前提取出小表中的連接字段放入內存中,在map階段就僅留下大表中那些小表中存在的連接字段key * reduce 階段:reduce side join */ public class SemiJoin { /** * 為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。 * 然後用連接字段作為key,其余部分和新加的標誌作為value,最後進行輸出。 */ public static class SemiJoinMapper extends Mapper<Object, Text, Text, Text> { // 定義Set集合保存小表中的key private Set<String> joinKeys = new HashSet<String>(); private Text joinKey = new Text(); private Text combineValue = new Text(); /** * 獲取分布式緩存文件 */ protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br; String infoAddr = null; // 返回緩存文件路徑 Path[] cacheFilesPaths = context.getLocalCacheFiles(); for (Path path : cacheFilesPaths) { String pathStr = path.toString(); br = new BufferedReader(new FileReader(pathStr)); while (null != (infoAddr = br.readLine())) { // 按行讀取並解析氣象站數據 String[] records = StringUtils.split(infoAddr.toString(), "\t"); if (null != records)// key為stationID joinKeys.add(records[0]); } } } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String pathName = ((FileSplit) context.getInputSplit()).getPath() .toString(); // 如果數據來自於records,加一個records的標記 if (pathName.endsWith("records-semi.txt")) { String[] valueItems = StringUtils.split(value.toString(), "\t"); // 過濾掉臟數據 if (valueItems.length != 3) { return; } // 提前過濾,提前提取出小表中的連接字段,在map階段就僅留下大表中那些小表中存在的連接字段key if (joinKeys.contains(valueItems[0])) { joinKey.set(valueItems[0]); combineValue.set("records-semi.txt" + valueItems[1] + "\t" + valueItems[2]); context.write(joinKey, combineValue); } } else if (pathName.endsWith("station.txt")) { // 如果數據來自於station,加一個station的標記 String[] valueItems = StringUtils.split(value.toString(), "\t"); // 過濾掉臟數據 if (valueItems.length != 2) { return; } joinKey.set(valueItems[0]); combineValue.set("station.txt" + valueItems[1]); context.write(joinKey, combineValue); } } } /* * reduce 端做笛卡爾積 */ public static class SemiJoinReducer extends Reducer<Text, Text, Text, Text> { private List<String> leftTable = new ArrayList<String>(); private List<String> rightTable = new ArrayList<String>(); private Text result = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 一定要清空數據 leftTable.clear(); rightTable.clear(); // 相同key的記錄會分組到一起,我們需要把相同key下來自於不同表的數據分開,然後做笛卡爾積 for (Text value : values) { String val = value.toString(); System.out.println("value=" + val); if (val.startsWith("station.txt")) { leftTable.add(val.replaceFirst("station.txt", "")); } else if (val.startsWith("records-semi.txt")) { rightTable.add(val.replaceFirst("records-semi.txt", "")); } } // 笛卡爾積 for (String leftPart : leftTable) { for (String rightPart : rightTable) { result.set(leftPart + "\t" + rightPart); context.write(key, result); } } } } public static void main(String[] arg0) throws Exception { Configuration conf = new Configuration(); String[] args = { "hdfs://sparks:9000/middle/reduceJoin/station.txt", "hdfs://sparks:9000/middle/reduceJoin/station.txt", "hdfs://sparks:9000/middle/reduceJoin/records-semi.txt", "hdfs://sparks:9000/middle/reduceJoin/SemiJoin-out" }; String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: semijoin <in> [<in>...] <out>"); System.exit(2); } //輸出路徑 Path mypath = new Path(otherArgs[otherArgs.length - 1]); FileSystem hdfs = mypath.getFileSystem(conf);// 創建輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf, "SemiJoin"); //添加緩存文件 job.addCacheFile(new Path(otherArgs[0]).toUri()); job.setJarByClass(SemiJoin.class); job.setMapperClass(SemiJoinMapper.class); job.setReducerClass(SemiJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //添加輸入路徑 for (int i = 1; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //添加輸出路徑 FileOutputFormat.setOutputPath(job, new Path( otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Reduce join + BloomFilter
● 使用場景:一個大表(表中的key內存仍然放不下),一個超大表
在某些情況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可以使用BloomFiler以節省空間。
BloomFilter最常見的作用是:判斷某個元素是否在一個集合裏面。它最重要的兩個方法是:add() 和membershipTest ()。
因而可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關系,只不過增加了少量的網絡IO而已。
● BloomFilter參數計算方式:
n:小表中的記錄數。
m:位數組大小,一般m是n的倍數,倍數越大誤判率就越小,但是也有內存限制,不能太大,這個值需要反復測試得出。
k:hash個數,最優hash個數值為:k = ln2 * (m/n)
代碼實現
package com.hadoop.reducejoin.test; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.hash.Hash; /* * 一個大表,一個小表 * map 階段:BloomFilter 解決小表的key集合在內存中仍然存放不下的場景,過濾大表 * reduce 階段:reduce side join */ public class BloomFilteringDriver { /** * 為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。 * 然後用連接字段作為key,其余部分和新加的標誌作為value,最後進行輸出。 */ public static class BloomFilteringMapper extends Mapper<Object, Text, Text, Text> { // 第一個參數是vector的大小,這個值盡量給的大,可以避免hash對象的時候出現索引重復 // 第二個參數是散列函數的個數 // 第三個是hash的類型,雖然是int型,但是只有默認兩個值 // 哈希函數個數k、位數組大小m及字符串數量n之間存在相互關系 //n 為小表記錄數,給定允許的錯誤率E,可以確定合適的位數組大小,即m >= log2(e) * (n * log2(1/E)) // 給定m和n,可以確定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小 private BloomFilter filter = new BloomFilter(10000, 6, Hash.MURMUR_HASH); private Text joinKey = new Text(); private Text combineValue = new Text(); /** * 獲取分布式緩存文件 */ @SuppressWarnings("deprecation") protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br; String infoAddr = null; // 返回緩存文件路徑 Path[] cacheFilesPaths = context.getLocalCacheFiles(); for (Path path : cacheFilesPaths) { String pathStr = path.toString(); br = new BufferedReader(new FileReader(pathStr)); while (null != (infoAddr = br.readLine())) { // 按行讀取並解析氣象站數據 String[] records = StringUtils.split(infoAddr.toString(), "\t"); if (null != records)// key為stationID filter.add(new Key(records[0].getBytes())); } } } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String pathName = ((FileSplit) context.getInputSplit()).getPath() .toString(); // 如果數據來自於records,加一個records的標記 if (pathName.endsWith("records-semi.txt")) { String[] valueItems = StringUtils.split(value.toString(), "\t"); // 過濾掉臟數據 if (valueItems.length != 3) { return; } //通過filter 過濾大表中的數據 if (filter.membershipTest(new Key(valueItems[0].getBytes()))) { joinKey.set(valueItems[0]); combineValue.set("records-semi.txt" + valueItems[1] + "\t" + valueItems[2]); context.write(joinKey, combineValue); } } else if (pathName.endsWith("station.txt")) { // 如果數據來自於station,加一個station的標記 String[] valueItems = StringUtils.split(value.toString(), "\t"); // 過濾掉臟數據 if (valueItems.length != 2) { return; } joinKey.set(valueItems[0]); combineValue.set("station.txt" + valueItems[1]); context.write(joinKey, combineValue); } } } /* * reduce 端做笛卡爾積 */ public static class BloomFilteringReducer extends Reducer<Text, Text, Text, Text> { private List<String> leftTable = new ArrayList<String>(); private List<String> rightTable = new ArrayList<String>(); private Text result = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 一定要清空數據 leftTable.clear(); rightTable.clear(); // 相同key的記錄會分組到一起,我們需要把相同key下來自於不同表的數據分開,然後做笛卡爾積 for (Text value : values) { String val = value.toString(); System.out.println("value=" + val); if (val.startsWith("station.txt")) { leftTable.add(val.replaceFirst("station.txt", "")); } else if (val.startsWith("records-semi.txt")) { rightTable.add(val.replaceFirst("records-semi.txt", "")); } } // 笛卡爾積 for (String leftPart : leftTable) { for (String rightPart : rightTable) { result.set(leftPart + "\t" + rightPart); context.write(key, result); } } } } public static void main(String[] arg0) throws Exception { Configuration conf = new Configuration(); String[] args = { "hdfs://sparks:9000/middle/reduceJoin/station.txt", "hdfs://sparks:9000/middle/reduceJoin/station.txt", "hdfs://sparks:9000/middle/reduceJoin/records-semi.txt", "hdfs://sparks:9000/middle/reduceJoin/BloomFilte-out" }; String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: BloomFilter <in> [<in>...] <out>"); System.exit(2); } //輸出路徑 Path mypath = new Path(otherArgs[otherArgs.length - 1]); FileSystem hdfs = mypath.getFileSystem(conf);// 創建輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf, "bloomfilter"); //添加緩存文件 job.addCacheFile(new Path(otherArgs[0]).toUri()); job.setJarByClass(BloomFilteringDriver.class); job.setMapperClass(BloomFilteringMapper.class); job.setReducerClass(BloomFilteringReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //添加輸入文件 for (int i = 1; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //設置輸出路徑 FileOutputFormat.setOutputPath(job, new Path( otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }BloomFilteringDriver
總結
三種join方式適用於不同的場景,其處理效率上相差很大,其主要導致因素是網絡傳輸。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,寫分布式大數據處理程序的時最好要對整體要處理的數據分布情況作一個了解,這可以提高我們代碼的效率,使數據的傾斜度降到最低,使我們的代碼傾向性更好。
MapReduce編程之Semi Join多種應用場景與使用