MapReduce中自定義檔案輸出名
阿新 • • 發佈:2018-12-16
MR的輸出結果預設為part-r-00000,我們可自定義易識別的名字替代part,如score-r-00000/
job.setOutputFormatClass(MyOut.class); MyOut.setOutputName(job, "score");//自定義輸出名 job.waitForCompletion(true); //自定義MyOut類繼承TextOutPutFormat,並覆蓋其中的setOutPutName方法,此方法在FileOutputFormat類中為protected修飾,不能直接呼叫 private static class MyOut extends TextOutputFormat{ protected static void setOutputName(JobContext job, String name) { job.getConfiguration().set(BASE_OUTPUT_NAME, name); } }
上述方法僅能簡單的替代檔名part,要想全部自定義檔名,需要重寫RecordWriter
/** * 自定義MyFileOutputFormat繼承FileOutputFormat,實現其中的getRecordWriter方法; * 該方法返回一個RecordWriter物件,需先建立此物件,實現其中的write、close方法; * 檔案通過FileSystem在write方法中寫出到hdfs自定義檔案中 */ public class MyFileOutputFormat extends FileOutputFormat<Text, Text> { @Override public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fs = FileSystem.newInstance(job.getConfiguration()); //自定義輸出路徑及檔名,把數學成績和英語成績分別輸出到不同的檔案中 final FSDataOutputStream math = fs.create(new Path("/score/math.txt")); final FSDataOutputStream english = fs.create(new Path("/score/english.txt")); RecordWriter<Text, Text> recordWriter = new RecordWriter<Text, Text>() { @Override public void write(Text key, Text value) throws IOException, InterruptedException { if(key.toString().contains("math")){ math.writeUTF(key.toString()); } if(key.toString().contains("english")){ english.writeUTF(key.toString()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (math!=null) { math.close(); } if (english!=null) { english.close(); } } }; return recordWriter; } }