MapReduce實現join操作
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Vector; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; /** * MapReduce實現Join操作 */ public class MapRedJoin { public static final String DELIMITER = "\u0009"; // 欄位分隔符 // map過程 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { public void configure(JobConf job) { super.configure(job); } public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, ClassCastException { // 獲取輸入檔案的全路徑和名稱 String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString(); // 獲取記錄字串 String line = value.toString(); // 拋棄空記錄 if (line == null || line.equals("")) return; // 處理來自表A的記錄 if (filePath.contains("m_ys_lab_jointest_a")) { String[] values = line.split(DELIMITER); // 按分隔符分割出欄位 if (values.length < 2) return; String id = values[0]; // id String name = values[1]; // name output.collect(new Text(id), new Text("a#"+name)); } // 處理來自表B的記錄 else if (filePath.contains("m_ys_lab_jointest_b")) { String[] values = line.split(DELIMITER); // 按分隔符分割出欄位 if (values.length < 3) return; String id = values[0]; // id String statyear = values[1]; // statyear String num = values[2]; //num output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num)); } } } // reduce過程 public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { Vector<String> vecA = new Vector<String>(); // 存放來自表A的值 Vector<String> vecB = new Vector<String>(); // 存放來自表B的值 while (values.hasNext()) { String value = values.next().toString(); if (value.startsWith("a#")) { vecA.add(value.substring(2)); } else if (value.startsWith("b#")) { vecB.add(value.substring(2)); } } int sizeA = vecA.size(); int sizeB = vecB.size(); // 遍歷兩個向量 int i, j; for (i = 0; i < sizeA; i ++) { for (j = 0; j < sizeB; j ++) { output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j))); } } } } protected void configJob(JobConf conf) { conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(Text.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setOutputFormat(ReportOutFormat.class); } }
相關推薦
MapReduce實現join操作
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Vector; import org.apache.hadoop.io.LongWritable; import
案例-使用MapReduce實現join操作
哈嘍~各位小夥伴們中秋快樂,好久沒更新新的文章啦,今天分享如何使用mapreduce進行join操作。 在離線計算中,我們常常不只是會對單一一個檔案進行操作,進行需要進行兩個或多個檔案關聯出更多資料,類似與sql中的join操作。 今天就跟大家分享一下如何在MapReduce中實現join操作 需求 現有
Hadoop基礎-MapReduce的Join操作
否則 mapred HA 原創 -m mapr red 轉載 hadoop基礎 Hadoop基礎-MapReduce的Join操作 作者:尹正傑 版權聲明:原創作品,
MapReduce實現join
在我們平常的大資料專案開發和專案需求中,可能需要我們完成在關係型資料庫中十分常見的join類功能。那麼針對這種型別的功能需求,用hadoop中的MapReduce模型應該要怎麼實現呢?本篇文章將針對這種功能需求提供幾種實現選擇。 首先,我的開發環境為:jdk1
MapReduce實現基本SQL操作的原理-join和group by,以及Dinstinct
詳細講解SQL編譯為MapReduce之前,我們先來看看MapReduce框架實現SQL基本操作的原理 Join的實現原理 select u.name, o.orderid from order o join user u on o.uid = u.uid; 在map
使用MapReduce實現兩個文件的Join操作
ash 鍵值 turn @param nts n) extend cache inter 數據結構 customer表 1 hanmeimei ShangHai 110 2
Hadoop_21_編寫MapReduce程序實現Join功能
持久化 tle 格式 AD style tro 消息 clas HA 1.序列化與Writable接口 1.1.hadoop的序列化格式 序列化和反序列化就是結構化對象和字節流之間的轉換,主要用在內部進程的通訊和持久化存儲方面 hadoop在節點間的內部通訊使用的是
MapReduce實現兩表的Join--原理及python和java程式碼實現
用Hive一句話搞定的,但是有時必須要用mapreduce 方法介紹 1. 概述 在傳統資料庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一
Java MapReduce 基本計算操作實現實戰
package com.sct.hadoop.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable;
基於spark實現表的join操作
1. 自連線 假設存在如下檔案: [root@bluejoe0 ~]# cat categories.csv 1,生活用品,0 2,數碼用品,1 3,手機,2 4,華為Mate7,3 每一行的格式為:類別ID,類別名稱,父類ID 現在欲輸出每個類別
MapReduce表連線操作之Reduce端join
一:背景 Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。 二:技術實現 基本思路 (1):Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。 (2):在red
MapReduce的兩表join操作優化
注:優化前的分析過程詳見本博的上篇博文 案例 地址(Address)和人員(Person)的一對多關聯 原始資料 地址(Address)資料 id AddreName 1 beijing 2 shanghai 3 guangzhou 人員(Person)資料 1 zhan
MapReduce之join演算法案例實現
1、需求:訂單資料表t_order:id date pid amount 1001 20150710 P0001 2 1002 20150710 P0001 3 1002 20150710 P0002 3 商品資訊表t_productid pname
關於MapReduce join操作
使用者表:ID+name+sex 使用者行為表:ID+City+action+notes Join完成後的形式:ID+name+sex+city+action+notes package com.qst.DateJoin; import ja
Hadoop中MapReduce多種join實現例項分析
感謝分享:http://database.51cto.com/art/201410/454277.htm 1、在Reudce端進行連線。 在Reudce端進行連線是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下: Map端的主要工作:為來自
hadoop streaming兩個資料檔案實現join合併操作
hadoop做資料處理,大都是對集合進行操作,因此將資料檔案與另一個數據檔案進行join的操作需求非常常見。 有很多人詢問,下面將彙總一個例子讓入門的朋友掌握編寫方法: [hdfs@server1]$ more clean_item_new 100002303,3368 1
Spark利用Broadcast實現Join避免Shuffle操作
在Spark中, 諸如ReduceByKey,GroupByKey等操作會觸發Shuffle, 影響效能。 本文提供了一種利用廣播
Fp關聯規則算法計算置信度及MapReduce實現思路
i++ htm [] blank none reat 頻繁項集 可能 term 說明:參考Mahout FP算法相關相關源代碼。算法project能夠在FP關聯規則計算置信度下載:(僅僅是單機版的實現,並沒有MapReduce的代碼)使用FP關聯規則算法計算置信度基於以下
設計模式-備忘錄模式實現悔棋操作
exc turn color new label isp lis args set 利用設計模式中的備忘錄模式實現多步悔棋的操作 1 import java.util.*; 2 class Chessman { 3 private String lab
MySQL left join操作中 on與where放置條件的區別
合成 可見 找到 需要 兩張 oca aaa rip 多個 優先級 兩者放置相同條件,之所以可能會導致結果集不同,就是因為優先級。on的優先級是高於where的。 1 1 首先明確兩個概念: LEFT JOIN 關鍵字會從左表 (table_name1) 那裏返回