Hadoop使用DATAJOIN軟體包連結不同來源的資料
阿新 • 發佈:2018-12-24
具體參見《Hadoop in action》
這裡說一下幾個問題:這幾個問題在stackoverflow 得到了解決
(1)如何輸入多個檔案
- 將多個檔案放入一個資料夾,輸入路徑寫資料夾的路徑
- MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,MapClass.class);
(2)TaggedWritable要定義一個無參的建構函式,後面reduce反射的時候會用到
(3)呼叫data.readFields的時候,data有可能是空,而且並不知道data的型別,所以在TaggedWritable的write方法序列化data之前,儲存一下data的類名,然後在readFields檢查。
程式碼如下:
package Chapter5;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
import javax.lang.model.SourceVersion;
import javax.print.DocFlavor.STRING;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.util.EnumCounters.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DataJoin extends Configured implements Tool {
public static class TaggedWritable extends TaggedMapOutput{
    // Wraps an arbitrary Writable payload together with a source tag so the
    // DataJoin framework can tell which input source each record came from.
    private Writable data;

    // No-arg constructor is required: the framework instantiates this class
    // reflectively on the reduce side before calling readFields().
    public TaggedWritable() {
        this.tag=new Text();
    }

    public TaggedWritable(Writable data) {
        this.tag=new Text("");
        this.data = data;
    }

    /**
     * Deserializes the tag, then the payload's class name, then the payload.
     * The class name must be read first because after reflective construction
     * {@code data} is null and its concrete type is unknown.
     *
     * @throws IOException if the recorded payload class cannot be loaded
     */
    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        String dataClz = in.readUTF();
        if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
            try {
                this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
            } catch (ClassNotFoundException e) {
                // Propagate instead of swallowing: the original printStackTrace()
                // left data == null and caused an NPE on the readFields call below.
                throw new IOException("Cannot instantiate payload of type " + dataClz, e);
            }
        }
        this.data.readFields(in);
    }

    /** Serializes the tag, the payload's class name, then the payload itself. */
    @Override
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        out.writeUTF(this.data.getClass().getName());
        this.data.write(out);
    }

    @Override
    public Writable getData() {
        return data;
    }

    public void setData(Writable data){
        this.data=data;
    }
}
public static class MapClass extends DataJoinMapperBase{
    /** Group (join) key = the first comma-separated field of the record. */
    @Override
    protected Text generateGroupKey(TaggedMapOutput record) {
        String[] fields = ((Text) record.getData()).toString().split(",");
        return new Text(fields[0]);
    }

    /** Input tag = the portion of the input file name before the first '-'. */
    @Override
    protected Text generateInputTag(String inputFile) {
        String source = inputFile.split("-")[0];
        return new Text(source);
    }

    /** Wraps one input line in a TaggedWritable carrying the current input tag. */
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedMapOutput tagged = new TaggedWritable((Text) value);
        tagged.setTag(this.inputTag);
        return tagged;
    }
}
public static class Reduce extends DataJoinReducerBase{
    /**
     * Inner join: emits a combined record only when the group key appears in
     * at least two sources. The combined value is the comma-joined remainder
     * (everything after the key) of each tagged record in the group.
     */
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        // Key present in fewer than two sources -> drop the group (inner join).
        if (tags.length < 2) return null;
        // StringBuilder avoids the O(n^2) string concatenation of the loop's
        // original res += ... form.
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) joined.append(',');
            TaggedWritable tagged = (TaggedWritable) values[i];
            String line = ((Text) tagged.getData()).toString();
            // Limit 2: split off only the key, keep the rest of the line intact.
            String[] tokens = line.split(",", 2);
            // Guard: a record with no field after the key previously indexed
            // tokens[1] unconditionally and could throw ArrayIndexOutOfBoundsException.
            if (tokens.length > 1) {
                joined.append(tokens[1]);
            }
        }
        TaggedWritable result = new TaggedWritable(new Text(joined.toString()));
        result.setTag((Text) tags[0]);
        return result;
    }
}
/**
 * Configures and submits the join job: an old-API (mapred) JobConf wired to
 * the DataJoin mapper/reducer, with text output and ',' as the separator.
 */
public int run(String[] args) throws Exception{
    JobConf conf = new JobConf(getConf(), DataJoin.class);
    conf.setJobName("DataJoin");

    // args[0] may be a directory; every file inside it becomes an input.
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(MapClass.class);
    conf.setReducerClass(Reduce.class);

    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(TaggedWritable.class);
    conf.set("mapred.textoutputformat.separator", ",");

    JobClient.runJob(conf);
    return 0;
}
/** Entry point: delegates to ToolRunner so generic -D/-conf options are honored. */
public static void main(String[] args) throws Exception{
    int exitCode = ToolRunner.run(new Configuration(), new DataJoin(), args);
    System.exit(exitCode);
}
}