1. 程式人生 > >MapReduce中的join演算法-reduce端join

MapReduce中的join演算法-reduce端join

   在海量資料的環境下,不可避免的會碰到join需求, 例如在資料分析時需要連線從不同的資料來源中獲取到資料。 假設有兩個資料集:氣象站資料庫和天氣記錄資料庫,並考慮如何合二為一。 一個典型的查詢是:輸出氣象站的歷史資訊,同時各行記錄也包含氣象站的元資料資訊。
氣象站和天氣記錄的示例資料分別如下所示:
Station ID            Station Name 011990-99999    SIHCCAJAVRI 012650-99999    TRNSET-HANSMOEN Station ID            Timestamp    Temperature 012650-99999    194903241200    111
012650-99999    194903241800    78 011990-99999    195005150700    0 011990-99999    195005151200    22 011990-99999    195005151800    -11 氣象站和天氣記錄合併之後的示意圖如下所示。 Station ID    Station Name    Timestamp    Temperature 011990-99999    SIHCCAJAVRI    195005150700    0 011990-99999    SIHCCAJAVRI    195005151200    22 011990-99999    SIHCCAJAVRI    195005151800    -11
012650-99999    TYNSET-HANSMOEN    194903241200    111 012650-99999    TYNSET-HANSMOEN    194903241800    78 Reducer端連線:
基本思路是 mapper 為各個記錄標記源,並且使用連線鍵作為 map 輸出鍵,使鍵相同的記錄放在同一 reducer 中。

我們通過下面兩種技術實現 reduce 端連線。

        1、多輸入

        資料集的輸入源往往有多種格式,因此可以使用 MultipleInputs 類來方便地解析和標註各個資料來源。

        2、二次排序

reducer 將兩個資料來源中選出鍵相同的記錄並不介意這些記錄是否已排好序。此外,為了更好地執行連線操作,先將某一個數據源傳輸到 reducer 會非常重要
。 
以上面的天氣資料連線為例,當天氣記錄傳送到 reducer 的時候,與這些記錄有相同鍵的氣象站資訊最好也已經放在 reducer ,使得 reducer 能夠將氣象站名稱填到天氣記錄之中就馬上輸出。 
雖然也可以不指定資料傳輸次序,並將待處理的記錄快取在記憶體之中,但應該儘量避免這種情況,因為其中任何一組的記錄數量可能非常龐大,遠遠超出 reducer 的可用記憶體容量。 
因此我們用到二次排序技術,對 map 階段輸出的每個鍵的值進行排序,實現這一效果

--------------------------------------------------------------------------------------- 程式示例: [主類] publicclassJoinRecordWithStationName extends Configured implements Tool{      /**       * 氣象站 mapper 標記為“0”,先到達reducer       */      publicstaticclass JoinStationMapper extends Mapper< LongWritable,Text,TextPair,Text>{           protectedvoid map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{              String line = value.toString();              String[] arr = line.split( "\\s+" );//解析氣象站資料 012650-99999194903241200  111              int length = arr.length ;            if (length==2){//滿足這種資料格式                 //key=氣象站id  value=氣象站名稱                  context.write( new TextPair(arr[0],"0" ),new Text(arr[1]));              }          }      }      /**       * 天氣記錄 mapper 標記為“1”,後到達reducer       */      publicstaticclass JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>{          protectedvoid map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{           String line = value.toString();           String[] arr = line.split( "\\s+" ,2);//解析天氣記錄資料            int length = arr.length ;            if (length==2){                 //key=氣象站id  value=天氣記錄資料                context.write( new TextPair(arr[0],"1" ),new Text(arr[1]));           }           }      }      /**       *通過上面的分組,將相同的氣象站id分到同一個reducer中進行輸出
      *
      *由於 TextPair 經過了二次排序,所以 reducer 會先接收到氣象站資料。      *因此從中抽取氣象站名稱,並將其作為後續每條輸出記錄的一部分寫到輸出檔案       */      publicstaticclass JoinReducer extends Reducer< TextPair,Text,Text,Text>{          protectedvoid reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{              Iterator< Text> iter = values.iterator();              Text stationName = new Text(iter.next());//氣象站名稱   SIHCCAJAVRI              while (iter.hasNext()){                  Text record = iter.next(); //天氣記錄的每條資料   195005151200   22                  Text outValue = new Text(stationName.toString()+"\t" +record.toString());                  context.write(key.getFirst(),outValue);              }          }             }

 //輸出結果: //   011990-99999  SIHCCAJAVRI   195005151800  -11 //   011990-99999  SIHCCAJAVRI   195005151200  22 //   011990-99999  SIHCCAJAVRI   195005150700  0 //   012650-99999  TRNSET-HANSMOEN    194903241800  78 //   012650-99999  TRNSET-HANSMOEN    194903241200  111      /**       * 自定義分割槽方法,將氣象站id相同的記錄分到相同的reducer中       *       */      publicstaticclass KeyPartitioner extends Partitioner< TextPair,Text>{            publicint getPartition(TextPair key,Text value,int numPartitions){                //根據氣象站id進行選擇分割槽,而不是組合鍵的整體                return (key.getFirst().hashCode()&Integer.MAX_VALUE ) % numPartitions;           }      }      /**       * 自定義分組,將氣象站id相同的key放到同一個reducer中執行。然後再通過TextPair進行內部比較排序       */      publicstaticclass GroupingComparator extends WritableComparator{            public GroupingComparator() {                super (TextPair.class,true);           }            @SuppressWarnings ("rawtypes" )            @Override            publicint compare(WritableComparable w1, WritableComparable w2) {               TextPair tp1=(TextPair)w1;               TextPair tp2=(TextPair)w2;               Text f1= tp1.getFirst();               Text f2= tp2.getFirst();                return f1.compareTo(f2);           }      }      @Override      publicint run(String[] args) throws Exception{           Configuration conf = new Configuration();// 讀取配置檔案           Job job = Job. getInstance();// 新建一個任務           job.setJarByClass(JoinRecordWithStationName. class );// 主類           Path recordInputPath = new Path(args[0]);//天氣記錄資料來源           Path stationInputPath = new Path(args[1]);//氣象站資料來源           Path outputPath = new Path(args[2]);//輸出路徑            //如果輸出路徑存在就刪除           FileSystem fs = outputPath.getFileSystem(conf);            if (fs.isDirectory(outputPath)){               fs.delete(outputPath, true );           }           MultipleInputs. addInputPath(job,recordInputPath,TextInputFormat. class ,JoinRecordMapper.class);//讀取天氣記錄Mapper    MultipleInputs. addInputPath(job,stationInputPath,TextInputFormat. class ,JoinStationMapper.class);//讀取氣象站Mapper           FileOutputFormat. setOutputPath(job,outputPath);           job.setPartitionerClass(KeyPartitioner. class );//自定義分割槽           job.setGroupingComparatorClass(GroupingComparator. class );//自定義分組           job.setMapOutputKeyClass(TextPairclass );           job.setReducerClass(JoinReducer. class );// Reducer           job.setOutputKeyClass(Text. class );            return job.waitForCompletion(true)?0:1;           }            publicstaticvoid main(String[] args) throws Exception{               String args0[]={                          "hdfs://yun-11:9000/join/records.txt" ,                          "hdfs://yun-11:9000/join/station.txt" ,                          "hdfs://yun-11:9000/join/out"      };                int exitCode = ToolRunner.runnew JoinRecordWithStationName(),args0);               System. exit(exitCode);      } } 自定義組合鍵:TextPair  我們使用 TextPair 類構建組合鍵,包括氣象站 ID 和 “標記”。在這裡,“標記” 是一個虛擬的欄位,其唯一目的是對記錄排序,使氣象站記錄比天氣記錄先到達 一種簡單的做法就是:對於氣象站記錄, “標記” 的值設為 0;對於天氣記錄,“標記” 的值設為1 ------------------------------------------------------------------------------------------------------------------------------ package com.tan.join1; import java.io.*; import org.apache.hadoop.io.*; publicclassTextPair implements WritableComparable<TextPair> {      private   Text first//Text 型別的例項變數 first --氣象站id      private   Text second//Text 型別的例項變數 second--標記符號      public TextPair() {           set( new Text(),new Text());      }      public TextPair(String first, String second) {           set( new Text(first),new Text(second));      }      public TextPair(Text first, Text second) {           set(first, second);      }      publicvoid set(Text first, Text second) {            this .first = first;            this .second = second;      }      public Text getFirst() {            returnfirst ;      }      public Text getSecond() {            returnsecond ;      }      //將物件轉換為位元組流並寫入到輸出流out中      @Override      publicvoid write(DataOutput out)throws IOException {            first.write(out);            second.write(out);      }      //從輸入流in中讀取位元組流反序列化為物件      @Override      publicvoid readFields(DataInput in)throws IOException {            first.readFields(in);            second.readFields(in);      }      @Override      publicint hashCode() {            returnfirst .hashCode() *163+ second.hashCode();      }      @Override      publicboolean equals(Object o) {            if (o instanceof TextPair) {               TextPair tp = (TextPair) o;                returnfirst .equals(tp.first) && second .equals(tp.second );           }                returnfalse ;      }      @Override      public String toString() {            returnfirst +"\t" + second ;      }      //排序      @Override

相關推薦

MapReducejoin演算法-reducejoin

   在海量資料的環境下,不可避免的會碰到join需求, 例如在資料分析時需要連線從不同的資料來源中獲取到資料。 假設有兩個資料集:氣象站資料庫和天氣記錄資料庫,並考慮如何合二為一。 一個典型的查詢是:輸出氣象站的歷史資訊,同時各行記錄也包含氣象站的元資料資訊。 氣象站

Reducejoin演算法實現 - (訂單跟商品)

程式碼地址: https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/rjon 現在有兩張表 1.訂單表 2.商品表 訂單資料表t_order: id

MapReduce表連線操作之Reducejoin

一:背景 Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。 二:技術實現 基本思路 (1):Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。 (2):在red

hadoop streaming reducejoin的python兩種實現方式

實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac

Hadoop應用——ReduceJoin操作

聯接 使用案例 Table EMP: Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Ta

mapreducemap和reduce個數

case when 生成 task 輸入 slots align reducer 進行 很多 一、 控制hive任務中的map數: 1. 通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的文件總個數,input的

mapreducemap和reduce的最大併發數量設定

     reduce數量究竟多少是適合的。目前測試認為reduce數量約等於cluster中datanode的總cores的一半比較合適,比如cluster中有32臺datanode,每臺8 core,那麼reduce設定為128速度最快。因為每臺機器8 core,4個作m

MapReduce計算Wordcountmapreduce的設定

map端的設定:  package wordcount; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import or

mapreducejoin演算法程式設計案例

mapreduce程式設計案例 map端的join演算法 1、原理闡述 適用於關聯表中有小表的情形,可以將小表傳送到所有的map節點,這樣map節點就可以在本地對自己讀到的大表資料進行join並輸出最終結果,可以大大提高join操作的併發度,加快處理速度 2、例項: 兩表

MySQLJoin演算法實現原理通俗易懂

注意:MySQL 5.1.18之後的版本中才會體現出來 一:原理: 在MySQL 中,只有一種 Join 演算法,就是大名鼎鼎的 Nested Loop Join,他沒有其他很多資料庫所提供的 Hash Join,也沒有 Sort Merge Join。 1.Nested Loop

mapjoin的實現 ,用來解決小表資料的讀取

通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作 1、需求: 訂單資料表t_order: id date pid amount 1001 20150710 P0001 2

MapReducejoin演算法案例實現

1、需求:訂單資料表t_order:id date pid amount 1001 20150710 P0001 2 1002 20150710 P0001 3 1002 20150710 P0002 3 商品資訊表t_productid pname

mapreduce——join演算法的程式碼實現

需求:有user資料檔案:user.csv u001,senge,18,angelababy u002,laozhao,48

DataStagemerge、lookup、join的區別與聯系

處理過程 key 占用內存 效率 功能 要求 過程 事實表 lookup 三者功能類似,都可以將表連接起來進行輸出。 區別主要體現在性能上。 lookup就是一個表在另一個表中找,處理過程都在內存進行,因此占用內存較多,一般大事實表和小緯表用這種方式關聯效率高。 merge

mapreducereduce的叠代器只能調用一次!

new resultset row reducer style prot category nds 重復 親測,只能調用一次,如果想想在一次reduce重復使用叠代器中的數據,得先取出來放在list中然後在從list中取出來!!多次讀取reduce函數中叠代器的數據

mapjoin

path auth not config 單表 mapreduce == 書包 task package my.hadoop.hdfs.mapreduceJoin; import java.io.BufferedReader; import java.io.FileIn

Java Thread的sleep、join方法解析

開始 system sleep main gen 解析 等待時間 calling trace 1.Thread中sleep方法作用是使當前線程等待,其他線程開始執行,如果有線程鎖,sleep不會讓出鎖 沒有加鎖代碼如下: public class Synchronized

Case:update把in改寫成join效能提高數倍

(1)優化前 如下一條SQL,把從1985-05-21入職前的員工薪資都增加500,執行約20.70 s, 從執行計劃中可以看出對錶salaries進行的是索引全掃描,掃描行數約260W行。 mysql> update salaries set sala

Spark專案實戰-資料傾斜解決方案之將reduce join轉換為map join

一、reduce端join操作原理 二、map端join操作原理  三、適用場景 如果兩個RDD要進行join,其中一個RDD是比較小的。一個RDD是100萬資料,一個RDD是1萬資料。(一個RDD是1億資料,一個RDD是100萬資料) 其中一個RDD必須是比較

大資料教程(9.6)mapjoin實現

        上一篇文章講了mapreduce配合實現join,本節博主將講述在map端的join實現;         一、需求     &n