MapReduce中的join演算法-reduce端join
氣象站和天氣記錄的示例資料分別如下所示:
Station ID Station Name 011990-99999 SIHCCAJAVRI 012650-99999 TRNSET-HANSMOEN Station ID Timestamp Temperature 012650-99999 194903241200 111
基本思路是 mapper 為各個記錄標記源,並且使用連線鍵作為 map 輸出鍵,使鍵相同的記錄放在同一 reducer 中。
我們通過下面兩種技術實現 reduce 端連線。
1、多輸入
資料集的輸入源往往有多種格式,因此可以使用 MultipleInputs 類來方便地解析和標註各個資料來源。
2、二次排序
reducer 將兩個資料來源中選出鍵相同的記錄並不介意這些記錄是否已排好序。此外,為了更好地執行連線操作,先將某一個數據源傳輸到 reducer 會非常重要。
以上面的天氣資料連線為例,當天氣記錄傳送到 reducer 的時候,與這些記錄有相同鍵的氣象站資訊最好也已經放在 reducer ,使得 reducer 能夠將氣象站名稱填到天氣記錄之中就馬上輸出。
雖然也可以不指定資料傳輸次序,並將待處理的記錄快取在記憶體之中,但應該儘量避免這種情況,因為其中任何一組的記錄數量可能非常龐大,遠遠超出 reducer 的可用記憶體容量。
因此我們用到二次排序技術,對 map 階段輸出的每個鍵的值進行排序,實現這一效果。
--------------------------------------------------------------------------------------- 程式示例: [主類] publicclassJoinRecordWithStationName extends Configured implements Tool{ /** * 氣象站 mapper 標記為“0”,先到達reducer */ publicstaticclass JoinStationMapper extends Mapper< LongWritable,Text,TextPair,Text>{ protectedvoid map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split( "\\s+" );//解析氣象站資料 012650-99999194903241200 111 int length = arr.length ; if (length==2){//滿足這種資料格式 //key=氣象站id value=氣象站名稱 context.write( new TextPair(arr[0],"0" ),new Text(arr[1])); } } } /** * 天氣記錄 mapper 標記為“1”,後到達reducer */ publicstaticclass JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>{ protectedvoid map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split( "\\s+" ,2);//解析天氣記錄資料 int length = arr.length ; if (length==2){ //key=氣象站id value=天氣記錄資料 context.write( new TextPair(arr[0],"1" ),new Text(arr[1])); } } } /** *通過上面的分組,將相同的氣象站id分到同一個reducer中進行輸出
* *由於 TextPair 經過了二次排序,所以 reducer 會先接收到氣象站資料。 *因此從中抽取氣象站名稱,並將其作為後續每條輸出記錄的一部分寫到輸出檔案 */ publicstaticclass JoinReducer extends Reducer< TextPair,Text,Text,Text>{ protectedvoid reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{ Iterator< Text> iter = values.iterator(); Text stationName = new Text(iter.next());//氣象站名稱 SIHCCAJAVRI while (iter.hasNext()){ Text record = iter.next(); //天氣記錄的每條資料 195005151200 22 Text outValue = new Text(stationName.toString()+"\t" +record.toString()); context.write(key.getFirst(),outValue); } } }
//輸出結果: // 011990-99999 SIHCCAJAVRI 195005151800 -11 // 011990-99999 SIHCCAJAVRI 195005151200 22 // 011990-99999 SIHCCAJAVRI 195005150700 0 // 012650-99999 TRNSET-HANSMOEN 194903241800 78 // 012650-99999 TRNSET-HANSMOEN 194903241200 111 /** * 自定義分割槽方法,將氣象站id相同的記錄分到相同的reducer中 * */ publicstaticclass KeyPartitioner extends Partitioner< TextPair,Text>{ publicint getPartition(TextPair key,Text value,int numPartitions){ //根據氣象站id進行選擇分割槽,而不是組合鍵的整體 return (key.getFirst().hashCode()&Integer.MAX_VALUE ) % numPartitions; } } /** * 自定義分組,將氣象站id相同的key放到同一個reducer中執行。然後再通過TextPair進行內部比較排序 */ publicstaticclass GroupingComparator extends WritableComparator{ public GroupingComparator() { super (TextPair.class,true); } @SuppressWarnings ("rawtypes" ) @Override publicint compare(WritableComparable w1, WritableComparable w2) { TextPair tp1=(TextPair)w1; TextPair tp2=(TextPair)w2; Text f1= tp1.getFirst(); Text f2= tp2.getFirst(); return f1.compareTo(f2); } } @Override publicint run(String[] args) throws Exception{ Configuration conf = new Configuration();// 讀取配置檔案 Job job = Job. getInstance();// 新建一個任務 job.setJarByClass(JoinRecordWithStationName. class );// 主類 Path recordInputPath = new Path(args[0]);//天氣記錄資料來源 Path stationInputPath = new Path(args[1]);//氣象站資料來源 Path outputPath = new Path(args[2]);//輸出路徑 //如果輸出路徑存在就刪除 FileSystem fs = outputPath.getFileSystem(conf); if (fs.isDirectory(outputPath)){ fs.delete(outputPath, true ); } MultipleInputs. addInputPath(job,recordInputPath,TextInputFormat. class ,JoinRecordMapper.class);//讀取天氣記錄Mapper MultipleInputs. addInputPath(job,stationInputPath,TextInputFormat. class ,JoinStationMapper.class);//讀取氣象站Mapper FileOutputFormat. setOutputPath(job,outputPath); job.setPartitionerClass(KeyPartitioner. class );//自定義分割槽 job.setGroupingComparatorClass(GroupingComparator. class );//自定義分組 job.setMapOutputKeyClass(TextPair. class ); job.setReducerClass(JoinReducer. class );// Reducer job.setOutputKeyClass(Text. class ); return job.waitForCompletion(true)?0:1; } publicstaticvoid main(String[] args) throws Exception{ String args0[]={ "hdfs://yun-11:9000/join/records.txt" , "hdfs://yun-11:9000/join/station.txt" , "hdfs://yun-11:9000/join/out" }; int exitCode = ToolRunner.run( new JoinRecordWithStationName(),args0); System. exit(exitCode); } } 自定義組合鍵:TextPair 我們使用 TextPair 類構建組合鍵,包括氣象站 ID 和 “標記”。在這裡,“標記” 是一個虛擬的欄位,其唯一目的是對記錄排序,使氣象站記錄比天氣記錄先到達。 一種簡單的做法就是:對於氣象站記錄, “標記” 的值設為 0;對於天氣記錄,“標記” 的值設為1 ------------------------------------------------------------------------------------------------------------------------------ package com.tan.join1; import java.io.*; import org.apache.hadoop.io.*; publicclassTextPair implements WritableComparable<TextPair> { private Text first; //Text 型別的例項變數 first --氣象站id private Text second; //Text 型別的例項變數 second--標記符號 public TextPair() { set( new Text(),new Text()); } public TextPair(String first, String second) { set( new Text(first),new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } publicvoid set(Text first, Text second) { this .first = first; this .second = second; } public Text getFirst() { returnfirst ; } public Text getSecond() { returnsecond ; } //將物件轉換為位元組流並寫入到輸出流out中 @Override publicvoid write(DataOutput out)throws IOException { first.write(out); second.write(out); } //從輸入流in中讀取位元組流反序列化為物件 @Override publicvoid readFields(DataInput in)throws IOException { first.readFields(in); second.readFields(in); } @Override publicint hashCode() { returnfirst .hashCode() *163+ second.hashCode(); } @Override publicboolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; returnfirst .equals(tp.first) && second .equals(tp.second ); } returnfalse ; } @Override public String toString() { returnfirst +"\t" + second ; } //排序 @Override
相關推薦
MapReduce中的join演算法-reduce端join
在海量資料的環境下,不可避免的會碰到join需求, 例如在資料分析時需要連線從不同的資料來源中獲取到資料。 假設有兩個資料集:氣象站資料庫和天氣記錄資料庫,並考慮如何合二為一。 一個典型的查詢是:輸出氣象站的歷史資訊,同時各行記錄也包含氣象站的元資料資訊。 氣象站
Reduce端join演算法實現 - (訂單跟商品)
程式碼地址: https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/rjon 現在有兩張表 1.訂單表 2.商品表 訂單資料表t_order: id
MapReduce表連線操作之Reduce端join
一:背景 Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。 二:技術實現 基本思路 (1):Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。 (2):在red
hadoop streaming reduce端join的python兩種實現方式
實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac
Hadoop應用——Reduce端Join操作
聯接 使用案例 Table EMP: Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Ta
mapreduce中map和reduce個數
case when 生成 task 輸入 slots align reducer 進行 很多 一、 控制hive任務中的map數: 1. 通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的文件總個數,input的
mapreduce中map和reduce的最大併發數量設定
reduce數量究竟多少是適合的。目前測試認為reduce數量約等於cluster中datanode的總cores的一半比較合適,比如cluster中有32臺datanode,每臺8 core,那麼reduce設定為128速度最快。因為每臺機器8 core,4個作m
MapReduce中計算Wordcount中map端及reduce端的設定
map端的設定: package wordcount; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import or
mapreduce的join演算法程式設計案例
mapreduce程式設計案例 map端的join演算法 1、原理闡述 適用於關聯表中有小表的情形,可以將小表傳送到所有的map節點,這樣map節點就可以在本地對自己讀到的大表資料進行join並輸出最終結果,可以大大提高join操作的併發度,加快處理速度 2、例項: 兩表
MySQL中Join演算法實現原理通俗易懂
注意:MySQL 5.1.18之後的版本中才會體現出來 一:原理: 在MySQL 中,只有一種 Join 演算法,就是大名鼎鼎的 Nested Loop Join,他沒有其他很多資料庫所提供的 Hash Join,也沒有 Sort Merge Join。 1.Nested Loop
map端join的實現 ,用來解決小表中資料的讀取
通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作 1、需求: 訂單資料表t_order: id date pid amount 1001 20150710 P0001 2
MapReduce之join演算法案例實現
1、需求:訂單資料表t_order:id date pid amount 1001 20150710 P0001 2 1002 20150710 P0001 3 1002 20150710 P0002 3 商品資訊表t_productid pname
mapreduce——join演算法的程式碼實現
需求:有user資料檔案:user.csv u001,senge,18,angelababy u002,laozhao,48
DataStage中merge、lookup、join的區別與聯系
處理過程 key 占用內存 效率 功能 要求 過程 事實表 lookup 三者功能類似,都可以將表連接起來進行輸出。 區別主要體現在性能上。 lookup就是一個表在另一個表中找,處理過程都在內存進行,因此占用內存較多,一般大事實表和小緯表用這種方式關聯效率高。 merge
mapreduce中reduce中的叠代器只能調用一次!
new resultset row reducer style prot category nds 重復 親測,只能調用一次,如果想想在一次reduce重復使用叠代器中的數據,得先取出來放在list中然後在從list中取出來!!多次讀取reduce函數中叠代器的數據
map端join
path auth not config 單表 mapreduce == 書包 task package my.hadoop.hdfs.mapreduceJoin; import java.io.BufferedReader; import java.io.FileIn
Java 中Thread的sleep、join方法解析
開始 system sleep main gen 解析 等待時間 calling trace 1.Thread中sleep方法作用是使當前線程等待,其他線程開始執行,如果有線程鎖,sleep不會讓出鎖 沒有加鎖代碼如下: public class Synchronized
Case:update中把in改寫成join效能提高數倍
(1)優化前 如下一條SQL,把從1985-05-21入職前的員工薪資都增加500,執行約20.70 s, 從執行計劃中可以看出對錶salaries進行的是索引全掃描,掃描行數約260W行。 mysql> update salaries set sala
Spark專案實戰-資料傾斜解決方案之將reduce join轉換為map join
一、reduce端join操作原理 二、map端join操作原理 三、適用場景 如果兩個RDD要進行join,其中一個RDD是比較小的。一個RDD是100萬資料,一個RDD是1萬資料。(一個RDD是1億資料,一個RDD是100萬資料) 其中一個RDD必須是比較
大資料教程(9.6)map端join實現
上一篇文章講了mapreduce配合實現join,本節博主將講述在map端的join實現; 一、需求 &n