MapReduce對多個輸入檔案的處理
阿新 • • 發佈:2019-01-27
使用MultipleInputs類為不同的輸入檔案路徑分別指定輸入格式(InputFormat)及對應的Mapper類
現有兩份資料
phone
123,good number
124,common number
125,bad number
user
zhangsan,123
lisi,124
wangwu,125
現在需要把user表和phone表按照phone number連接(join)起來,得到下面的結果:
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number
分析思路
這相當於兩張表的一對一join操作。join時以外來鍵值(phone number)作為key,並將value封裝為一個自定義的JavaBean(實現WritableComparable介面),其中帶有標記來源表的flag。
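本例原本打算通過value排序:在value的JavaBean中實現compareTo()方法,使phone表的記錄排在首位。但需注意,Hadoop在shuffle階段只對key排序,同一個key下各value到達reducer的順序並不保證,value類實現compareTo()並不會讓value被排序。因此reduce端應根據flag欄位區分phone表與user表的記錄;若確實需要value有序,應採用「組合key + 分組比較器(grouping comparator)」的二次排序。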
1. 為value實現JavaBean(實現WritableComparable介面)
package test.mr.multiinputs; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /* * 自定義的JavaBean */ public class FlagString implements WritableComparable<FlagString> { private String value; private int flag; // 標記 0:表示phone表 1:表示user表 public FlagString() { super(); // TODO Auto-generated constructor stub } public FlagString(String value, int flag) { super(); this.value = value; this.flag = flag; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } @Override public void write(DataOutput out) throws IOException { out.writeInt(flag); out.writeUTF(value); } @Override public void readFields(DataInput in) throws IOException { this.flag = in.readInt(); this.value = in.readUTF(); } @Override public int compareTo(FlagString o) { if (this.flag >= o.getFlag()) { if (this.flag > o.getFlag()) { return 1; } } else { return -1; } return this.value.compareTo(o.getValue()); } }
2. 多個Map類:map1(處理phone表檔案)
package test.mr.multiinputs; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> { private String delimiter; // 定義分隔符,由job端設定 @Override protected void setup( Mapper<LongWritable, Text, Text, FlagString>.Context context) throws IOException, InterruptedException { delimiter = context.getConfiguration().get("delimiter", ","); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlagString>.Context context) throws IOException, InterruptedException { String line = value.toString().trim(); if (line.length() > 0) { String[] str = line.split(delimiter); if (str.length == 2) { context.write(new Text(str[0].trim()), new FlagString(str[1].trim(), 0)); // flag=0,表示phone表 } } } }
2. map2(處理user表檔案)
package test.mr.multiinputs;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the user table. A line of the form {@code user<delimiter>phone}
 * becomes the pair (phone, FlagString(user, 1)); flag 1 marks the user table.
 * Blank lines and lines that do not split into exactly two fields are skipped.
 */
public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> {
    /** Field separator from the "delimiter" job property (default ","); used as a String.split regex. */
    private String delimiter;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        delimiter = context.getConfiguration().get("delimiter", ",");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString().trim();
        if (record.isEmpty()) {
            return; // blank line
        }
        String[] fields = record.split(delimiter);
        if (fields.length != 2) {
            return; // malformed line
        }
        Text phone = new Text(fields[1].trim());
        FlagString user = new FlagString(fields[0].trim(), 1); // 1 = user table
        context.write(phone, user);
    }
}
3.reduce類
package test.mr.multiinputs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce side of the phone/user join. The key is the phone number; for every user record
 * and every phone record under that key it writes
 * {@code user<delimiter>phone<delimiter>description}.
 *
 * <p>Records are told apart by {@link FlagString#getFlag()} (0 = phone table,
 * 1 = user table) instead of by arrival order: Hadoop sorts only keys, so the order of the
 * values inside a reduce group is not guaranteed. Keys with no match on both sides produce
 * no output (inner join).
 */
public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> {
    private static final int PHONE_FLAG = 0;
    private static final int USER_FLAG = 1;

    /** Output field separator, from the "delimiter" job property (default ","). */
    private String delimiter;

    @Override
    protected void setup(
            Reducer<Text, FlagString, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        delimiter = context.getConfiguration().get("delimiter", ",");
    }

    @Override
    protected void reduce(Text key, Iterable<FlagString> values,
            Reducer<Text, FlagString, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        List<String> phoneValues = new ArrayList<String>();
        List<String> userValues = new ArrayList<String>();
        for (FlagString value : values) {
            // The framework may reuse the value object between iterations, so keep only
            // the immutable String payload, never the FlagString itself.
            if (value.getFlag() == PHONE_FLAG) {
                phoneValues.add(value.getValue());
            } else if (value.getFlag() == USER_FLAG) {
                userValues.add(value.getValue());
            }
        }
        // Output format: uservalue<delimiter>key<delimiter>phonevalue
        String phone = key.toString();
        for (String user : userValues) {
            for (String description : phoneValues) {
                context.write(NullWritable.get(),
                        new Text(user + delimiter + phone + delimiter + description));
            }
        }
    }
}
4. Job類(關鍵!用MultipleInputs為多個輸入檔案分別設定輸入格式與Mapper等)
package test.mr.multiinputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
 * MultipleInputs類為不同的輸入檔案路徑分別指定輸入格式(InputFormat)及對應的Mapper類
 現有兩份資料
 phone
 123,good number
 124,common number
 125,bad number
 user
 zhangsan,123
 lisi,124
 wangwu,125
 現在需要把user表和phone表按照phone number連接(join)起來,得到下面的結果
 zhangsan,123,good number
 lisi,124,common number
 wangwu,125,bad number
 */
/**
 * Driver for the reduce-side join of the phone table (-i1) and the user table (-i2).
 *
 * <p>Arguments: {@code -i1 <phone data path> -i2 <user data path> -o <output path>
 * [-delimiter <field delimiter, default ",">]}. Generic Hadoop options (e.g. {@code -D})
 * are consumed by {@link ToolRunner}, which hands the resulting configuration to
 * {@link #setConf}; {@link #run} builds the job from it.
 */
public class MultiMapMain extends Configuration implements Tool {
    private String input1 = null; // phone data input path (-i1)
    private String input2 = null; // user data input path (-i2)
    private String output = null; // output path (-o)
    private String delimiter = ","; // field delimiter (-delimiter); usage text documents a comma default
    private Configuration conf; // configuration supplied through setConf (by ToolRunner)

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    /** Returns the configuration given to {@link #setConf}, or a fresh one if none was set. */
    @Override
    public Configuration getConf() {
        return conf != null ? conf : new Configuration();
    }

    /**
     * Parses the arguments, configures and runs the job.
     *
     * @return 0 if the job succeeded, 1 if it failed, -1 for invalid arguments
     */
    @Override
    public int run(String[] args) throws Exception {
        if (!setArgs(args) || !checkParam()) {
            userManual();
            return -1;
        }
        Job job = new Job(getConf());
        // MultiMap1, MultiMap2 and MultiRedu read the "delimiter" property in setup(),
        // so the -delimiter option has to travel through the job configuration.
        job.getConfiguration().set("delimiter", delimiter);
        job.setJarByClass(MultiMapMain.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagString.class);
        job.setReducerClass(MultiRedu.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Each input path gets its own InputFormat and Mapper.
        MultipleInputs.addInputPath(job, new Path(input1),
                TextInputFormat.class, MultiMap1.class);
        MultipleInputs.addInputPath(job, new Path(input2),
                TextInputFormat.class, MultiMap2.class);
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Validates the parsed arguments; prints the first problem found and returns false. */
    private boolean checkParam() {
        if (isBlank(input1)) {
            System.err.println("no input phone-data path");
            return false;
        }
        if (isBlank(input2)) {
            System.err.println("no input user-data path");
            return false;
        }
        if (isBlank(output)) {
            System.err.println("no output path");
            return false;
        }
        // Not trimmed on purpose: whitespace delimiters such as a tab are legitimate.
        if (delimiter == null || delimiter.isEmpty()) {
            System.err.println("no delimiter");
            return false;
        }
        return true;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Usage manual
    private void userManual() {
        System.err.println("Usage:");
        System.err.println("-i1 input \t phone data path.");
        System.err.println("-i2 input \t user data path.");
        System.err.println("-o output \t output data path.");
        System.err.println("-delimiter data delimiter \t default comma.");
    }

    /**
     * Assigns fields from {@code -i1 xxx -i2 xxx -o xxx -delimiter x}. Unrecognised tokens
     * are ignored.
     *
     * @return false if a recognised option is missing its value
     */
    private boolean setArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String option = args[i];
            if (!isKnownOption(option)) {
                continue;
            }
            if (i + 1 >= args.length) {
                System.err.println("missing value for option " + option);
                return false;
            }
            String optionValue = args[++i];
            if ("-i1".equals(option)) {
                input1 = optionValue;
            } else if ("-i2".equals(option)) {
                input2 = optionValue;
            } else if ("-o".equals(option)) {
                output = optionValue;
            } else {
                delimiter = optionValue;
            }
        }
        return true;
    }

    private static boolean isKnownOption(String option) {
        return "-i1".equals(option) || "-i2".equals(option)
                || "-o".equals(option) || "-delimiter".equals(option);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Propagate the job result (0 = success) as the process exit status.
        System.exit(ToolRunner.run(conf, new MultiMapMain(), args));
    }
}