1. 程式人生 > >(十)Mapper多輸入源及Reduce多輸出檔案

(十)Mapper多輸入源及Reduce多輸出檔案

目的

Mapper多個輸入源,處理後變成一個輸出。

Reduce多輸出:根據輸出資料的內容,依照自訂的規則,把結果分別寫到不同的檔案裡。

案例

有以下兩個檔案作為輸入源,內容列於下方:第一個檔案每個人佔三行(姓名、math成績、english成績各一行),第二個檔案每個人一行。由於格式不同,需分別交由不同的Mapper處理。

最後由Reduce把每個人的成績分別輸出到各自的檔案裡。

tom
math 90
english 98
jary
math 78
english 87
rose
math 87
english 90
tom math 67 english 87
jary math 59 english 80
rose math 79 english 60

 程式碼實現

  • 寫自定義輸入輸出元件
  • 寫兩個mapper,分別處理不同的檔案
  • Reduce程式碼實現
package hadoop04;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Reducer that routes every value of a key to the named output that carries
 * the same name as the key.
 *
 * <p>Only the keys {@code "jary"}, {@code "rose"} and {@code "tom"} are
 * written; values for any other key are dropped. Nothing is written through
 * the regular {@code context.write} path.
 */
public class ReduceDemo extends Reducer<Text, Text, Text, Text>{
	
	/** Writer for the named outputs; created in {@link #setup} and closed in {@link #cleanup}. */
	private MultipleOutputs<Text, Text> mos;
	
	/**
	 * Creates the {@link MultipleOutputs} writer bound to this task's context.
	 *
	 * @param context the reduce task context
	 */
	@Override
	protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		mos = new MultipleOutputs<>(context);
	}
	
	/**
	 * Writes each value of {@code key} to the named output equal to the key's text.
	 *
	 * @param key     the grouping key
	 * @param value   all values grouped under {@code key}
	 * @param context the reduce task context (unused; output goes through {@code mos})
	 */
	@Override
	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		// The key does not change while iterating its values, so convert it once.
		String name = key.toString();
		
		// Only these three names are handled; any other key produces no output.
		boolean handled = name.equals("jary") || name.equals("rose") || name.equals("tom");
		if (!handled) {
			return;
		}
		
		for (Text txt : value) {
			mos.write(name, key, txt);
		}
	}
	
	/**
	 * Closes the {@link MultipleOutputs} writer so that the record writers of
	 * all named outputs are closed when the task ends.
	 *
	 * @param context the reduce task context
	 */
	@Override
	protected void cleanup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		if (mos != null) {
			mos.close();
		}
	}

}
  • Driver類實現
package hadoop04;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import hadoop03.AuthOutputFormat;


/**
 * Job driver: wires two input files (each with its own input format and
 * mapper) to {@code ReduceDemo}, registers the named outputs "jary", "tom"
 * and "rose", and submits the job.
 */
public class DriverDemo {
	/**
	 * Configures and runs the job, then exits with status 0 on success and 1 on failure.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if job configuration or submission fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		
		job.setJarByClass(DriverDemo.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		// Without an explicit reducer class the job would run the default
		// Reducer, and ReduceDemo would never write to the named outputs.
		job.setReducerClass(ReduceDemo.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		// When a single Mapper cannot handle every input, bind a mapper (and input
		// format) to each path separately. In that case setMapperClass() must not be called.
		// NOTE(review): AuthInputFomat, MapperDemo01 and MapperDemo02 are not imported here;
		// presumably they live in this package (hadoop04) - confirm.
		MultipleInputs.addInputPath(job, new Path("hdfs://192.168.80.100:9000/input/test1.txt"), 
				AuthInputFomat.class, MapperDemo01.class);
		
		MultipleInputs.addInputPath(job, new Path("hdfs://192.168.80.100:9000/input/test2.txt"), 
				TextInputFormat.class, MapperDemo02.class);
		
		// One named output per person; the names match those ReduceDemo passes to mos.write().
		MultipleOutputs.addNamedOutput(job, "jary", AuthOutputFormat.class, Text.class, Text.class);
		MultipleOutputs.addNamedOutput(job, "tom", AuthOutputFormat.class, Text.class, Text.class);
		MultipleOutputs.addNamedOutput(job, "rose", AuthOutputFormat.class, Text.class, Text.class);
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.80.100:9000/result"));
		
		// Propagate the job result to the caller as the process exit status.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}