1. 程式人生 > >Hadoop應用——Reduce端Join操作

Hadoop應用——Reduce端Join操作

聯接
使用案例
Table EMP:

Name    Sex Age DepNo
zhang   male    20  1
li  female  25  2
wang    female  30  3
zhou    male    35  2

Table DEP:

DepNo   DepName
1   Sales
2   Dev
3   Mgt

reduce端聯接比map端聯接更普遍,因為輸入的資料不需要特定的結構;但是效率低,因為所有資料必須經過shuffle過程
基本思路:
1)Map端讀取所有檔案,並在輸出的內容里加上標識代表資料是從哪個檔案裡來的
2)在reduce處理函式裡,按照標識對資料進行儲存
3)然後根據key的join來求出結果直接輸出

定義EMP_DEP.java實體bean儲存表中資料

package com.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class EMP_DEP implements WritableComparable{
    private String name  = "" ;
    private String sex = "" ;
    private
int age = 0 ; private int depNo = 0 ; private String depName = "" ; private String table = "" ; public EMP_DEP(){ } public EMP_DEP(EMP_DEP emp_dep){ this.name = emp_dep.getName() ; this.sex = emp_dep.getSex() ; this.age = emp_dep.getAge() ; this
.depNo = emp_dep.getDepNo() ; this.depName = emp_dep.getDepName() ; this.table = emp_dep.getTable() ; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int getDepNo() { return depNo; } public void setDepNo(int depNo) { this.depNo = depNo; } public String getDepName() { return depName; } public void setDepName(String depName) { this.depName = depName; } public String getTable() { return table; } public void setTable(String table) { this.table = table; } @Override public void readFields(DataInput in) throws IOException { this.name = in.readUTF() ; this.sex = in.readUTF() ; this.age = in.readInt() ; this.depNo = in.readInt() ; this.depName = in.readUTF() ; this.table = in.readUTF() ; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(name) ; out.writeUTF(sex) ; out.writeInt(age) ; out.writeInt(depNo) ; out.writeUTF(depName) ; out.writeUTF(table) ; } @Override public int compareTo(Object o) { return 0; } public String toString(){ return name + " " + sex + " " + age + " " + depName ; } }

定義Mapper函式

package com.join;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReduceSideMapper extends Mapper<LongWritable, Text, IntWritable, EMP_DEP> {

    private EMP_DEP emp_dep = new EMP_DEP() ;
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split("\\s+") ;
        if(values.length == 4 ){
            emp_dep.setName(values[0]) ;
            emp_dep.setSex(values[1]) ;
            emp_dep.setAge(Integer.valueOf(values[2])) ;
            emp_dep.setDepNo(Integer.valueOf(values[3])) ;
            emp_dep.setTable("EMP") ;
            context.write(new IntWritable(Integer.valueOf(values[3])), emp_dep) ;
        } 
        if(values.length ==2 ){
            emp_dep.setDepNo(Integer.valueOf(values[0])) ;
            emp_dep.setDepName(values[1]) ;
            emp_dep.setTable("DEP") ;
            context.write(new IntWritable(Integer.valueOf(values[0])), emp_dep) ;
        }
    }

}

定義reducer函式

package com.join;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideReducer extends
        Reducer<IntWritable, EMP_DEP, NullWritable, EMP_DEP> {

    @Override
    protected void reduce(IntWritable key, Iterable<EMP_DEP> value,Context context)
            throws IOException, InterruptedException {
        String depName = "" ;
        List<EMP_DEP> list = new LinkedList<EMP_DEP>() ;
        for(EMP_DEP val : value){
            //在DEP表中,depNo是主鍵,所以只會存在一個
            if(val.getTable().equals("DEP")){
                depName = val.getDepName() ;
            }else{
                list.add(new EMP_DEP(val)) ;
            }
        }

        for(EMP_DEP v : list){
            if(v.getTable().equals("EMP")){
                v.setDepName(depName) ;
                context.write(NullWritable.get(), v) ;
            }

        }
    }

}

定義TestReduceSideJoin

package com.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class TestReduceSideJoin {
    public static void main(String args[]) throws Exception{
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }

        Job job = new Job(conf, "Reduce side join");
        job.setJarByClass(TestReduceSideJoin.class);
        job.setMapperClass(ReduceSideMapper.class);
        job.setReducerClass(ReduceSideReducer.class);

        job.setMapOutputKeyClass(IntWritable.class) ;
        job.setMapOutputValueClass(EMP_DEP.class) ;

        job.setOutputKeyClass(NullWritable.class) ;
        job.setOutputValueClass(EMP_DEP.class) ;


        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

相關推薦

Hadoop應用——ReduceJoin操作

聯接 使用案例 Table EMP: Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Ta

hadoop streaming reducejoin的python兩種實現方式

實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac

MapReduce表連線操作Reducejoin

一:背景 Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。 二:技術實現 基本思路 (1):Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。 (2):在red

Hadoop基礎-MapReduce的Join操作

否則 mapred HA 原創 -m mapr red 轉載 hadoop基礎                   Hadoop基礎-MapReduce的Join操作                                     作者:尹正傑 版權聲明:原創作品,

Reducejoin演算法實現 - (訂單跟商品)

程式碼地址: https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/rjon 現在有兩張表 1.訂單表 2.商品表 訂單資料表t_order: id

hadoop streaming mapjoin

測試資料 # lixiang_list.txt(小表,可以在map端載入到記憶體中) #立項ID 立項名稱 1800 心願券測試003 1801 fw心願單 1802 wtest心願券0524 1803

Hadoopreduceshuffle過程及原始碼解析

一、概要描述 在Child的main函式中通過TaskUmbilicalProtocol協議,從TaskTracker獲得需要執行的Task,並呼叫Task的run方法來執行。在ReduceTask而Task的run方法會通過java反射機制構造Reducer

MapReduce中的join演算法-reducejoin

   在海量資料的環境下,不可避免的會碰到join需求, 例如在資料分析時需要連線從不同的資料來源中獲取到資料。 假設有兩個資料集:氣象站資料庫和天氣記錄資料庫,並考慮如何合二為一。 一個典型的查詢是:輸出氣象站的歷史資訊,同時各行記錄也包含氣象站的元資料資訊。 氣象站

python之sorted、map、reducejoin、split函式的例項操作

sorted 資料如下: key為選擇需要排序的元素;reverse為True,表示逆序排序。 reverse為False,表示順序排序。 map 資料如下。 按lambda表示式操作。 reduce 按lambda表示式操作

Hadoop 學習研究(五): hadoop中的join操作

Hadoop中的Join操作: 考慮如下問題: 假設有兩個資料集:一個是城市名稱編號,一個是日期和產出,考慮如何將這兩個不同的資料集合二為一。或者有如下需求:獲取某個城市在指定年份的產出等等問題。 需

mapjoin

path auth not config 單表 mapreduce == 書包 task package my.hadoop.hdfs.mapreduceJoin; import java.io.BufferedReader; import java.io.FileIn

hadoop 2.7.3基本操作

dir 不出 管理 查看 運行 oca 好的 nbsp 資源管理 ./bin/hdfs dfs -mkdir -p input 新建文件夾 YARN 有個好處是可以通過 Web 界面查看任務的運行情況:http://localhost:8088/cluster 但 Y

MySQL left join操作中 on與where放置條件的區別

合成 可見 找到 需要 兩張 oca aaa rip 多個 優先級 兩者放置相同條件,之所以可能會導致結果集不同,就是因為優先級。on的優先級是高於where的。 1 1 首先明確兩個概念: LEFT JOIN 關鍵字會從左表 (table_name1) 那裏返回

一步一步跟我學習hadoop(5)----hadoop Map/Reduce教程(2)

submit calc run submitjob des conf sam ner 打開 Map/Reduce用戶界面 本節為用戶採用框架要面對的各個環節提供了具體的描寫敘述,旨在與幫助用戶對實現、配置和調優進行具體的設置。然而,開發時候還是要相應著API進行

scala 高級十六 scala 集合和集合的高級特性 map flatten fllatmap zip reduce zip 等操作

高級 類型 strong nbsp println 參數 highlight 匿名 pri 1. scala 的列表List 和集Set 的操作 //Set 和list 差不多,不過 Set 中不允許有重復的元素 var set=scala.collectio

Hadoop之HDFS文件操作

文件操作命令 help 文件夾 利用 jpg 查看 作文 rgs fill 摘要:Hadoop之HDFS文件操作常有兩種方式。命令行方式和JavaAPI方式。本文介紹怎樣利用這兩種方式對HDFS文件進行操作。 關鍵詞:HDFS文件 命令行

hadoop應用場景

本地 計算 本地緩存 Lucene 智能 場景 搜索 學習 基於 大數據量存儲:分布式存儲 日誌處理: Hadoop擅長這個 海量計算: 並行計算 ETL:數據抽取到oracle、mysql、DB2、mongdb及主流數據庫 使用HBase做數據分析: 用擴展性應對大量

MapReduce框架Hadoop應用(一)

atan 查看 應該 節點 抽象 資源 log ack 任務   Google對其的定義:MapReduce是一種變成模型,用於大規模數據集(以T為級別的數據)的並行運算。用戶定義一個map函數來處理一批Key-Value對以生成另一批中間的Key-Value對,再定義一個

教你如何實現微信小程序與.net core應用服務的無狀態身份驗證

做的 動圖 ef7 服務端 apt 是我 分布 .net service 隨著.net core2的發布,越來越多人使用.net core2開發各種應用服務端,下面我就結合自己最近開發的一款小程序,給大家分享下,怎麽使用小程序登錄後,小程序與服務端交互的權限控制。

Hive---join操作

http gpo bsp body outer blog www com size HIVE中join、semi join、outer join舉例詳解 加油 舉例子: hive> select * from zz0; 111111 222222 888888 hiv