大數據學習之自定義輸出 13
阿新 • • 發佈:2019-05-12
系統 java pub 什麽 rri args sda stream out
二:自定義輸出
自定義輸出
需求:過濾日誌文件
把包含itstaredu的放在一個文件中 d:/itstaredu.log
把不包含itstaredu的放在另外一個文件 d:/other.log
1:自定義編寫FileOutputFormate
package it.dawn.YARNPra.自定義.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author Dawn * @date 2019年5月11日23:45:47 * @version 1.0 * 類似自定義輸入,根據源碼自己寫一個FileOutputFormat * 繼承FileOutputFormat */ public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileRecordWriter recordWriter = new FileRecordWriter(job); return recordWriter; } }
2 : 自定義編寫FileRecordWriter類
package it.dawn.YARNPra.自定義.outputformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** * @author Dawn * @date 2019年5月11日23:48:31 * @version 1.0 * 繼承 RecordWriter */ public class FileRecordWriter extends RecordWriter<Text, NullWritable>{ Configuration conf=null; FSDataOutputStream itstarlog=null; FSDataOutputStream otherlog=null; //1.定義數據輸出路徑 public FileRecordWriter(TaskAttemptContext job) throws IOException { //獲取配置信息 conf=job.getConfiguration(); //獲取文件系統 FileSystem fs=FileSystem.get(conf); //定義輸出路徑 //默認就是那個我們很熟悉的part-r-00000。這裏我們把它自定義成itstar.log other.log itstarlog=fs.create(new Path("f:/temp/outputformateSelf/fileoutSelf1/itstar.log")); otherlog=fs.create(new Path("f:/temp/outputformateSelf/fileoutSelf2/other.log")); } //2.數據輸出 @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { //判斷的話根據key if(key.toString().contains("itstar")) { //寫出到文件 itstarlog.write(key.getBytes()); }else { otherlog.write(key.getBytes()); } } //3.關閉資源 @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if(null != itstarlog) { itstarlog.close(); } if(null != otherlog) { otherlog.close(); } } }
3:編寫MR
mapper
package it.dawn.YARNPra.自定義.outputformat; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author Dawn * @date 2019年5月11日23:58:27 * @version 1.0 * 直接代碼一把梭,寫出去 */ public class FileMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); } }
Reduce:
package it.dawn.YARNPra.自定義.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FileReducer extends Reducer<Text, NullWritable, Text, NullWritable>{ @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { //換個行吧! String k = key.toString()+"\n"; context.write(new Text(k), NullWritable.get()); } }
Driver類:
package it.dawn.YARNPra.自定義.outputformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author Dawn * @date 2019年5月12日00:03:03 * @version 1.0 * * 這裏大家可能有個小疑問? * 就是我們已近在自定義輸出的時候,已經指定了輸出位置。為什麽我們這裏還是要寫輸出位置? * * 大家可以這樣想下,就是我們不進行自定義輸出的時候,是不是每次任務之後, * 會出現一大堆的文件 ._SUCCESS.crc .part-r-00000.crc _SUCCESS part-r-00000這4個的嘛。 * 而我們再自己寫的自定義輸出的時候,其實只是對part-r-00000文件指定了位置,而其他的什麽 ._SUCCESS.crc ...這些沒做處理啊!! * */ public class FileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1.獲取job信息 Configuration conf = new Configuration(); Job job=Job.getInstance(conf); // 2.獲取jar包 job.setJarByClass(FileDriver.class); // 3.獲取自定義的mapper與reducer類 job.setMapperClass(FileMapper.class); job.setReducerClass(FileReducer.class); // 4.設置map輸出的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5.設置reduce輸出的數據類型(最終的數據類型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //設置自定outputFormat job.setOutputFormatClass(FuncFileOutputFormat.class); // 6.設置輸入存在的路徑與處理後的結果路徑 FileInputFormat.setInputPaths(job, new Path("f:/temp/流量日誌.dat")); FileOutputFormat.setOutputPath(job, new Path("f:/temp/outputformateSelf")); // 7.提交任務 boolean rs = job.waitForCompletion(true); System.out.println(rs? "成功":"失敗"); } }
運行截圖:
輸入:
輸出(看好了 路徑根據 FileRecordWriter類中的一樣 ):
===============================================================
=============================================================================================
大數據學習之自定義輸出 13