Common MapReduce Input Formats: NLineInputFormat
阿新 • Published: 2020-07-17
The example uses two input files.
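NLineInputFormat
- Split strategy: reads the configuration parameter mapreduce.input.lineinputformat.linespermap, which defaults to 1. Splitting is done file by file, and every N lines of a file (N being the value of this parameter) form one split.
Since this is a parameter, it can be changed. To make every N lines one split (here N = 2):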
    Configuration conf = new Configuration();
    conf.set("mapreduce.input.lineinputformat.linespermap", "2");
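- RecordReader: LineRecordReader, which processes one line at a time, using the byte offset of the line as the key and the content of the line as the value.
Their data types:
    LongWritable key
    Text value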
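The key/value pairs described above can be sketched in plain Java, without any Hadoop dependency. This is only an illustration of the idea, not Hadoop's LineRecordReader itself: the class name LineOffsetSketch, the method readRecords, and the sample input are hypothetical, and the sketch assumes UTF-8 text with '\n' line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration (no Hadoop dependency) of the key/value pairs
// a line-oriented reader hands to the mapper.
class LineOffsetSketch {

    // Returns a map from the starting byte offset of each line to its text.
    // Assumption: '\n' line endings and UTF-8 encoding.
    static Map<Long, String> readRecords(String fileContent) {
        Map<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            records.put(offset, line);
            // Advance past the bytes of this line plus the one-byte newline.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical two-line, tab-separated input.
        readRecords("hello\thadoop\nhello\tspark\n")
            .forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

In the real job the offset plays the role of the LongWritable key and the line text the role of the Text value.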
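So the two files above, eight lines in total, yield eight splits at one line per split and four splits at two lines per split. (Splits never cross file boundaries, so the count of four holds when each file has an even number of lines.)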
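The arithmetic can be checked with a small plain-Java sketch. It does not call Hadoop; the class NLineSplitSketch, the method countSplits, and the per-file line counts are hypothetical and chosen only to total eight lines. It assumes that each file is split independently and that leftover lines at the end of a file form a final, shorter split.

```java
// Plain-Java illustration (no Hadoop dependency) of per-file N-line splitting.
class NLineSplitSketch {

    // linesPerFile[i] is the line count of the i-th input file.
    // Splits never span files, so each file contributes ceil(lines / n) splits.
    static int countSplits(int[] linesPerFile, int n) {
        int splits = 0;
        for (int lines : linesPerFile) {
            splits += (lines + n - 1) / n; // integer ceiling division
        }
        return splits;
    }

    public static void main(String[] args) {
        int[] evenFiles = {4, 4}; // hypothetical: two files, four lines each
        System.out.println(countSplits(evenFiles, 1)); // one line per split
        System.out.println(countSplits(evenFiles, 2)); // two lines per split

        int[] oddFiles = {3, 5}; // hypothetical: same total, odd line counts
        System.out.println(countSplits(oddFiles, 2));
    }
}
```

With four lines in each file the counts come out as eight and four. With three and five lines, two lines per split gives five splits, because the odd line left over in each file gets a split of its own.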
WCMapper.java
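    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text out_key = new Text();
        private IntWritable out_value = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Print the input pair: the line's offset and the line's content
            System.out.println("keyin:" + key + "----valuein:" + value);
            // Words within a line are separated by tabs
            String[] words = value.toString().split("\t");
            for (String word : words) {
                out_key.set(word);
                // Write out (word, 1)
                context.write(out_key, out_value);
            }
        }
    }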
WCReducer.java
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable out_value = new IntWritable();

        // reduce handles one group at a time; records with the same key form one group
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable intWritable : values) {
                sum += intWritable.get();
            }
            out_value.set(sum);
            // Write out the accumulated count
            context.write(key, out_value);
        }
    }
WCDriver.java
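    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WCDriver {
        public static void main(String[] args) throws Exception {
            Path inputPath = new Path("e:/mrinput/nline");
            Path outputPath = new Path("e:/mroutput/nline");
            // Configuration for the whole Job
            Configuration conf = new Configuration();
            // Make every two lines one split
            conf.set("mapreduce.input.lineinputformat.linespermap", "2");
            // Make sure the output directory does not exist
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            // ① Create the Job
            Job job = Job.getInstance(conf);
            job.setJarByClass(WCDriver.class);
            // ② Configure the Job
            // Set the Mapper and Reducer classes the Job runs, and their output key-value types
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            // The Job prepares serializers based on the key-value types that the Mapper and Reducer
            // output, and uses them to serialize and deserialize the output key-value pairs.
            // If the Mapper and Reducer output the same key-value types, set the Job's final output types directly.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Declare that NLineInputFormat is used
            job.setInputFormatClass(NLineInputFormat.class);
            // Set the input and output directories
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            // ③ Run the Job
            job.waitForCompletion(true);
        }
    }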