Hadoop應用——Reduce端Join操作
聯接
使用案例
Table EMP:
Name Sex Age DepNo
zhang male 20 1
li female 25 2
wang female 30 3
zhou male 35 2
Table DEP:
DepNo DepName
1 Sales
2 Dev
3 Mgt
reduce端聯接比map端聯接更普遍,因為輸入的資料不需要特定的結構;但是效率低,因為所有資料必須經過shuffle過程
基本思路:
1)Map端讀取所有檔案,並在輸出的內容里加上標識代表資料是從哪個檔案裡來的
2)在reduce處理函式裡,按照標識對資料進行儲存
3)然後根據key的join來求出結果直接輸出
定義EMP_DEP.java實體bean儲存表中資料
package com.join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class EMP_DEP implements WritableComparable{
private String name = "" ;
private String sex = "" ;
private int age = 0 ;
private int depNo = 0 ;
private String depName = "" ;
private String table = "" ;
public EMP_DEP(){
}
public EMP_DEP(EMP_DEP emp_dep){
this.name = emp_dep.getName() ;
this.sex = emp_dep.getSex() ;
this.age = emp_dep.getAge() ;
this .depNo = emp_dep.getDepNo() ;
this.depName = emp_dep.getDepName() ;
this.table = emp_dep.getTable() ;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getDepNo() {
return depNo;
}
public void setDepNo(int depNo) {
this.depNo = depNo;
}
public String getDepName() {
return depName;
}
public void setDepName(String depName) {
this.depName = depName;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF() ;
this.sex = in.readUTF() ;
this.age = in.readInt() ;
this.depNo = in.readInt() ;
this.depName = in.readUTF() ;
this.table = in.readUTF() ;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name) ;
out.writeUTF(sex) ;
out.writeInt(age) ;
out.writeInt(depNo) ;
out.writeUTF(depName) ;
out.writeUTF(table) ;
}
@Override
public int compareTo(Object o) {
return 0;
}
public String toString(){
return name + " " + sex + " " + age + " " + depName ;
}
}
定義Mapper函式
package com.join;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ReduceSideMapper extends Mapper<LongWritable, Text, IntWritable, EMP_DEP> {
private EMP_DEP emp_dep = new EMP_DEP() ;
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String[] values = value.toString().split("\\s+") ;
if(values.length == 4 ){
emp_dep.setName(values[0]) ;
emp_dep.setSex(values[1]) ;
emp_dep.setAge(Integer.valueOf(values[2])) ;
emp_dep.setDepNo(Integer.valueOf(values[3])) ;
emp_dep.setTable("EMP") ;
context.write(new IntWritable(Integer.valueOf(values[3])), emp_dep) ;
}
if(values.length ==2 ){
emp_dep.setDepNo(Integer.valueOf(values[0])) ;
emp_dep.setDepName(values[1]) ;
emp_dep.setTable("DEP") ;
context.write(new IntWritable(Integer.valueOf(values[0])), emp_dep) ;
}
}
}
定義reducer函式
package com.join;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class ReduceSideReducer extends
Reducer<IntWritable, EMP_DEP, NullWritable, EMP_DEP> {
@Override
protected void reduce(IntWritable key, Iterable<EMP_DEP> value,Context context)
throws IOException, InterruptedException {
String depName = "" ;
List<EMP_DEP> list = new LinkedList<EMP_DEP>() ;
for(EMP_DEP val : value){
//在DEP表中,depNo是主鍵,所以只會存在一個
if(val.getTable().equals("DEP")){
depName = val.getDepName() ;
}else{
list.add(new EMP_DEP(val)) ;
}
}
for(EMP_DEP v : list){
if(v.getTable().equals("EMP")){
v.setDepName(depName) ;
context.write(NullWritable.get(), v) ;
}
}
}
}
定義TestReduceSideJoin
package com.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestReduceSideJoin {
public static void main(String args[]) throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Reduce side join");
job.setJarByClass(TestReduceSideJoin.class);
job.setMapperClass(ReduceSideMapper.class);
job.setReducerClass(ReduceSideReducer.class);
job.setMapOutputKeyClass(IntWritable.class) ;
job.setMapOutputValueClass(EMP_DEP.class) ;
job.setOutputKeyClass(NullWritable.class) ;
job.setOutputValueClass(EMP_DEP.class) ;
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
相關推薦
Hadoop應用——Reduce端Join操作
聯接 使用案例 Table EMP: Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Ta
hadoop streaming reduce端join的python兩種實現方式
實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac
MapReduce表連線操作之Reduce端join
一:背景 Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。 二:技術實現 基本思路 (1):Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。 (2):在red
Hadoop基礎-MapReduce的Join操作
否則 mapred HA 原創 -m mapr red 轉載 hadoop基礎 Hadoop基礎-MapReduce的Join操作 作者:尹正傑 版權聲明:原創作品,
Reduce端join演算法實現 - (訂單跟商品)
程式碼地址: https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/rjon 現在有兩張表 1.訂單表 2.商品表 訂單資料表t_order: id
hadoop streaming map端join
測試資料 # lixiang_list.txt(小表,可以在map端載入到記憶體中) #立項ID 立項名稱 1800 心願券測試003 1801 fw心願單 1802 wtest心願券0524 1803
Hadoop中reduce端shuffle過程及原始碼解析
一、概要描述 在Child的main函式中通過TaskUmbilicalProtocol協議,從TaskTracker獲得需要執行的Task,並呼叫Task的run方法來執行。在ReduceTask而Task的run方法會通過java反射機制構造Reducer
MapReduce中的join演算法-reduce端join
在海量資料的環境下,不可避免的會碰到join需求, 例如在資料分析時需要連線從不同的資料來源中獲取到資料。 假設有兩個資料集:氣象站資料庫和天氣記錄資料庫,並考慮如何合二為一。 一個典型的查詢是:輸出氣象站的歷史資訊,同時各行記錄也包含氣象站的元資料資訊。 氣象站
python之sorted、map、reduce、join、split函式的例項操作
sorted 資料如下: key為選擇需要排序的元素;reverse為True,表示逆序排序。 reverse為False,表示順序排序。 map 資料如下。 按lambda表示式操作。 reduce 按lambda表示式操作
Hadoop 學習研究(五): hadoop中的join操作
Hadoop中的Join操作: 考慮如下問題: 假設有兩個資料集:一個是城市名稱編號,一個是日期和產出,考慮如何將這兩個不同的資料集合二為一。或者有如下需求:獲取某個城市在指定年份的產出等等問題。 需
map端join
path auth not config 單表 mapreduce == 書包 task package my.hadoop.hdfs.mapreduceJoin; import java.io.BufferedReader; import java.io.FileIn
hadoop 2.7.3基本操作
dir 不出 管理 查看 運行 oca 好的 nbsp 資源管理 ./bin/hdfs dfs -mkdir -p input 新建文件夾 YARN 有個好處是可以通過 Web 界面查看任務的運行情況:http://localhost:8088/cluster 但 Y
MySQL left join操作中 on與where放置條件的區別
合成 可見 找到 需要 兩張 oca aaa rip 多個 優先級 兩者放置相同條件,之所以可能會導致結果集不同,就是因為優先級。on的優先級是高於where的。 1 1 首先明確兩個概念: LEFT JOIN 關鍵字會從左表 (table_name1) 那裏返回
一步一步跟我學習hadoop(5)----hadoop Map/Reduce教程(2)
submit calc run submitjob des conf sam ner 打開 Map/Reduce用戶界面 本節為用戶採用框架要面對的各個環節提供了具體的描寫敘述,旨在與幫助用戶對實現、配置和調優進行具體的設置。然而,開發時候還是要相應著API進行
scala 高級十六 scala 集合和集合的高級特性 map flatten fllatmap zip reduce zip 等操作
高級 類型 strong nbsp println 參數 highlight 匿名 pri 1. scala 的列表List 和集Set 的操作 //Set 和list 差不多,不過 Set 中不允許有重復的元素 var set=scala.collectio
Hadoop之HDFS文件操作
文件操作命令 help 文件夾 利用 jpg 查看 作文 rgs fill 摘要:Hadoop之HDFS文件操作常有兩種方式。命令行方式和JavaAPI方式。本文介紹怎樣利用這兩種方式對HDFS文件進行操作。 關鍵詞:HDFS文件 命令行
hadoop應用場景
本地 計算 本地緩存 Lucene 智能 場景 搜索 學習 基於 大數據量存儲:分布式存儲 日誌處理: Hadoop擅長這個 海量計算: 並行計算 ETL:數據抽取到oracle、mysql、DB2、mongdb及主流數據庫 使用HBase做數據分析: 用擴展性應對大量
MapReduce框架Hadoop應用(一)
atan 查看 應該 節點 抽象 資源 log ack 任務 Google對其的定義:MapReduce是一種變成模型,用於大規模數據集(以T為級別的數據)的並行運算。用戶定義一個map函數來處理一批Key-Value對以生成另一批中間的Key-Value對,再定義一個
教你如何實現微信小程序與.net core應用服務端的無狀態身份驗證
做的 動圖 ef7 服務端 apt 是我 分布 .net service 隨著.net core2的發布,越來越多人使用.net core2開發各種應用服務端,下面我就結合自己最近開發的一款小程序,給大家分享下,怎麽使用小程序登錄後,小程序與服務端交互的權限控制。
Hive---join操作
http gpo bsp body outer blog www com size HIVE中join、semi join、outer join舉例詳解 加油 舉例子: hive> select * from zz0; 111111 222222 888888 hiv