1. 程式人生 > >MapReduce之自定義InputFormat

MapReduce之自定義InputFormat

>在企業開發中,Hadoop框架自帶的`InputFormat`型別不能滿足所有應用場景,需要==自定義==InputFormat來解決實際問題。 自定義InputFormat步驟如下: - (1)自定義一個類繼承`FilelnputFormat`。 - (2)自定義一個類繼承`RecordReader`,實現一次讀取一個完整檔案,將檔名為key,檔案內容為value。 - (3)在輸出時使用`SequenceFileOutPutFormat`輸出合併檔案。 >無論HDFS還是MapReduce,在處理小檔案時效率都非常低,但又難免面臨處理大量小檔案的場景,此時,就需要有相應解決方案。可以自定義InputFormat實現==小檔案的合併==。 ## 1. 需求 將多個小檔案合併成一個`SequenceFile`檔案(SequenceFile檔案是Hadoop用來儲存二進位制形式的==key-value(bytes)== 對的檔案格式),SequenceFile裡面儲存著多個檔案,儲存的形式為==檔案路徑+名稱為key,檔案內容為value==。 (1)輸入資料 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200719120909545.png) (2)期望輸出檔案格式 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200719120924789.png) ## 2. 需求分析 1. 自定義一個類繼承`FileInputFormat` (1)重寫`isSplitable()`方法,返回`false`,讓檔案不可切,**整個檔案作為1片**。 (2)重寫createRecordReader(),返回自定義的RecordReader物件 2. 自定義一個類繼承`RecordReader` 在RecordReader中,`nextKeyValue`()是最重要的方法,返回當前讀取到的`key-value`,如果讀到返回`true`,呼叫Mapper的map()來處理,否則返回`false` ## 3. 編寫程式 **MyInputFormat.java** ```java /* * 1. 改變切片策略,一個檔案固定切1片,通過指定檔案不可切 * * 2. 提供RR ,這個RR讀取切片的檔名作為key,讀取切片的內容封裝到bytes作為value */ public class MyInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new MyRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } } ``` **MyRecordReader.java** ```java /* * RecordReader從MapTask處理的當前切片中讀取資料 * * XXXContext都是Job的上下文,通過XXXContext可以獲取Job的配置Configuration物件 */ public class MyRecordReader extends RecordReader { private Text key; private BytesWritable value; private String filename; private int length; private FileSystem fs; private Path path; private FSDataInputStream is; private boolean flag=true; // MyRecordReader在建立後,在進入Mapper的run()之前,自動呼叫 // 檔案的所有內容設定為1個切片,切片的長度等於檔案的長度 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit fileSplit=(FileSplit) split; filename=fileSplit.getPath().getName(); length=(int) fileSplit.getLength(); path=fileSplit.getPath(); //獲取當前Job的配置物件 Configuration conf = context.getConfiguration(); //獲取當前Job使用的檔案系統 fs=FileSystem.get(conf); is = fs.open(path); } // 讀取一組輸入的key-value,讀到返回true,否則返回false // 將檔案的名稱封裝為key,將檔案的內容封裝為BytesWritable型別的value,返回true // 第二次呼叫nextKeyValue()返回false @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (flag) { //例項化物件 if (key==null) { key=new Text(); } if (value==null) { value=new BytesWritable(); } //賦值 //將檔名封裝到key中 key.set(filename); // 將檔案的內容讀取到BytesWritable中 byte [] content=new byte[length]; IOUtils.readFully(is, content, 0, length); value.set(content, 0, length); flag=false; return true; } return false; } //返回當前讀取到的key-value中的key @Override public Object getCurrentKey() throws IOException, InterruptedException { return key; } //返回當前讀取到的key-value中的value @Override public Object getCurrentValue() throws IOException, InterruptedException { return value; } //返回讀取切片的進度 @Override public float getProgress() throws IOException, InterruptedException { return 0; } // 在Mapper的輸入關閉時呼叫,清理工作 @Override public void close() throws IOException { if (is != null) { IOUtils.closeStream(is); } if (fs !=null) { fs.close(); } } } ``` **CustomIFMapper.java** ```java public class CustomIFMapper extends Mapper{ } ``` **CustomIFReducer.java** ```java public class CustomIFReducer extends Reducer{ } ``` **CustomIFDriver.java** ```java public class CustomIFDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("e:/mrinput/custom"); Path outputPath=new Path("e:/mroutput/custom"); //作為整個Job的配置 Configuration conf = new Configuration(); //保證輸出目錄不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // 建立Job Job job = Job.getInstance(conf); // 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別 job.setMapperClass(CustomIFMapper.class); job.setReducerClass(CustomIFReducer.class); // Job需要根據Mapper和Reducer輸出的Key-value型別準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化 // 如果Mapper和Reducer輸出的Key-value型別一致,直接設定Job最終的輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 設定輸入目錄和輸出目錄 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 設定輸入和輸出格式 job.setInputFormatClass(MyInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); // ③執行Job job.waitForCompletion(true);