MapReduce-Join中級優化-hadoop自帶datajoin的解決方法
阿新 • • 發佈:2019-02-09
接著上一篇《MapReuce-Join操作-初級優化》這一篇部落格繼續說明MapReduce對於Join的操作,這裡使用hadoop包中自帶的datajoin包來處理,如果是hadoop1.x則包在${HADOOP_HOME}/contrib/datajoin資料夾下。如果是hadoop2.x則該包在${HADOOP_HOME}/share/hadoop/tools/lib下面把包引入工程中就可以使用了。
以下是本篇部落格要處理的資料,為了我們前兩篇進行用法上的比較,這裡使用同樣的資料:
uid,name,phoneid
1,tom,40
2,jack,20
3,seven,30
4,lee,10
5,smith,20
6,張三,10
7,李四,30
8,王五,20
goodid,name
10,蘋果
20,三星
30,LG
40,華為
輸出結果:
lee 蘋果
張三 蘋果
jack 三星
smith 三星
王五 三星
seven LG
李四 LG
tom 華為
下面說說datajoin包的基本用法:
首先來看看Map端的寫法:
Map端要繼承DataJoinMapperBase類
public abstract class DataJoinMapperBase extends JobBase
首先我們要創造一個用於傳輸的實體類,必須繼承TaggedMapOutput,下面是參考程式碼以及註釋:
這裡的程式碼編寫方式是,直接繼承DataJoinReducerBase並實現combine()方法就行。
以下是本篇部落格要處理的資料,為了我們前兩篇進行用法上的比較,這裡使用同樣的資料:
uid,name,phoneid
1,tom,40
2,jack,20
3,seven,30
4,lee,10
5,smith,20
6,張三,10
7,李四,30
8,王五,20
goodid,name
10,蘋果
20,三星
30,LG
40,華為
輸出結果:
lee 蘋果
張三 蘋果
jack 三星
smith 三星
王五 三星
seven LG
李四 LG
tom 華為
下面說說datajoin包的基本用法:
首先來看看Map端的寫法:
Map端要繼承DataJoinMapperBase類
public abstract class DataJoinMapperBase extends JobBase
並實現以下幾個方法:
下面來看看configure()和map()函式的執行過程:/** * Determine the source tag based on the input file name. * * @param inputFile * @return the source tag computed from the given file name. */ protected abstract Text generateInputTag(String inputFile); /** * Generate a tagged map output value. The user code can also perform * projection/filtering. If it decides to discard the input record when * certain conditions are met,it can simply return a null. * * @param value * @return an object of TaggedMapOutput computed from the given value. */ protected abstract TaggedMapOutput generateTaggedMapOutput(Object value); /** * Generate a map output key. The user code can compute the key * programmatically, not just selecting the values of some fields. In this * sense, it is more general than the joining capabilities of SQL. * * @param aRecord * @return the group key for the given record */ protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
以上知道了map()中的處理流程過後,public void configure(JobConf job) { super.configure(job); this.job = job; this.inputFile = job.get(MRJobConfig.MAP_INPUT_FILE); //生成該map的資料的Tag this.inputTag = generateInputTag(this.inputFile); } public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { if (this.reporter == null) { this.reporter = reporter; } //記錄總記錄條數 addLongValue("totalCount", 1); //把原始行記錄成生一個TaggedMapOutput的物件 TaggedMapOutput aRecord = generateTaggedMapOutput(value); if (aRecord == null) { //記錄不合格的字條數 addLongValue("discardedCount", 1); return; } Text groupKey = generateGroupKey(aRecord); if (groupKey == null) { //記錄分組鍵為空的記錄條數 addLongValue("nullGroupKeyCount", 1); return; } //輸出分組鍵和TaggedMapOutput的物件 output.collect(groupKey, aRecord); addLongValue("collectedCount", 1); } //主要功能為把map物件中對應的name的計數器加1 protected Long addLongValue(Object name, long inc) { Long val = this.longCounters.get(name); Long retv = null; if (val == null) { retv = Long.valueOf(inc); } else { retv = Long.valueOf(val.longValue() + inc); } this.longCounters.put(name, retv); return retv; }
首先我們要創造一個用於傳輸的實體類,必須繼承TaggedMapOutput,下面是參考程式碼以及註釋:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class TaggedWritable extends TaggedMapOutput {
/**
* 這樣定義報空以下導常:
* Error: java.lang.NullPointerException
* at com.seven.mapreduce.join.TaggedWritable.readFields(TaggedWritable.java:32)
* org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)
* 可以參考http://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin
* 解決。
*/
private Writable data;
public TaggedWritable() {
this.tag = new Text();
}
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
public void setData(Writable data) {
this.data = data;
}
public void readFields(DataInput arg0) throws IOException {
this.tag.readFields(arg0);
String dataClz = arg0.readUTF();
/**
* 根據序列化時傳入的型別進行反序列化
*/
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(arg0);
}
public void write(DataOutput arg1) throws IOException {
this.tag.write(arg1);
/**
* 寫入類名,反序列化時可以用到
*/
arg1.writeUTF(this.data.getClass().getName());
this.data.write(arg1);
}
@Override
public Writable getData() {
return data;
}
}
下面就來編寫Map端程式了:
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
public class JoinMapper extends DataJoinMapperBase {
@Override
protected Text generateInputTag(String inputFile) {
/**
* 生成對應於該Map的Tag
*/
String tagTmp = inputFile.substring(inputFile.lastIndexOf("/") + 1);
return new Text(tagTmp);
}
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
/**
* 來自父類DataJoinMapperBase的變數,在config()方法中根據檔名初始化
*/
retv.setTag(this.inputTag);
return retv;
}
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
/**
* 生成分組的鍵,如果是多個檔案,但對應的列不同,則在這裡根據inputTag來進行
* 判斷和控制
*/
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(",");
String groupKey = null;
if(this.inputTag.toString().equals("12")){
groupKey = tokens[2];
} else if (this.inputTag.toString().equals("122")){
groupKey = tokens[0];
}
return new Text(groupKey);
}
}
下面實現reduce端的程式碼:這裡不過多的介紹DataJoinReducerBase的具體執行過程,下一篇部落格會單獨的分析這個包的整個執行過程。這裡的程式碼編寫方式是,直接繼承DataJoinReducerBase並實現combine()方法就行。
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
public class JoinReducer extends DataJoinReducerBase {
/**
* combine方法用來篩選掉不需要的組合,獲得所需的聯結操作(內聯結,左聯結等)。並且
* 將結果化為合適輸出格式(如:欄位排列,去重等)
*/
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
/**
* 實現innerjoin的功能
*/
if (tags.length < 2) return null;
String joinedStr = "";
for (int i=0; i<values.length; i++) {
if (i > 0) joinedStr += ",";
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(",");
/**
* 根據tag的不同,把不同檔案中的不同的欄位取出進和join操作
* 12為使用者資訊檔名 122為手機資訊檔名
*/
if(tw.getTag().equals("12")) {
joinedStr += tokens[1];
} else {
joinedStr += tokens[1];
}
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
啟動程式:import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, JobMain.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
/**
* 設定多個資料夾下面的檔案進和JOIN操作
*/
//FileInputFormat.setInputPaths(job, args[0]+ "," + args[1]);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin");
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(
new Configuration(),
new JobMain(),
args);
System.exit(res);
}
}
執行結果:
使用者資訊表
手機資訊表
執行命令:
./hadoop jar mr.jar com.seven.mapreduce.join.JobMain /input/eight /output/night00
執行結果:
總結:
這是hadoop包中自帶的join方式的使用,這是一個通用型的JOIN方法,如果熟練了可以快速的開發出JOIN功能,但在執行效率上還有可以提高的空間,下面一篇會說明《hadoop硬實戰》中的對這一個功能的優化的實現。