1. 程式人生 > 實用技巧 > MapReduce的常見輸入格式之NLineInputFormat

MapReduce的常見輸入格式之NLineInputFormat

輸入目錄中有兩個檔案(合計八行):

NLineInputFormat

  • 切片策略: 讀取配置中的引數mapreduce.input.lineinputformat.linespermap(預設為1),以檔案為單位,每N行切為1片(N即此引數的值)!

  • 既然有引數,那就可以修改,設定為每N行切為一片:

Configuration conf = new Configuration();
// Every 2 lines of an input file form one split (the default is 1).
conf.set("mapreduce.input.lineinputformat.linespermap", "2");

RecordReader:LineRecordReader,一次處理一行,將該行在檔案中的偏移量作為key,該行內容作為value。
它們的資料型別:

LongWritable key
Text value

所以上面兩個檔案總共八行:若一行切一片,則有八片;若兩行切一片,則有四片。(由於切片以檔案為單位,「四片」的前提是兩個檔案的行數皆為偶數;否則某個檔案的最後一片會不足兩行,總片數會增加。)

WCMapper.java

/**
 * Word-count mapper for the NLineInputFormat example.
 *
 * <p>Each call receives one input line: the key is the line's offset in its file and the value is
 * the line's text. The line is split on tab characters and every non-empty token is emitted as
 * (word, 1).
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	// Output objects are reused across map() calls instead of allocating new ones per record.
	private final Text outKey = new Text();
	private final IntWritable outValue = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {

		// Debug trace of each input record: key = line offset, value = line text.
		System.out.println("keyin:"+key+"----valuein:"+value);

		for (String word : value.toString().split("\t")) {
			// Blank lines and leading/consecutive tabs produce empty tokens; they are not words.
			if (word.isEmpty()) {
				continue;
			}
			outKey.set(word);
			// Emit (word, 1).
			context.write(outKey, outValue);
		}
	}
}

WCReducer.java

/**
 * Word-count reducer: adds up every count received for a word and emits (word, total).
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	// Holder for the emitted total, reused across reduce() calls.
	private IntWritable result = new IntWritable();

	// Invoked once per group; all values sharing the same key are delivered together.
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		int total = 0;
		for (IntWritable count : values) {
			total += count.get();
		}

		// Emit the accumulated total for this word.
		result.set(total);
		context.write(key, result);
	}
}

WCDriver.java

/**
 * Driver for the NLineInputFormat word-count example.
 *
 * <p>Usage: {@code WCDriver [inputDir outputDir]}. When both paths are given on the command line
 * they are used; otherwise the example's hard-coded local paths are used. The job places every
 * {@value #LINES_PER_SPLIT} lines of each input file into one split, deletes any existing output
 * directory first, and exits with status 0 on success or 1 on failure.
 */
public class WCDriver {

	/** Number of lines of an input file grouped into one split. */
	private static final int LINES_PER_SPLIT = 2;

	public static void main(String[] args) throws Exception {

		// Fall back to the example's local paths when no arguments are supplied.
		Path inputPath = new Path(args.length >= 2 ? args[0] : "e:/mrinput/nline");
		Path outputPath = new Path(args.length >= 2 ? args[1] : "e:/mroutput/nline");

		// Configuration for the whole job.
		Configuration conf = new Configuration();

		// The output directory must not exist when the job starts, so remove a previous run's output.
		// Resolve the file system from the path itself (not the default FS) so the check and delete
		// act on the same file system the job writes to.
		FileSystem fs = outputPath.getFileSystem(conf);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}

		// 1) Create the job.
		Job job = Job.getInstance(conf);
		job.setJarByClass(WCDriver.class);

		// 2) Configure the mapper/reducer classes.
		job.setMapperClass(WCMapper.class);
		job.setReducerClass(WCReducer.class);

		// The mapper and reducer emit the same key/value types, so setting the job's final output
		// types is enough for the framework to pick serializers for both stages.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Use NLineInputFormat; the typed setter writes mapreduce.input.lineinputformat.linespermap.
		job.setInputFormatClass(NLineInputFormat.class);
		NLineInputFormat.setNumLinesPerSplit(job, LINES_PER_SPLIT);

		// Input and output directories.
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		// 3) Run the job and report success/failure through the process exit code.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}