1. 程式人生 > 實用技巧 >MapReduce之自定義InputFormat

MapReduce之自定義InputFormat

在企業開發中,Hadoop框架自帶的InputFormat型別不能滿足所有應用場景,需要自定義InputFormat來解決實際問題。

自定義InputFormat步驟如下:

  • (1)自定義一個類繼承FilelnputFormat
  • (2)自定義一個類繼承RecordReader,實現一次讀取一個完整檔案,將檔名為key,檔案內容為value。
  • (3)在輸出時使用SequenceFileOutPutFormat輸出合併檔案。

無論HDFS還是MapReduce,在處理小檔案時效率都非常低,但又難免面臨處理大量小檔案的場景,此時,就需要有相應解決方案。可以自定義InputFormat實現小檔案的合併。

1. 需求

將多個小檔案合併成一個SequenceFile檔案(SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value(bytes) 對的檔案格式),SequenceFile裡面儲存著多個檔案,儲存的形式為檔案路徑+名稱為key,檔案內容為value。

(1)輸入資料



(2)期望輸出檔案格式

2. 需求分析

  1. 自定義一個類繼承FileInputFormat

    (1)重寫isSplitable()方法,返回false,讓檔案不可切,整個檔案作為1片

    (2)重寫createRecordReader(),返回自定義的RecordReader物件

  2. 自定義一個類繼承RecordReader



    在RecordReader中,nextKeyValue()是最重要的方法,返回當前讀取到的key-value,如果讀到返回true,呼叫Mapper的map()來處理,否則返回false

3. 編寫程式

MyInputFormat.java

/*
* 1. 改變切片策略,一個檔案固定切1片,通過指定檔案不可切
*
* 2. 提供RR ,這個RR讀取切片的檔名作為key,讀取切片的內容封裝到bytes作為value
*/
public class MyInputFormat extends FileInputFormat { @Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new MyRecordReader();
} @Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}

MyRecordReader.java

/*
* RecordReader從MapTask處理的當前切片中讀取資料
*
* XXXContext都是Job的上下文,通過XXXContext可以獲取Job的配置Configuration物件
*/
public class MyRecordReader extends RecordReader { private Text key;
private BytesWritable value; private String filename;
private int length; private FileSystem fs;
private Path path; private FSDataInputStream is; private boolean flag=true; // MyRecordReader在建立後,在進入Mapper的run()之前,自動呼叫
// 檔案的所有內容設定為1個切片,切片的長度等於檔案的長度
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit fileSplit=(FileSplit) split; filename=fileSplit.getPath().getName(); length=(int) fileSplit.getLength(); path=fileSplit.getPath(); //獲取當前Job的配置物件
Configuration conf = context.getConfiguration(); //獲取當前Job使用的檔案系統
fs=FileSystem.get(conf); is = fs.open(path); } // 讀取一組輸入的key-value,讀到返回true,否則返回false
// 將檔案的名稱封裝為key,將檔案的內容封裝為BytesWritable型別的value,返回true
// 第二次呼叫nextKeyValue()返回false
@Override
public boolean nextKeyValue() throws IOException, InterruptedException { if (flag) { //例項化物件
if (key==null) {
key=new Text();
} if (value==null) {
value=new BytesWritable();
} //賦值
//將檔名封裝到key中
key.set(filename); // 將檔案的內容讀取到BytesWritable中
byte [] content=new byte[length]; IOUtils.readFully(is, content, 0, length); value.set(content, 0, length); flag=false; return true; }
return false;
} //返回當前讀取到的key-value中的key
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
return key;
} //返回當前讀取到的key-value中的value
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
return value;
} //返回讀取切片的進度
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
} // 在Mapper的輸入關閉時呼叫,清理工作
@Override
public void close() throws IOException {
if (is != null) {
IOUtils.closeStream(is);
}
if (fs !=null) {
fs.close();
}
}
}

CustomIFMapper.java

public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFReducer.java

public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFDriver.java

public class CustomIFDriver {

	public static void main(String[] args) throws Exception {

		Path inputPath=new Path("e:/mrinput/custom");
Path outputPath=new Path("e:/mroutput/custom"); //作為整個Job的配置
Configuration conf = new Configuration();
//保證輸出目錄不存在
FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
} // 建立Job
Job job = Job.getInstance(conf); // 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別
job.setMapperClass(CustomIFMapper.class);
job.setReducerClass(CustomIFReducer.class); // Job需要根據Mapper和Reducer輸出的Key-value型別準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
// 如果Mapper和Reducer輸出的Key-value型別一致,直接設定Job最終的輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class); // 設定輸入目錄和輸出目錄
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath); // 設定輸入和輸出格式
job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class); // ③執行Job
job.waitForCompletion(true); }
}