OutputFormat 自定義——資料過濾
需求:過濾日誌檔案,
把包含 itstaredu 的行寫入一個檔案,
把不包含 itstaredu 的行寫入另一個檔案。
/**
 * Custom output format whose only job is to hand out a {@link FileRecordWriter}, which splits
 * each output key into one of two files depending on its content.
 */
public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /**
     * Creates the record writer for one task attempt.
     *
     * @param job context of the task attempt; its configuration is passed to the writer
     * @return a new {@link FileRecordWriter} bound to {@code job}
     * @throws IOException if the writer cannot open its output files
     * @throws InterruptedException declared by the {@code OutputFormat} contract
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new FileRecordWriter(job);
    }
}
public class FileRecordWriter extends RecordWriter<Text, NullWritable>{
FSDataOutputStream other = null;
FSDataOutputStream itstarlog = null;
Configuration conf = null;
//1、定義資料輸出路徑
public FileRecordWriter(TaskAttempContext job){
//需要配置資訊
conf = job.getConfiguration();
//獲取檔案系統
FileSystem fs = FileSystem.get(conf);
//定義輸出路徑
itstarlog = fs.create(new Path("C:/outitstaredu/itstar.logs"));
other = fs.create(new Path("C:/outputother/other.logs"));
}
//2、資料輸出
@Override
public void writer(Text key, NullWritable value) throws IOException, InterruptException{
//判斷的話根據key
if(key.toString().contains("itstar")){
//寫出道檔案
itstarlog.write(key.getBytes());
}else if {
other.write(key.getBytes());
}
}
//3、關閉資源
@Override
public void close(TaskAttempContext context) throws IOException, InterruptException{
if(null != itstarlog){
itstarlog.close();
}
if(null != other){
other.close();
}
}
}
public class FileMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
public void map(LongWritable key, Text value, Context context)
throw IOException, InterruptException{
//輸出
context.write(value, NullWritable.get());
}
}
public class FileReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
@Override
public void reduce(Text key, Interable<NullWritable> value, Context context)
throw IOException, InterruptException{
//輸出
String k = key.toString;
context.write(new Text(k), NullWritable.get());
}
}
/**
 * Job driver for the log-filtering example: wires {@link FileMapper}, {@link FileReducer} and
 * {@link FuncFileOutputFormat} together and runs the job.
 *
 * <p>Usage: {@code FileDriver [inputPath [outputPath]]}. Omitted arguments fall back to the
 * original defaults {@code C:/in} and {@code c:/out}.
 */
public class FileDriver {

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args optional input path ({@code args[0]}) and output path ({@code args[1]})
     * @throws IOException if the job cannot be configured or submitted
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : "C:/in";
        String outputPath = args.length > 1 ? args[1] : "c:/out";

        Configuration conf = new Configuration();
        // Pass conf so that any settings made on it actually reach the job.
        Job job = Job.getInstance(conf);
        job.setJarByClass(FileDriver.class);

        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Use the custom OutputFormat that splits lines into two files.
        job.setOutputFormatClass(FuncFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        // Still required: FileOutputFormat validates the output directory and writes the
        // job's _SUCCESS marker there.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}