1. 程式人生 > >Hadoop中MapReduce多種join實現例項分析

Hadoop中MapReduce多種join實現例項分析

感謝分享:http://database.51cto.com/art/201410/454277.htm

1、在Reudce端進行連線。

在Reudce端進行連線是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下:

Map端的主要工作:為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。

reduce端的主要工作:在reduce端以連線欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同檔案的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。原理非常簡單,下面來看一個例項:

(1)自定義一個value返回型別:

  1. package com.mr.reduceSizeJoin;   
  2. import java.io.DataInput;   
  3. import java.io.DataOutput;   
  4. import java.io.IOException;   
  5. import org.apache.hadoop.io.Text;   
  6. import org.apache.hadoop.io.WritableComparable;   
  7. publicclass CombineValues implements WritableComparable<CombineValues>{   
  8. //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class); 
  9. private Text joinKey;//連結關鍵字 
  10. private Text flag;//檔案來源標誌 
  11. private Text secondPart;//除了連結鍵外的其他部分 
  12. publicvoid setJoinKey(Text joinKey) {   
  13. this.joinKey = joinKey;   
  14.     }   
  15. publicvoid setFlag(Text flag) {   
  16. this.flag = flag;   
  17.     }   
  18. publicvoid setSecondPart(Text secondPart) {   
  19. this.secondPart = secondPart;   
  20.     }   
  21. public Text getFlag() {   
  22. return flag;   
  23.     }   
  24. public Text getSecondPart() {   
  25. return secondPart;   
  26.     }   
  27. public Text getJoinKey() {   
  28. return joinKey;   
  29.     }   
  30. public CombineValues() {   
  31. this.joinKey =  new Text();   
  32. this.flag = new Text();   
  33. this.secondPart = new Text();   
  34.     }
  35. @Override
  36. publicvoid write(DataOutput out) throws IOException {   
  37. this.joinKey.write(out);   
  38. this.flag.write(out);   
  39. this.secondPart.write(out);   
  40.     }   
  41. @Override
  42. publicvoid readFields(DataInput in) throws IOException {   
  43. this.joinKey.readFields(in);   
  44. this.flag.readFields(in);   
  45. this.secondPart.readFields(in);   
  46.     }   
  47. @Override
  48. publicint compareTo(CombineValues o) {   
  49. returnthis.joinKey.compareTo(o.getJoinKey());   
  50.     }   
  51. @Override
  52. public String toString() {   
  53. // TODO Auto-generated method stub 
  54. return"[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   
  55.     }   

(2)map、reduce主體程式碼

  1. package com.mr.reduceSizeJoin;   
  2. import java.io.IOException;   
  3. import java.util.ArrayList;   
  4. import org.apache.hadoop.conf.Configuration;   
  5. import org.apache.hadoop.conf.Configured;   
  6. import org.apache.hadoop.fs.Path;   
  7. import org.apache.hadoop.io.Text;   
  8. import org.apache.hadoop.mapreduce.Job;   
  9. import org.apache.hadoop.mapreduce.Mapper;   
  10. import org.apache.hadoop.mapreduce.Reducer;   
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  16. import org.apache.hadoop.util.Tool;   
  17. import org.apache.hadoop.util.ToolRunner;   
  18. import org.slf4j.Logger;   
  19. import org.slf4j.LoggerFactory;   
  20. /**   
  21.  * @author zengzhaozheng   
  22.  * 用途說明:   
  23.  * reudce side join中的left outer join   
  24.  * 左連線,兩個檔案分別代表2個表,連線欄位table1的id欄位和table2的cityID欄位   
  25.  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
  26.  * tb_dim_city.dat檔案內容,分隔符為"|":   
  27.  * id     name  orderid  city_code  is_show   
  28.  * 0       其他        9999     9999         0   
  29.  * 1       長春        1        901          1   
  30.  * 2       吉林        2        902          1   
  31.  * 3       四平        3        903          1   
  32.  * 4       松原        4        904          1   
  33.  * 5       通化        5        905          1   
  34.  * 6       遼源        6        906          1   
  35.  * 7       白城        7        907          1   
  36.  * 8       白山        8        908          1   
  37.  * 9       延吉        9        909          1   
  38.  * -------------------------風騷的分割線-------------------------------   
  39.  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
  40.  * tb_user_profiles.dat檔案內容,分隔符為"|":   
  41.  * userID   network     flow    cityID   
  42.  * 1           2G       123      1   
  43.  * 2           3G       333      2   
  44.  * 3           3G       555      1   
  45.  * 4           2G       777      3   
  46.  * 5           3G       666      4   
  47.  *   
  48.  * -------------------------風騷的分割線-------------------------------   
  49.  *  結果:   
  50.  *  1   長春  1   901 1   1   2G  123   
  51.  *  1   長春  1   901 1   3   3G  555   
  52.  *  2   吉林  2   902 1   2   3G  333   
  53.  *  3   四平  3   903 1   4   2G  777   
  54.  *  4   松原  4   904 1   5   3G  666   
  55.  */
  56. publicclass ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
  57. privatestaticfinal Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   
  58. publicstaticclass LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
  59. private CombineValues combineValues = new CombineValues();   
  60. private Text flag = new Text();   
  61. private Text joinKey = new Text();   
  62. private Text secondPart = new Text();   
  63. @Override
  64. protectedvoid map(Object key, Text value, Context context)   
  65. throws IOException, InterruptedException {   
  66. //獲得檔案輸入路徑 
  67.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
  68. //資料來自tb_dim_city.dat檔案,標誌即為"0" 
  69. if(pathName.endsWith("tb_dim_city.dat")){   
  70.                 String[] valueItems = value.toString().split("\\|");   
  71. //過濾格式錯誤的記錄 
  72. if(valueItems.length != 5){   
  73. return;   
  74.                 }   
  75.                 flag.set("0");   
  76.                 joinKey.set(valueItems[0]);   
  77.                 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
  78.                 combineValues.setFlag(flag);   
  79.                 combineValues.setJoinKey(joinKey);   
  80.                 combineValues.setSecondPart(secondPart);   
  81.                 context.write(combineValues.getJoinKey(), combineValues);
  82. }//資料來自於tb_user_profiles.dat,標誌即為"1" 
  83. elseif(pathName.endsWith("tb_user_profiles.dat")){   
  84.                 String[] valueItems = value.toString().split("\\|");   
  85. //過濾格式錯誤的記錄 
  86. if(valueItems.length != 4){   
  87. return;   
  88.                 }   
  89.                 flag.set("1");   
  90.                 joinKey.set(valueItems[3]);   
  91.                 secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
  92.                 combineValues.setFlag(flag);   
  93.                 combineValues.setJoinKey(joinKey);   
  94.                 combineValues.setSecondPart(secondPart);   
  95.                 context.write(combineValues.getJoinKey(), combineValues);   
  96.             }   
  97.         }   
  98.     }   
  99. publicstaticclass LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
  100. //儲存一個分組中的左表資訊 
  101. private ArrayList<Text> leftTable = new ArrayList<Text>();   
  102. //儲存一個分組中的右表資訊 
  103. private ArrayList<Text> rightTable = new ArrayList<Text>();   
  104. private Text secondPar = null;   
  105. private Text output = new Text();   
  106. /**   
  107.          * 一個分組呼叫一次reduce函式   
  108.          */
  109. @Override
  110. protectedvoid reduce(Text key, Iterable<CombineValues> value, Context context)   
  111. throws IOException, InterruptedException {   
  112.             leftTable.clear();   
  113.             rightTable.clear();   
  114. /**   
  115.              * 將分組中的元素按照檔案分別進行存放   
  116.              * 這種方法要注意的問題:   
  117.              * 如果一個分組內的元素太多的話,可能會導致在reduce階段出現OOM,   
  118.              * 在處理分散式問題之前最好先了解資料的分佈情況,根據不同的分佈採取最   
  119.              * 適當的處理方法,這樣可以有效的防止導致OOM和資料過度傾斜問題。   
  120.              */
  121. for(CombineValues cv : value){   
  122.                 secondPar = new Text(cv.getSecondPart().toString());   
  123. //左表tb_dim_city 
  124. if("0".equals(cv.getFlag().toString().trim())){   
  125.                     leftTable.add(secondPar);   
  126.                 }   
  127. //右表tb_user_profiles 
  128. elseif("1".equals(cv.getFlag().toString().trim())){   
  129.                     rightTable.add(secondPar);   
  130.                 }   
  131.             }   
  132.             logger.info("tb_dim_city:"+leftTable.toString());   
  133.             logger.info("tb_user_profiles:"+rightTable.toString());   
  134. for(Text leftPart : leftTable){   
  135. for(Text rightPart : rightTable){   
  136.                     output.set(leftPart+ "\t" + rightPart);   
  137.                     context.write(key, output);   
  138.                 }   
  139.             }   
  140.         }   
  141.     }   
  142. @Override
  143. publicint run(String[] args) throws Exception {   
  144.           Configuration conf=getConf(); //獲得配置檔案物件 
  145.             Job job=new Job(conf,"LeftOutJoinMR");   
  146.             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
  147.             FileInputFormat.addInputPath(job, new Path(args[0])); //設定map輸入檔案路徑 
  148.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //設定reduce輸出檔案路徑
  149.             job.setMapperClass(LeftOutJoinMapper.class);   
  150.             job.setReducerClass(LeftOutJoinReducer.class);
  151.             job.setInputFormatClass(TextInputFormat.class); //設定檔案輸入格式 
  152.             job.setOutputFormatClass(TextOutputFormat.class);//使用預設的output格格式
  153. //設定map的輸出key和value型別 
  154.             job.setMapOutputKeyClass(Text.class);   
  155.             job.setMapOutputValueClass(CombineValues.class);
  156. //設定reduce的輸出key和value型別 
  157.             job.setOutputKeyClass(Text.class);   
  158.             job.setOutputValueClass(Text.class);   
  159.             job.waitForCompletion(true);   
  160. return job.isSuccessful()?0:1;   
  161.     }   
  162. publicstaticvoid main(String[] args) throws IOException,   
  163.             ClassNotFoundException, InterruptedException {   
  164. try {   
  165. int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   
  166.             System.exit(returnCode);   
  167.         } catch (Exception e) {   
  168. // TODO Auto-generated catch block 
  169.             logger.error(e.getMessage());   
  170.         }   
  171.     }   

其中具體的分析以及資料的輸出輸入請看程式碼中的註釋已經寫得比較清楚了,這裡主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,我們可以很明顯的看出原:因為整體資料被分割了,每個map task只處理一部分資料而不能夠獲取到所有需要的join欄位,因此我們需要在講join key作為reduce端的分組將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現了。這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現大量的資料傳輸,效率很低。

2、在Map端進行連線。

使用場景:一張表十分小、一張表很大。

用法:在提交作業的時候先將小表文件放到該作業的DistributedCache中,然後從DistributeCache中取出該小表進行join key / value解釋分割放到記憶體中(可以放大Hash Map等等容器中)。然後掃描大表,看大表中的每條記錄的join key /value值是否能夠在記憶體中找到相同join key的記錄,如果有則直接輸出結果。

直接上程式碼,比較簡單:

  1. package com.mr.mapSideJoin;   
  2. import java.io.BufferedReader;   
  3. import java.io.FileReader;   
  4. import java.io.IOException;   
  5. import java.util.HashMap;   
  6. import org.apache.hadoop.conf.Configuration;   
  7. import org.apache.hadoop.conf.Configured;   
  8. import org.apache.hadoop.filecache.DistributedCache;   
  9. import org.apache.hadoop.fs.Path;   
  10. import org.apache.hadoop.io.Text;   
  11. import org.apache.hadoop.mapreduce.Job;   
  12. import org.apache.hadoop.mapreduce.Mapper;   
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  17. import org.apache.hadoop.util.Tool;   
  18. import org.apache.hadoop.util.ToolRunner;   
  19. import org.slf4j.Logger;   
  20. import org.slf4j.LoggerFactory;   
  21. /**   
  22.  * @author zengzhaozheng   
  23.  *   
  24.  * 用途說明:   
  25.  * Map side join中的left outer join   
  26.  * 左連線,兩個檔案分別代表2個表,連線欄位table1的id欄位和table2的cityID欄位   
  27.  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),   
  28.  * 假設tb_dim_city檔案記錄數很少,tb_dim_city.dat檔案內容,分隔符為"|":   
  29.  * id     name  orderid  city_code  is_show   
  30.  * 0       其他        9999     9999         0   
  31.  * 1       長春        1        901          1   
  32.  * 2       吉林        2        902          1   
  33.  * 3       四平        3        903          1   
  34.  * 4       松原        4        904          1   
  35.  * 5       通化        5        905          1   
  36.  * 6       遼源        6        906          1   
  37.  * 7       白城        7        907          1   
  38.  * 8       白山        8        908          1   
  39.  * 9       延吉        9        909          1   
  40.  * -------------------------風騷的分割線-------------------------------   
  41.  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
  42.  * tb_user_profiles.dat檔案內容,分隔符為"|":   
  43.  * userID   network     flow    cityID   
  44.  * 1           2G       123      1   
  45.  * 2           3G       333      2   
  46.  * 3           3G       555      1   
  47.  * 4           2G       777      3   
  48.  * 5           3G       666      4   
  49.  * -------------------------風騷的分割線-------------------------------   
  50.  *  結果:   
  51.  *  1   長春  1   901 1   1   2G  123   
  52.  *  1   長春  1   901 1   3   3G  555   
  53.  *  2   吉林  2   902 1   2   3G  333   
  54.  *  3   四平  3   903 1   4   2G  777   
  55.  *  4   松原  4   904 1   5   3G  666   
  56.  */
  57. publicclass MapSideJoinMain extends Configured implements Tool{   
  58. privatestaticfinal Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
  59. publicstaticclass LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {
  60. private HashMap<String,String> city_info = new HashMap<String, String>();   
  61. private Text outPutKey = new Text();   
  62. private Text outPutValue = new Text();   
  63. private String mapInputStr = null;   
  64. private String mapInputSpit[] = null;   
  65. private String city_secondPart = null;   
  66. /**   
  67.          * 此方法在每個task開始之前執行,這裡主要用作從DistributedCache   
  68.          * 中取到tb_dim_city檔案,並將裡邊記錄取出放到記憶體中。   
  69.          */
  70. @Override
  71. protectedvoid setup(Context context)   
  72. throws IOException, InterruptedException {   
  73.             BufferedReader br = null;   
  74. //獲得當前作業的DistributedCache相關檔案 
  75.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
  76.             String cityInfo = null;   
  77. for(Path p : distributePaths){   
  78. if(p.toString().endsWith("tb_dim_city.dat")){   
  79. //讀快取檔案,並放到mem中 
  80.                     br = new BufferedReader(new FileReader(p.toString()));   
  81. while(null!=(cityInfo=br.readLine())){   
  82.                         String[] cityPart = cityInfo.split("\\|",5);   
  83. if(cityPart.length ==5){   
  84.                             city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
  85.                         }   
  86.                     }   
  87.                 }   
  88.             }   
  89.         }
  90. /**   
  91.          * Map端的實現相當簡單,直接判斷tb_user_profiles.dat中的   
  92.          * cityID是否存在我的map中就ok了,這樣就可以實現Map Join了   
  93.          */
  94. @Override
  95. protectedvoid map(Object key, Text value, Context context)   
  96. throws IOException, InterruptedException {   
  97. //排掉空行 
  98. if(value == null || value.toString().equals("")){   
  99. return;   
  100.             }   
  101.             mapInputStr = value.toString();   
  102.             mapInputSpit = mapInputStr.split("\\|",4);   
  103. //過濾非法記錄 
  104. if(mapInputSpit.length != 4){   
  105. return;   
  106.             }   
  107. //判斷連結欄位是否在map中存在 
  108.             city_secondPart = city_info.get(mapInputSpit[3]);   
  109. if(city_secondPart != null){   
  110. this.outPutKey.set(mapInputSpit[3]);   
  111. this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
  112.                 context.write(outPutKey, outPutValue);   
  113.             }   
  114.         }   
  115.     }   
  116. @Override
  117. publicint run(String[] args) throws Exception {   
  118.             Configuration conf=getConf(); //獲得配置檔案物件 
  119.             DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//為該job新增快取檔案 
  120.             Job job=new Job(conf,"MapJoinMR");   
  121.             job.setNumReduceTasks(0);
  122.             FileInputFormat.addInputPath(job, new Path(args[0])); //設定map輸入檔案路徑 
  123.             FileOutputFormat.setOutputPath(job, new Path(args[2])); //設定reduce輸出檔案路徑
  124.             job.setJarByClass(MapSideJoinMain.class);   
  125.             job.setMapperClass(LeftOutJoinMapper.class);
  126.             job.setInputFormatClass(TextInputFormat.class); //設定檔案輸入格式 
  127.             job.setOutputFormatClass(TextOutputFormat.class);//使用預設的output格式
  128. //設定map的輸出key和value型別 
  129.             job.setMapOutputKeyClass(Text.class);
  130. //設定reduce的輸出key和value型別 
  131.             job.setOutputKeyClass(Text.class);   
  132.             job.setOutputValueClass(Text.class);   
  133.             job.waitForCompletion(true);   
  134. return job.isSuccessful()?0:1;   
  135.     }   
  136. publicstaticvoid main(String[] args) throws IOException,   
  137.             ClassNotFoundException, InterruptedException {   
  138. try {   
  139. int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
  140.             System.exit(returnCode);   
  141.         } catch (Exception e) {   
  142. // TODO Auto-generated catch block 
  143.             logger.error(e.getMessage());   
  144.         }   
  145.     }   

這裡說說DistributedCache。DistributedCache是分散式快取的一種實現,它在整個MapReduce框架中起著相當重要的作用,他可以支撐我們寫一些相當複雜高效的分散式程式。說回到這裡,JobTracker在作業啟動之前會獲取到DistributedCache的資源uri列表,並將對應的檔案分發到各個涉及到該作業的任務的TaskTracker上。另外,關於DistributedCache和作業的關係,比如許可權、儲存路徑區分、public和private等屬性,接下來有用再整理研究一下寫一篇blog,這裡就不詳細說了。

另外還有一種比較變態的Map Join方式,就是結合HBase來做Map Join操作。這種方式完全可以突破記憶體的控制,使你毫無忌憚的使用Map Join,而且效率也非常不錯。

3、SemiJoin。

SemiJoin就是所謂的半連線,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些資料,在網路中只傳輸參與連線的資料不參與連線的資料不必在網路中進行傳輸,從而減少了shuffle的網路傳輸量,使整體效率得到提高,其他思想和reduce join是一模一樣的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來通過DistributedCach分發到相關節點,然後將其取出放到記憶體中(可以放到HashSet中),在map階段掃描連線表,將join key不在記憶體HashSet中的記錄過濾掉,讓那些參與join的記錄通過shuffle傳輸到reduce端進行join操作,其他的和reduce join都是一樣的。看程式碼:

  1. package com.mr.SemiJoin;   
  2. import java.io.BufferedReader;   
  3. import java.io.FileReader;   
  4. import java.io.IOException;   
  5. import java.util.ArrayList;   
  6. import java.util.HashSet;   
  7. import org.apache.hadoop.conf.Configuration;   
  8. import org.apache.hadoop.conf.Configured;   
  9. import org.apache.hadoop.filecache.DistributedCache;   
  10. import org.apache.hadoop.fs.Path;   
  11. import org.apache.hadoop.io.Text;   
  12. import org.apache.hadoop.mapreduce.Job;   
  13. import org.apache.hadoop.mapreduce.Mapper;   
  14. import org.apache.hadoop.mapreduce.Reducer;   
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  16. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  17. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  20. import org.apache.hadoop.util.Tool;   
  21. import org.apache.hadoop.util.ToolRunner;   
  22. import org.slf4j.Logger;   
  23. import org.slf4j.LoggerFactory;   
  24. /**   
  25.  * @author zengzhaozheng   
  26.  *   
  27.  * 用途說明:   
  28.  * reudce side join中的left outer join   
  29.  * 左連線,兩個檔案分別代表2個表,連線欄位table1的id欄位和table2的cityID欄位   
  30.  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
  31.  * tb_dim_city.dat檔案內容,分隔符為"|":   
  32.  * id     name  orderid  city_code  is_show   
  33.  * 0       其他        9999     9999         0   
  34.  * 1       長春        1        901          1   
  35.  * 2       吉林        2        902          1   
  36.  * 3       四平        3        903          1   
  37.  * 4       松原        4        904          1   
  38.  * 5       通化        5        905          1   
  39.  * 6       遼源        6        906          1   
  40.  * 7       白城        7        907          1   
  41.  * 8       白山        8        908          1   
  42.  * 9       延吉        9        909          1   
  43.  * -------------------------風騷的分割線-------------------------------   
  44.  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
  45.  * tb_user_profiles.dat檔案內容,分隔符為"|":   
  46.  * userID   network     flow    cityID   
  47.  * 1           2G       123      1   
  48.  * 2           3G       333      2   
  49.  * 3           3G       555      1   
  50.  * 4           2G       777      3   
  51.  * 5           3G       666      4   
  52.  * -------------------------風騷的分割線-------------------------------   
  53.  * joinKey.dat內容:   
  54.  * city_code   
  55.  * 1   
  56.  * 2   
  57.  * 3   
  58.  * 4   
  59.  * -------------------------風騷的分割線-------------------------------   
  60.  *  結果:   
  61.  *  1   長春  1   901 1   1   2G  123   
  62.  *  1   長春  1   901 1   3   3G  555   
  63.  *  2   吉林  2   902 1   2   3G  333   
  64.  *  3   四平  3   903 1   4   2G  777   
  65.  *  4   松原  4   904 1   5   3G  666   
  66.  */
  67. publicclass SemiJoin extends Configured implements Tool{   
  68. privatestaticfinal Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
  69. publicstaticclass SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
  70. private CombineValues combineValues = new CombineValues();   
  71. private HashSet<String> joinKeySet = new HashSet<String>();   
  72. private Text flag = new Text();   
  73. private Text joinKey = new Text();   
  74. private Text secondPart = new Text();   
  75. /**   
  76.          * 將參加join的key從DistributedCache取出放到記憶體中,以便在map端將要參加join的key過濾出來。b   
  77.          */
  78. @Override
  79. protectedvoid setup(Context context)   
  80. throws IOException, InterruptedException {   
  81.             BufferedReader br = null;   
  82. //獲得當前作業的DistributedCache相關檔案 
  83.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
  84.             String joinKeyStr = null;   
  85. for(Path p : distributePaths){   
  86. if(p.toString().endsWith("joinKey.dat")){   
  87. //讀快取檔案,並放到mem中 
  88.                     br = new BufferedReader(new FileReader(p.toString()));   
  89. while(null!=(joinKeyStr=br.readLine())){   
  90.                         joinKeySet.add(joinKeyStr);   
  91.                     }   
  92.                 }   
  93.             }   
  94.         }   
  95. @Override
  96. protectedvoid map(Object key, Text value, Context context)   
  97. throws IOException, InterruptedException {   
  98. //獲得檔案輸入路徑 
  99.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
  100. //資料來自tb_dim_city.dat檔案,標誌即為"0" 
  101. if(pathName.endsWith("tb_dim_city.dat")){   
  102.                 String[] valueItems = value.toString().split("\\|");   
  103. //過濾格式錯誤的記錄 
  104. if(valueItems.length != 5){   
  105. return;   
  106.                 }   
  107. //過濾掉不需要參加join的記錄 
  108. if(joinKeySet.contains(valueItems[0])){   
  109.                     flag.set("0");   
  110.                     joinKey.set(valueItems[0]);   
  111.                     secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
  112.                     combineValues.setFlag(flag);   
  113.                     combineValues.setJoinKey(joinKey);   
  114.                     combineValues.setSecondPart(secondPart);   
  115.                     context.write(combineValues.getJoinKey(), combineValues);   
  116.                 }else{   
  117. return ;   
  118.                 }   
  119.             }//資料來自於tb_user_profiles.dat,標誌即為"1" 
  120. elseif(pathName.endsWith("tb_user_profiles.dat")){   
  121.                 String[] valueItems = value.toString().split("\\|");   
  122. //過濾格式錯誤的記錄 
  123. if(valueItems.length != 4){   
  124. return;   
  125.                 }   
  126. //過濾掉不需要參加join的記錄 
  127. if(joinKeySet.contains(valueItems[3])){   
  128.                     flag.set("1");   
  129.                     joinKey.set(valueItems[3]);   
  130.                     secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
  131.                     combineValues.setFlag(flag);   
  132.                     combineValues.setJoinKey(joinKey);   
  133.                     combineValues.setSecondPart(secondPart);   
  134.                     context.write(combineValues.getJoinKey(), combineValues);   
  135.                 }else{   
  136. return ;   
  137.                 }   
  138.             }   
  139.         }   
  140.     }   
  141. publicstaticclass SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
  142. //儲存一個分組中的左表資訊