關於MapReduce join操作
阿新 • 發佈:2019-02-11
使用者表:ID+name+sex
使用者行為表:ID+City+action+notes
Join完成後的形式:ID+name+sex+city+action+notes
package com.qst.DateJoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable ;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper ;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
/**
 * Reduce-side join of a user table and a user-action table on the shared user ID.
 *
 * <p>User table rows:   ID \t name \t sex
 * <br>Action table rows: ID \t city \t action \t notes
 * <br>Joined output:     ID \t name \t sex \t city \t action \t notes
 *
 * <p>The mapper tags each record with "u:" (user) or "a:" (action) based on the
 * input file name; the reducer performs a cross product of the two tagged sides
 * for each user ID.
 */
public class Data {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        private Text theKey = new Text();
        private Text theValue = new Text();

        /**
         * Input key is the byte offset of the line; input value is one line of text.
         * Output key is the user ID shared by both files; output value is the rest of
         * the fields, prefixed with "u:" or "a:" so the reducer can tell the two
         * sides of the join apart.
         */
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Path of the file this split came from — used to decide which table the record belongs to.
            String filePath = ((FileSplit) reporter.getInputSplit()).getPath().toString();
            String raw = value.toString();
            // Skip blank lines. (The original compared the split String[] against "",
            // which is always false — the check must happen on the raw line.)
            if (raw == null || raw.isEmpty())
                return;
            String[] line = raw.split("\\t");
            if (filePath.contains("userfile")) {
                // User table rows need at least 3 fields: ID, name, sex.
                if (line.length < 3)
                    return;
                String userid = line[0];
                // Everything except the ID is forwarded to the reducer.
                String userValue = line[1] + "\t" + line[2];
                theKey.set(userid);
                // "u:" tag marks this value as coming from the user table.
                theValue.set("u:" + userValue);
                output.collect(theKey, theValue);
            } else if (filePath.contains("actionfile")) {
                // Action table rows need at least 4 fields: ID, city, action, notes.
                // (The original only required 3 and could throw
                // ArrayIndexOutOfBoundsException when reading line[3].)
                if (line.length < 4)
                    return;
                String userid = line[0];
                // Dropped the stray trailing "\t" the original appended after line[3].
                String userValue = line[1] + "\t" + line[2] + "\t" + line[3];
                theKey.set(userid);
                // "a:" tag marks this value as coming from the action table.
                theValue.set("a:" + userValue);
                output.collect(theKey, theValue);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        /**
         * For each user ID, separates the tagged values back into user-side and
         * action-side lists, then emits the cross product: one output row per
         * (user record, action record) pair.
         */
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Values arrive mixed; collect each side of the join separately.
            List<String> userlist = new ArrayList<String>();
            List<String> valuelist = new ArrayList<String>();
            while (values.hasNext()) {
                String value = values.next().toString();
                // Strip the "u:"/"a:" tag added by the mapper and bucket accordingly.
                if (value.startsWith("u:")) {
                    userlist.add(value.substring(2));
                } else if (value.startsWith("a:")) {
                    valuelist.add(value.substring(2));
                }
            }
            // One user may have many actions (and, defensively, vice versa):
            // emit every user/action combination for this key.
            for (int i = 0; i < userlist.size(); i++) {
                for (int j = 0; j < valuelist.size(); j++) {
                    output.collect(key, new Text(userlist.get(i) + "\t" + valuelist.get(j)));
                }
            }
        }
    }

    /**
     * Configures and submits the join job.
     *
     * @param args args[0] = input directory (containing both tables),
     *             args[1] = output directory (must not exist yet)
     */
    public static void main(String[] args) throws Exception {
        // Fixed: the original passed DataJoin.class, which does not exist in this
        // file/package — the driver class is Data.
        JobConf conf = new JobConf(Data.class);
        conf.setJobName("Data Join");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}