1. 程式人生 > >MapReduce實現join操作

MapReduce實現join操作

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * MapReduce實現Join操作
 */
public class MapRedJoin {
	public static final String DELIMITER = "\u0009"; // 欄位分隔符
	
	// map過程
	public static class MapClass extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, Text> {
						
		public void configure(JobConf job) {
			super.configure(job);
		}
		
		public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
				Reporter reporter) throws IOException, ClassCastException {
			// 獲取輸入檔案的全路徑和名稱
			String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
			// 獲取記錄字串
			String line = value.toString();
			// 拋棄空記錄
			if (line == null || line.equals("")) return; 
			
			// 處理來自表A的記錄
			if (filePath.contains("m_ys_lab_jointest_a")) {
				String[] values = line.split(DELIMITER); // 按分隔符分割出欄位
				if (values.length < 2) return;
				
				String id = values[0]; // id
				String name = values[1]; // name
				
				output.collect(new Text(id), new Text("a#"+name));
			}
			// 處理來自表B的記錄
			else if (filePath.contains("m_ys_lab_jointest_b")) {
				String[] values = line.split(DELIMITER); // 按分隔符分割出欄位
				if (values.length < 3) return;
				
				String id = values[0]; // id
				String statyear = values[1]; // statyear
				String num = values[2]; //num
				
				output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
			}
		}
	}
	
	// reduce過程
	public static class Reduce extends MapReduceBase
			implements Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
					
			Vector<String> vecA = new Vector<String>(); // 存放來自表A的值
			Vector<String> vecB = new Vector<String>(); // 存放來自表B的值
			
			while (values.hasNext()) {
				String value = values.next().toString();
				if (value.startsWith("a#")) {
					vecA.add(value.substring(2));
				} else if (value.startsWith("b#")) {
					vecB.add(value.substring(2));
				}
			}
			
			int sizeA = vecA.size();
			int sizeB = vecB.size();
			
			// 遍歷兩個向量
			int i, j;
			for (i = 0; i < sizeA; i ++) {
				for (j = 0; j < sizeB; j ++) {
					output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
				}
			}	
		}
	}
	
	protected void configJob(JobConf conf) {
		conf.setMapOutputKeyClass(Text.class);
		conf.setMapOutputValueClass(Text.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		conf.setOutputFormat(ReportOutFormat.class);
	}
}

相關推薦

MapReduce實現join操作

import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Vector; import org.apache.hadoop.io.LongWritable; import

案例-使用MapReduce實現join操作

哈嘍~各位小夥伴們中秋快樂,好久沒更新新的文章啦,今天分享如何使用mapreduce進行join操作。 在離線計算中,我們常常不只是會對單一一個檔案進行操作,進行需要進行兩個或多個檔案關聯出更多資料,類似與sql中的join操作。 今天就跟大家分享一下如何在MapReduce中實現join操作 需求 現有

Hadoop基礎-MapReduceJoin操作

否則 mapred HA 原創 -m mapr red 轉載 hadoop基礎                   Hadoop基礎-MapReduce的Join操作                                     作者:尹正傑 版權聲明:原創作品,

MapReduce實現join

在我們平常的大資料專案開發和專案需求中,可能需要我們完成在關係型資料庫中十分常見的join類功能。那麼針對這種型別的功能需求,用hadoop中的MapReduce模型應該要怎麼實現呢?本篇文章將針對這種功能需求提供幾種實現選擇。 首先,我的開發環境為:jdk1

MapReduce實現基本SQL操作的原理-join和group by,以及Dinstinct

詳細講解SQL編譯為MapReduce之前,我們先來看看MapReduce框架實現SQL基本操作的原理 Join的實現原理 select u.name, o.orderid from order o join user u on o.uid = u.uid; 在map

使用MapReduce實現兩個文件的Join操作

ash 鍵值 turn @param nts n) extend cache inter 數據結構 customer表 1 hanmeimei ShangHai 110 2

Hadoop_21_編寫MapReduce程序實現Join功能

持久化 tle 格式 AD style tro 消息 clas HA 1.序列化與Writable接口 1.1.hadoop的序列化格式   序列化和反序列化就是結構化對象和字節流之間的轉換,主要用在內部進程的通訊和持久化存儲方面   hadoop在節點間的內部通訊使用的是

MapReduce實現兩表的Join--原理及python和java程式碼實現

用Hive一句話搞定的,但是有時必須要用mapreduce 方法介紹 1. 概述 在傳統資料庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一

Java MapReduce 基本計算操作實現實戰

package com.sct.hadoop.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable;

基於spark實現表的join操作

1. 自連線 假設存在如下檔案: [root@bluejoe0 ~]# cat categories.csv 1,生活用品,0 2,數碼用品,1 3,手機,2 4,華為Mate7,3 每一行的格式為:類別ID,類別名稱,父類ID 現在欲輸出每個類別

MapReduce表連線操作之Reduce端join

一:背景 Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。 二:技術實現 基本思路 (1):Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。 (2):在red

MapReduce的兩表join操作優化

注:優化前的分析過程詳見本博的上篇博文 案例 地址(Address)和人員(Person)的一對多關聯 原始資料 地址(Address)資料 id AddreName 1 beijing 2 shanghai 3 guangzhou 人員(Person)資料 1 zhan

MapReducejoin演算法案例實現

1、需求:訂單資料表t_order:id date pid amount 1001 20150710 P0001 2 1002 20150710 P0001 3 1002 20150710 P0002 3 商品資訊表t_productid pname

關於MapReduce join操作

使用者表:ID+name+sex 使用者行為表:ID+City+action+notes Join完成後的形式:ID+name+sex+city+action+notes package com.qst.DateJoin; import ja

Hadoop中MapReduce多種join實現例項分析

感謝分享:http://database.51cto.com/art/201410/454277.htm 1、在Reudce端進行連線。 在Reudce端進行連線是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下: Map端的主要工作:為來自

hadoop streaming兩個資料檔案實現join合併操作

hadoop做資料處理,大都是對集合進行操作,因此將資料檔案與另一個數據檔案進行join的操作需求非常常見。 有很多人詢問,下面將彙總一個例子讓入門的朋友掌握編寫方法: [hdfs@server1]$ more clean_item_new 100002303,3368 1

Spark利用Broadcast實現Join避免Shuffle操作

在Spark中, 諸如ReduceByKey,GroupByKey等操作會觸發Shuffle, 影響效能。 本文提供了一種利用廣播

Fp關聯規則算法計算置信度及MapReduce實現思路

i++ htm [] blank none reat 頻繁項集 可能 term 說明:參考Mahout FP算法相關相關源代碼。算法project能夠在FP關聯規則計算置信度下載:(僅僅是單機版的實現,並沒有MapReduce的代碼)使用FP關聯規則算法計算置信度基於以下

設計模式-備忘錄模式實現悔棋操作

exc turn color new label isp lis args set 利用設計模式中的備忘錄模式實現多步悔棋的操作 1 import java.util.*; 2 class Chessman { 3 private String lab

MySQL left join操作中 on與where放置條件的區別

合成 可見 找到 需要 兩張 oca aaa rip 多個 優先級 兩者放置相同條件,之所以可能會導致結果集不同,就是因為優先級。on的優先級是高於where的。 1 1 首先明確兩個概念: LEFT JOIN 關鍵字會從左表 (table_name1) 那裏返回