1. 程式人生 > 其它 >Mapreduce例項——Reduce端join

Mapreduce例項——Reduce端join

現有某電商網站兩張資訊表,分別為訂單表orders1和訂單明細表order_items1,orders1表記錄了使用者購買商品的下單日期以及訂單編號,order_items1表記錄了商品id,訂單id以及明細id,它們的表結構以及關係如下圖所示

兩表的資料內容如下:

訂單ID    訂單號            使用者ID    下單日期
52304    111215052630    176474    2011-12-15 04:58:21
52303    111215052629    178350    2011-12-15 04:45:31
52302    111215052628    172296    2011-12-15 03:12:23
52301    111215052627    178348    2011-12-15 02:37:32
52300    111215052626    174893    2011-12-15 02:18:56
52299    111215052625    169471    2011-12-15 01:33:46
52298    111215052624    178345    2011-12-15 01:04:41
52297    111215052623    176369    2011-12-15 01:02:20
52296    111215052622    178343    2011-12-15 00:38:02
52295    111215052621    178342    2011-12-15 00:18:43
52294    111215052620    178341    2011-12-15 00:14:37
52293    111215052619    178338    2011-12-15 00:13:07
orders1
明細ID    訂單ID    商品ID
252578    52293    1016840
252579    52293    1014040
252580    52294    1014200
252581    52294    1001012
252582    52294    1022245
252583    52294    1014724
252584    52294    1010731
252586    52295    1023399
252587    52295    1016840
252592    52296    1021134
252593    52296    1021133
252585    52295    1021840
252588    52295    1014040
252589    52296    1014040
252590    52296    1019043
order_items1

要求查詢在2011-12-15日該電商都有哪些使用者購買了什麼商品:

package mapreduce6;

import java.io.IOException;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //06.Mapreduce例項——Reduce端join public class ReduceJoin { public static class mymapper extends Mapper<Object, Text, Text, Text>{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String filePath = ((FileSplit)context.getInputSplit()).getPath().toString(); if (filePath.contains("orders")) { String line = value.toString(); String[] arr = line.split("\t"); context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3])); //System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]); }else if(filePath.contains("order_items1")) { String line = value.toString(); String[] arr = line.split("\t"); context.write(new Text(arr[1]), new Text("2+" + arr[2])); //System.out.println(arr[1] + "_2+" + arr[2]); } } } public static class myreducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Vector<String> left = new Vector<String>(); Vector<String> right = new Vector<String>(); for (Text val : values) { String str = val.toString(); if (str.startsWith("1+")) { left.add(str.substring(2)); } else if (str.startsWith("2+")) { right.add(str.substring(2)); } } int sizeL = left.size(); int sizeR = right.size(); //System.out.println(key + "left:"+left); //System.out.println(key + "right:"+right); for (int i = 0; i < sizeL; i++) { for (int j = 0; j < sizeR; j++) { context.write( key, new Text( left.get(i) + "\t" + right.get(j) ) ); //System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j)); } } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); job.setJobName("reducejoin"); job.setJarByClass(ReduceJoin.class); job.setMapperClass(mymapper.class); job.setReducerClass(myreducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path left = new Path("hdfs://192.168.51.100:8020/mymapreduce6/in/orders"); Path right = new Path("hdfs://192.168.51.100:8020/mymapreduce6/in/order_items1"); Path out = new Path("hdfs://192.168.51.100:8020/mymapreduce6/out"); FileInputFormat.addInputPath(job, left); FileInputFormat.addInputPath(job, right); FileOutputFormat.setOutputPath(job, out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

統計結果:

原理:

在Reudce端進行Join連線是MapReduce框架進行表之間Join操作最為常見的模式。

1.Reduce端Join實現原理

(1)Map端的主要工作,為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。

(2)Reduce端的主要工作,在Reduce端以連線欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同檔案的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。

2.Reduce端Join的使用場景

Reduce端連線比Map端連線更為普遍,因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中,但是Reduce端連線效率比較低,因為所有資料都必須經過Shuffle過程。

3.本實驗的Reduce端Join程式碼執行流程:

(1)Map端讀取所有的檔案,並在輸出的內容里加上標識,代表資料是從哪個檔案裡來的。

(2)在Reduce處理函式中,按照標識對資料進行處理。

(3)然後將相同的key值進行Join連線操作,求出結果並直接輸出。