1. 程式人生 > >MapReduce之自定義OutputFormat

MapReduce之自定義OutputFormat

@[toc] ## OutputFormat介面實現類 `OutputFormat`是`MapReduce`輸出的基類,所有實現`MapReduce`輸出都實現了`OutputFormat`介面。下面介紹幾種常見的OutputFormat實現類。 - 文字輸出`TextoutputFormat` 預設的輸出格式是TextOutputFormat,==它把每條記錄寫為文字行==。它的鍵和值可以是任意型別,因為TextOutputFormat呼叫toString()方法把它們轉換為==字串==。 - `SequenceFileOutputFormat` 將SecquenceFileOutputFormat輸出==作為後續MapReduce任務的輸入==,這便是一種好的輸出格式,因為它的==格式緊湊,很容易被壓縮。== - ==**自定義OutputFormat**== 根據使用者需求,自定義實現輸出。 ## 自定義OutputFormat使用場景及步驟 ### 使用場景 - 為了實現控制最終檔案的輸出路徑和輸出格式,可以自定義OutputFormat。 例如:要在一個MapReduce程式中根據資料的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義OutputFormat來實現。 - 自定義OutputFormat步驟 (1)自定義一個類繼承`FileOutputFormat`。 (2)改寫`RecordWriter`,具體改寫輸出資料的方法`write()`。 ## 自定義OutputFormat 案例實操 **需求** 過濾輸入的log日誌,包含atguigu的網站輸出到==e:/atguigu.log==,不包含atguigu的網站輸出到==e:/other.log==。 **輸入資料** ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200805165733702.png) **==什麼時候需要Reduce==** ①合併 ②需要對資料排序 ==所以本案例不需要Reduce階段,key-value不需要實現序列化== **CustomOFMapper.java** ```java public class CustomOFMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String content = value.toString(); //value不需要,但是不能用Null這個關鍵字,要使用NullWritable物件 context.write(content+"\r\n", NullWritable.get()); } } ``` **MyOutPutFormat.java** ```java public class MyOutPutFormat extends FileOutputFormat{ @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new MyRecordWriter(job);//傳遞job物件,才能在RecordWriter中獲取配置 } } ``` **MyRecordWriter.java** ```java public class MyRecordWriter extends RecordWriter { private Path atguiguPath=new Path("e:/atguigu.log"); private Path otherPath=new Path("e:/other.log"); private FSDataOutputStream atguguOS ; private FSDataOutputStream otherOS ; private FileSystem fs; private TaskAttemptContext context; public MyRecordWriter(TaskAttemptContext job) throws IOException { context=job; Configuration conf = job.getConfiguration(); fs=FileSystem.get(conf); atguiguOS = fs.create(atguiguPath); otherOS = fs.create(otherPath); } // 將key-value寫出到檔案 @Override public void write(String key, NullWritable value) throws IOException, InterruptedException { if (key.contains("atguigu")) { atguguOS.write(key.getBytes());//寫到atguigu.log //統計輸出的含有atguigu字串的key-value個數 context.getCounter("MyCounter", "atguiguCounter").increment(1); }else { otherOS.write(key.getBytes());//寫到other.log context.getCounter("MyCounter", "otherCounter").increment(1); } } // 關閉流 @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (atguguOS != null) { IOUtils.closeStream(atguguOS); } if (otherOS != null) { IOUtils.closeStream(otherOS); } if (fs != null) { fs.close(); } } } ``` **CustomOFDriver.java** ```java public class CustomOFDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("e:/mrinput/outputformat"); Path outputPath=new Path("e:/mroutput/outputformat"); //作為整個Job的配置 Configuration conf = new Configuration(); //保證輸出目錄不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①建立Job Job job = Job.getInstance(conf); //重點,設定為自定義的輸出格式 job.setJarByClass(CustomOFDriver.class); // ②設定Job // 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別 job.setMapperClass(CustomOFMapper.class); // 設定輸入目錄和輸出目錄 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 設定輸入和輸出格式 job.setOutputFormatClass(MyOutPutFormat.class); // 取消reduce階段。設定為0,預設為1 job.setNumReduceTasks(0); // ③執行Job job.waitForCompletion(true); } } ``` 輸出檔案: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200805170615910.png) ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200805170629430.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFF