1. 程式人生 > >outputformat自定義--資料過濾

outputformat自定義--資料過濾

outputformat自定義--資料過濾
    需求:過濾日誌檔案
        把包含itstaredu的放在一個檔案
        把不包含itstaredu的放在一個檔案
        public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable>{
            @Override
            public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttempContext job)
                    throws IOException, InterruptException {
                FileRecordWriter fileRecordWriter = new FileRecordWriter(job);

                return fileRecordWriter;
            }
        }

        public class FileRecordWriter extends RecordWriter<Text, NullWritable>{
            FSDataOutputStream other = null;
            FSDataOutputStream itstarlog = null;
            Configuration conf = null;

            //1、定義資料輸出路徑
            public FileRecordWriter(TaskAttempContext job){
                //需要配置資訊
                conf = job.getConfiguration();

                //獲取檔案系統
                FileSystem fs = FileSystem.get(conf);

                //定義輸出路徑
                itstarlog = fs.create(new Path("C:/outitstaredu/itstar.logs"));
                other = fs.create(new Path("C:/outputother/other.logs"));

            }
            //2、資料輸出
            @Override
            public void writer(Text key, NullWritable value) throws IOException, InterruptException{
                //判斷的話根據key
                if(key.toString().contains("itstar")){
                    //寫出道檔案
                    itstarlog.write(key.getBytes());
                }else if {
                    other.write(key.getBytes());
                }

            }
            //3、關閉資源
            @Override
            public void close(TaskAttempContext context) throws IOException, InterruptException{
                if(null != itstarlog){
                    itstarlog.close();
                }
                if(null != other){
                    other.close();
                }

            }
        }
        public class FileMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throw IOException, InterruptException{
                //輸出
                context.write(value, NullWritable.get());

            }
        }

        public class FileReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
        
            @Override
            public void reduce(Text key, Interable<NullWritable> value, Context context)
                    throw IOException, InterruptException{
                //輸出
                String k = key.toString;

                context.write(new Text(k), NullWritable.get());

            }
        }

        public class FileDriver{
        public static void main(String[] args) throws IOException, ClassNotFoundException,InterruptException{
            Configuration conf = new Configuration();
            Job job = Job.getInstance();

            job.setJarByClass(FileDriver.class);

            job.setMapperClass(FileMapper.class);
            job.setReducerClass(FileReducer.class);

            job.setMapOutputValueClass(NullWritable.class);
            job.setMapOutputKeyClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            //設定自定義的outputformat
            job.setOutputFormatClass(FuncFileOutputFormat.class);

            FileInputFormat.setInputPaths(job, new Path("C:/in"));
            FileOutputFormat.setOutputPath(job, new Path("c:/out"));

            job.waitForCompletion(true);
        }
    }