1. 程式人生 > >MapReduce對輸入多檔案的處理

MapReduce對輸入多檔案的處理

 使用MultipleInputs類為不同的輸入檔案路徑分別指定各自的輸入檔案格式
 現有兩份資料
 phone
 123,good number
 124,common number
 125,bad number

 user
 zhangsan,123
 lisi,124
 wangwu,125

 現在需要把user和phone按照phone number連線起來。得到下面的結果
 zhangsan,123,good number
 lisi,124,common number
 wangwu,125,bad number

分析思路

這相當於兩張表的一對一join操作。join時將value封裝成一個JavaBean(實現WritableComparable介面),並以外來鍵值(phone number)作為key

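本例中將通過value進行排序,即在value的JavaBean中通過實現compareTo()方法完成排序,使得phone表的記錄位於首位。注意:MapReduce在shuffle階段只依key排序,並不保證同一key下各value的先後順序;若要確保phone表的記錄排在最前,需採用二次排序(組合key加分組比較器),或在reduce端依flag判斷記錄來源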

1.對value實現JavaBean(實現writablecomparable介面)

package test.mr.multiinputs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Map-output value of the phone/user join: a string payload plus a flag that
 * records which input table the payload came from.
 *
 * <p>Ordering is by {@code flag} first (0 = phone table before 1 = user table)
 * and then by {@code value}; {@code equals} and {@code hashCode} are consistent
 * with that ordering.
 *
 * <p>A {@code null} payload is stored as the empty string so that
 * {@link #write(DataOutput)} and {@link #compareTo(FlagString)} never fail with
 * a {@code NullPointerException}, e.g. on a default-constructed instance.
 */
public class FlagString implements WritableComparable<FlagString> {
	private String value = "";
	private int flag; // source-table marker: 0 = phone table, 1 = user table

	/** No-argument constructor; leaves the payload empty and the flag at 0. */
	public FlagString() {
		super();
	}

	/**
	 * @param value payload string; {@code null} is treated as ""
	 * @param flag  source-table marker (0 = phone table, 1 = user table)
	 */
	public FlagString(String value, int flag) {
		super();
		this.value = nullToEmpty(value);
		this.flag = flag;
	}

	public String getValue() {
		return value;
	}

	/** Sets the payload; {@code null} is stored as the empty string. */
	public void setValue(String value) {
		this.value = nullToEmpty(value);
	}

	public int getFlag() {
		return flag;
	}

	public void setFlag(int flag) {
		this.flag = flag;
	}

	/** Serializes the flag followed by the payload; must mirror {@link #readFields(DataInput)}. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(flag);
		out.writeUTF(value);
	}

	/** Restores the fields in the same order {@link #write(DataOutput)} emitted them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.flag = in.readInt();
		this.value = in.readUTF();
	}

	/**
	 * Orders by flag ascending, then by payload in natural string order.
	 *
	 * @return a negative, zero or positive number as this instance sorts
	 *         before, equal to or after {@code o}
	 */
	@Override
	public int compareTo(FlagString o) {
		if (this.flag != o.flag) {
			return this.flag < o.flag ? -1 : 1;
		}
		return this.value.compareTo(o.value);
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof FlagString)) {
			return false;
		}
		FlagString other = (FlagString) obj;
		return this.flag == other.flag && this.value.equals(other.value);
	}

	@Override
	public int hashCode() {
		return 31 * flag + value.hashCode();
	}

	// Normalizes a missing payload so serialization and comparison stay null-safe.
	private static String nullToEmpty(String s) {
		return s == null ? "" : s;
	}

}


2.多map類,map1(實現對phone表文件操作)

package test.mr.multiinputs;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the phone table. Each input line is expected to hold two fields,
 * {@code <phone number><delimiter><description>}; the mapper emits the phone
 * number as key and the description wrapped in a {@link FlagString} with
 * flag 0 (phone table) as value.
 *
 * <p>Lines that are empty after trimming, or that do not split into exactly
 * two fields, are skipped.
 */
public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> {
	private String delimiter; // field delimiter, read from configuration key "delimiter"
	// The delimiter compiled as a LITERAL pattern: String.split(delimiter) would
	// interpret it as a regular expression, so "|" or "." would split wrongly.
	private java.util.regex.Pattern splitter;

	/** Reads the delimiter (default ",") from the job configuration and compiles it once. */
	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
		splitter = java.util.regex.Pattern.compile(delimiter,
				java.util.regex.Pattern.LITERAL);
	}

	/**
	 * Emits {@code (field0, FlagString(field1, 0))} for every well-formed line.
	 *
	 * @param key     input key supplied by the input format (unused)
	 * @param value   one line of the phone file
	 * @param context sink for the map output
	 */
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.length() == 0) {
			return;
		}
		String[] fields = splitter.split(line);
		if (fields.length != 2) {
			return; // malformed record: not exactly two fields
		}
		context.write(new Text(fields[0].trim()),
				new FlagString(fields[1].trim(), 0)); // flag=0 marks the phone table
	}
}


2.map2(實現對user表文件操作)

package test.mr.multiinputs;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the user table. Each input line is expected to hold two fields,
 * {@code <user name><delimiter><phone number>}; the mapper emits the phone
 * number (second field) as key and the user name wrapped in a
 * {@link FlagString} with flag 1 (user table) as value.
 *
 * <p>Lines that are empty after trimming, or that do not split into exactly
 * two fields, are skipped.
 */
public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> {
	private String delimiter; // field delimiter, read from configuration key "delimiter"
	// The delimiter compiled as a LITERAL pattern: String.split(delimiter) would
	// interpret it as a regular expression, so "|" or "." would split wrongly.
	private java.util.regex.Pattern splitter;

	/** Reads the delimiter (default ",") from the job configuration and compiles it once. */
	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
		splitter = java.util.regex.Pattern.compile(delimiter,
				java.util.regex.Pattern.LITERAL);
	}

	/**
	 * Emits {@code (field1, FlagString(field0, 1))} for every well-formed line.
	 *
	 * @param key     input key supplied by the input format (unused)
	 * @param value   one line of the user file
	 * @param context sink for the map output
	 */
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.length() == 0) {
			return;
		}
		String[] fields = splitter.split(line);
		if (fields.length != 2) {
			return; // malformed record: not exactly two fields
		}
		context.write(new Text(fields[1].trim()),
				new FlagString(fields[0].trim(), 1)); // flag=1 marks the user table
	}
}


3.reduce類

package test.mr.multiinputs;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that joins phone records and user records sharing the same key
 * (the phone number) and writes one line per match in the form
 * {@code <user value><delimiter><key><delimiter><phone value>}.
 *
 * <p>Records are told apart by {@link FlagString#getFlag()} (0 = phone table,
 * anything else = user table) rather than by their position in the value
 * iterator: MapReduce sorts by key only, so the order of values within one key
 * must not be relied upon. Keys that lack either a phone record or a user
 * record produce no output (inner join).
 */
public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> {
	private String delimiter; // field delimiter, read from configuration key "delimiter"

	/** Reads the delimiter (default ",") from the job configuration. */
	@Override
	protected void setup(
			Reducer<Text, FlagString, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
	}

	/**
	 * Joins all user values with all phone values of one key.
	 *
	 * @param key     the join key (phone number)
	 * @param values  all phone and user records emitted for {@code key}
	 * @param context sink for the joined output lines
	 */
	@Override
	protected void reduce(Text key, Iterable<FlagString> values,
			Reducer<Text, FlagString, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		String joinKey = key.toString();
		// Only the String payloads are kept, never the FlagString itself, since
		// the framework may reuse the value object between iterations.
		// All values of one key are buffered in memory before output is written.
		java.util.List<String> phoneValues = new java.util.ArrayList<String>();
		java.util.List<String> userValues = new java.util.ArrayList<String>();
		for (FlagString value : values) {
			if (value.getFlag() == 0) {
				phoneValues.add(value.getValue());
			} else {
				userValues.add(value.getValue());
			}
		}

		// Output format: uservalue<delimiter>key<delimiter>phonevalue
		for (String userValue : userValues) {
			for (String phoneValue : phoneValues) {
				context.write(NullWritable.get(), new Text(userValue
						+ delimiter + joinKey + delimiter + phoneValue));
			}
		}
	}
}


4.job類(關鍵!!實現多檔案的輸入格式等)

package test.mr.multiinputs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Job driver for the phone/user join. {@code MultipleInputs} assigns a separate
 * input path, input format and mapper class to each of the two data sets.
 *
 * <p>Sample data:
 * <pre>
 * phone
 * 123,good number
 * 124,common number
 * 125,bad number
 *
 * user
 * zhangsan,123
 * lisi,124
 * wangwu,125
 * </pre>
 *
 * <p>Intended result of joining user and phone on the phone number:
 * <pre>
 * zhangsan,123,good number
 * lisi,124,common number
 * wangwu,125,bad number
 * </pre>
 *
 * <p>Command line: {@code -i1 <phone path> -i2 <user path> -o <output path>
 * [-delimiter <delimiter>]}; the delimiter defaults to a comma.
 *
 * <p>NOTE(review): a {@code Tool} conventionally extends {@code Configured};
 * the {@code Configuration} superclass is kept here so the class's declared
 * type does not change.
 */
public class MultiMapMain extends Configuration implements Tool {
	private static final String DEFAULT_DELIMITER = ",";

	private Configuration conf; // configuration handed over by ToolRunner via setConf
	private String input1 = null; // phone-data input path (-i1)
	private String input2 = null; // user-data input path (-i2)
	private String output = null; // output path (-o)
	private String delimiter = null; // field delimiter (-delimiter)

	/** Stores the configuration so that {@link #run(String[])} submits the job with it. */
	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
	}

	/** Returns the stored configuration, creating a default one if none was set. */
	@Override
	public Configuration getConf() {
		if (conf == null) {
			conf = new Configuration();
		}
		return conf;
	}

	/**
	 * Parses the arguments, configures the join job and waits for it to finish.
	 *
	 * @param args command-line arguments (see the class description)
	 * @return 0 if the job succeeded, 1 if it failed, -1 if a required
	 *         argument is missing
	 */
	@Override
	public int run(String[] args) throws Exception {
		setArgs(args);
		if (!checkParam()) {
			return -1;
		}

		Configuration jobConf = getConf();
		// Publish the delimiter under the key the mappers and the reducer read;
		// it is set before the Job is constructed from this configuration.
		jobConf.set("delimiter", delimiter);
		Job job = new Job(jobConf);
		job.setJarByClass(MultiMapMain.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlagString.class);

		job.setReducerClass(MultiRedu.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		// One input path, input format and mapper per data set.
		MultipleInputs.addInputPath(job, new Path(input1),
				TextInputFormat.class, MultiMap1.class);
		MultipleInputs.addInputPath(job, new Path(input2),
				TextInputFormat.class, MultiMap2.class);

		FileOutputFormat.setOutputPath(job, new Path(output));
		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Validates the parsed arguments. A missing path is reported together with
	 * the usage text; a missing delimiter falls back to the default comma.
	 *
	 * @return {@code true} if all required paths are present
	 */
	private boolean checkParam() {
		if (isBlank(input1)) {
			System.err.println("no input phone-data path");
			printUsage();
			return false;
		}
		if (isBlank(input2)) {
			System.err.println("no input user-data path");
			printUsage();
			return false;
		}
		if (isBlank(output)) {
			System.err.println("no output path");
			printUsage();
			return false;
		}
		// Only null/empty counts as missing, so a whitespace delimiter such as
		// a tab character remains usable.
		if (delimiter == null || delimiter.length() == 0) {
			delimiter = DEFAULT_DELIMITER;
		}
		return true;
	}

	// True when the argument is absent or consists only of whitespace.
	private static boolean isBlank(String s) {
		return s == null || "".equals(s.trim());
	}

	// Prints the command-line usage to stderr.
	private void printUsage() {
		System.err.println("Usage:");
		System.err.println("-i1 input \t phone data path.");
		System.err.println("-i2 input \t user data path.");
		System.err.println("-o output \t output data path.");
		System.err.println("-delimiter data delimiter \t default comma.");
	}

	/**
	 * Assigns the option values from the command line. Expected form:
	 * {@code -i1 xxx -i2 xxx -o xxx -delimiter x}. The loop stops one element
	 * early, so a flag given as the very last argument (without a value) is
	 * ignored instead of reading past the end of the array.
	 */
	private void setArgs(String[] args) {
		for (int i = 0; i + 1 < args.length; i++) {
			if ("-i1".equals(args[i])) {
				input1 = args[++i]; // path of the first (phone) input
			} else if ("-i2".equals(args[i])) {
				input2 = args[++i];
			} else if ("-o".equals(args[i])) {
				output = args[++i];
			} else if ("-delimiter".equals(args[i])) {
				delimiter = args[++i];
			}
		}
	}

	/** Runs the tool through ToolRunner and exits with the code returned by {@link #run(String[])}. */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		System.exit(ToolRunner.run(conf, new MultiMapMain(), args));
	}
}