06.Mapreduce例項——Reduce端join
06.Mapreduce例項——Reduce端join
實驗原理
在Reudce端進行Join連線是MapReduce框架進行表之間Join操作最為常見的模式。
1.Reduce端Join實現原理
(1)Map端的主要工作,為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。
(2)Reduce端的主要工作,在Reduce端以連線欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同檔案的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。
2.Reduce端Join的使用場景
Reduce端連線比Map端連線更為普遍,因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中,但是Reduce端連線效率比較低,因為所有資料都必須經過Shuffle過程。
3.本實驗的Reduce端Join程式碼執行流程:
(1)Map端讀取所有的檔案,並在輸出的內容里加上標識,代表資料是從哪個檔案裡來的。
(2)在Reduce處理函式中,按照標識對資料進行處理。
(3)然後將相同的key值進行Join連線操作,求出結果並直接輸出。
實驗步驟
1. 建兩個文字文件,用逗號分隔開,資料如下
orders1表
訂單ID訂單號使用者ID下單日期
523041112150526301764742011-12-1504:58:21
523031112150526291783502011-12-1504:45:31
523021112150526281722962011-12-1503:12:23
523011112150526271783482011-12-1502:37:32
523001112150526261748932011-12-1502:18:56
522991112150526251694712011-12-1501:33:46
522981112150526241783452011-12-1501:04:41
522971112150526231763692011-12-1501:02:20
522961112150526221783432011-12-1500:38:02
522951112150526211783422011-12-1500:18:43
522941112150526201783412011-12-1500:14:37
522931112150526191783382011-12-1500:13:07
order_items1表
明細ID訂單ID商品ID
252578522931016840
252579522931014040
252580522941014200
252581522941001012
252582522941022245
252583522941014724
252584522941010731
252586522951023399
252587522951016840
252592522961021134
252593522961021133
252585522951021840
252588522951014040
252589522961014040
252590522961019043
2. 虛擬機器中啟動Hadoop
3. 本地新建/data/mapreduce6目錄。
mkdir-p/data/mapreduce6
4. 將兩個表上傳到虛擬機器中
5. 上傳並解壓hadoop2lib檔案
6. 在HDFS上新建/mymapreduce6/in目錄,然後將Linux本地/data/mapreduce6目錄下的orders1和order_items1檔案匯入到HDFS的/mymapreduce6/in目錄中。
hadoopfs-mkdir-p/mymapreduce6/in
hadoopfs-put/data/mapreduce6/orders1/mymapreduce6/in
hadoopfs-put/data/mapreduce6/order_items1/mymapreduce6/in
7. IDEA中編寫Java程式碼
8. package mapreduce6;
import java.io.IOException;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin {
public static class mymapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String filePath =
((FileSplit)context.getInputSplit()).getPath().toString();
if (filePath.contains("orders1")) {
String line = value.toString();
String[] arr = line.split(",");
context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
//System.out.println(arr[0] + "_1+"
+ arr[2]+"\t"+arr[3]);
}else if(filePath.contains("order_items1")) {
String line =
value.toString();
String[] arr = line.split(",");
context.write(new Text(arr[1]), new Text("2+" + arr[2]));
//System.out.println(arr[1] + "_2+"
+ arr[2]);
}
}
}
public static class myreducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
Vector
left = new Vector();
Vector right = new Vector();
for (Text val : values) {
String str =
val.toString();
if (str.startsWith("1+")) {
left.add(str.substring(2));
}
else if (str.startsWith("2+")) {
right.add(str.substring(2));
}
}
int sizeL = left.size();
int sizeR = right.size();
//System.out.println(key +
"left:"+left);
//System.out.println(key +
"right:"+right);
for (int i = 0; i < sizeL; i++) {
for (int j = 0; j < sizeR; j++) {
context.write( key, new Text(
left.get(i) + "\t" + right.get(j) ) );
//System.out.println(key + " \t"
+ left.get(i) + "\t" + right.get(j));
}
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJobName("reducejoin");
job.setJarByClass(ReduceJoin.class);
job.setMapperClass(mymapper.class);
job.setReducerClass(myreducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");
Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");
Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");
FileInputFormat.addInputPath(job, left);
FileInputFormat.addInputPath(job, right);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
9. 將hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。
10. 拷貝log4j.properties檔案
11. 執行結果