1. 程式人生 > 其它 >06.Mapreduce例項——Reduce端join

06.Mapreduce例項——Reduce端join

06Mapreduce例項——Reducejoin

實驗原理

Reudce端進行Join連線是MapReduce框架進行表之間Join操作最為常見的模式。

1.ReduceJoin實現原理

1Map端的主要工作,為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。

2Reduce端的主要工作,在Reduce端以連線欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同檔案的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。

2.ReduceJoin的使用場景

Reduce端連線比Map端連線更為普遍,因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中,但是Reduce端連線效率比較低,因為所有資料都必須經過Shuffle過程。

3.本實驗的ReduceJoin程式碼執行流程:

1Map端讀取所有的檔案,並在輸出的內容里加上標識,代表資料是從哪個檔案裡來的。

2)在Reduce處理函式中,按照標識對資料進行處理。

3)然後將相同的key值進行Join連線操作,求出結果並直接輸出。

實驗步驟

1. 建兩個文字文件,用逗號分隔開,資料如下

orders1

訂單ID訂單號使用者ID下單日期

523041112150526301764742011-12-1504:58:21

523031112150526291783502011-12-1504:45:31

523021112150526281722962011-12-1503:12:23

523011112150526271783482011-12-1502:37:32

523001112150526261748932011-12-1502:18:56

522991112150526251694712011-12-1501:33:46

522981112150526241783452011-12-1501:04:41

522971112150526231763692011-12-1501:02:20

522961112150526221783432011-12-1500:38:02

522951112150526211783422011-12-1500:18:43

522941112150526201783412011-12-1500:14:37

522931112150526191783382011-12-1500:13:07

order_items1

明細ID訂單ID商品ID

252578522931016840

252579522931014040

252580522941014200

252581522941001012

252582522941022245

252583522941014724

252584522941010731

252586522951023399

252587522951016840

252592522961021134

252593522961021133

252585522951021840

252588522951014040

252589522961014040

252590522961019043

2. 虛擬機器中啟動Hadoop

3. 本地新建/data/mapreduce6目錄。

mkdir-p/data/mapreduce6

4. 將兩個表上傳到虛擬機器中

5. 上傳並解壓hadoop2lib檔案

6. HDFS上新建/mymapreduce6/in目錄,然後將Linux本地/data/mapreduce6目錄下的orders1order_items1檔案匯入到HDFS/mymapreduce6/in目錄中。

hadoopfs-mkdir-p/mymapreduce6/in

hadoopfs-put/data/mapreduce6/orders1/mymapreduce6/in

hadoopfs-put/data/mapreduce6/order_items1/mymapreduce6/in

7. IDEA中編寫Java程式碼

8. package mapreduce6;
import
java.io.IOException;
import
java.util.Iterator;
import
java.util.Vector;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.FileSplit;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class
ReduceJoin {
public static class mymapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String filePath = ((FileSplit)context.getInputSplit()).getPath().toString()
;
if
(filePath.contains("orders1")) {
String line = value.toString()
;
String[] arr = line.split(",");
context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
//System.out.println(arr[0] + "_1+" + arr[2]+"\t"+arr[3]);
}else if(filePath.contains("order_items1")) {
String line = value.toString()
;
String[] arr = line.split(",");
context.write(new Text(arr[1]), new Text("2+" + arr[2]));
//System.out.println(arr[1] + "_2+" + arr[2]);
}
}
}

public static class myreducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
Vector left =
new Vector();
Vector right = new Vector();
for
(Text val : values) {
String str = val.toString()
;
if
(str.startsWith("1+")) {
left.add(str.substring(
2));
}
else if (str.startsWith("2+")) {
right.add(str.substring(
2));
}
}

int sizeL = left.size();
int
sizeR = right.size();
//System.out.println(key + "left:"+left);
//System.out.println(key + "right:"+right);
for (int i = 0; i < sizeL; i++) {
for (int j = 0; j < sizeR; j++) {
context.write( key
, new Text( left.get(i) + "\t" + right.get(j) ) );
//System.out.println(key + " \t" + left.get(i) + "\t" + right.get(j));
}
}
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance()
;
job.setJobName("reducejoin");
job.setJarByClass(ReduceJoin.class);

job.setMapperClass(mymapper.class);
job.setReducerClass(myreducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");
Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");
Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");

FileInputFormat.addInputPath(job, left);
FileInputFormat.addInputPath(job, right);
FileOutputFormat.setOutputPath(job, out);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

9. hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。

10. 拷貝log4j.properties檔案

11. 執行結果