OutputFormat---自定義輸出方式
阿新 • • 發佈:2020-10-11
簡介
透過自定義 OutputFormat,可以控制輸出的格式以及寫入的檔案。例如本案例中,包含字串「neve」的記錄輸出到一個指定檔案,不包含的記錄則輸出到另一個檔案。
案例
資料
www.nevesettle.com
www.baidu.com
www.qq.com
www.mi.com
www.jd.com
www.std.com
Mapper
package com.neve.outputformat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value,NullWritable.get()); } }
Reducer
package com.neve.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { for (NullWritable value : values) { context.write(key,value); } } }
Driver
package com.neve.outputformat; import com.neve.phone.FlowBean; import com.neve.phone.FlowMapper; import com.neve.phone.FlowReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.建立配置 Configuration configuration = new Configuration(); //2.建立job Job job = Job.getInstance(configuration); //3.關聯驅動類 job.setJarByClass(LogDriver.class); //4.關聯mapper和reducer類 job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); //5.設定mapper的輸出值和value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //6.設定最終的輸出值和value job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //7.設定輸入輸出路徑 FileInputFormat.setInputPaths(job,new Path("F:\\Workplace\\IDEA_Workplace\\hadoopStudy2\\outputformatinput")); FileOutputFormat.setOutputPath(job,new Path("F:\\Workplace\\IDEA_Workplace\\hadoopStudy2\\outputformatoutput")); //設定自定義的format類 job.setOutputFormatClass(LogOutputFormat.class); //8.提交job job.waitForCompletion(true); } }
LogOutputFormat
package com.neve.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
LogRecordWriter logw = new LogRecordWriter(job);
return logw;
}
}
LogRecordWriter
package com.neve.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
//定義輸出路徑
private String nelog = "F:\\nelog.log";
private String otherlog = "F:\\otherlog.log";
private FileSystem fs ;
private FSDataOutputStream neos;
private FSDataOutputStream otheros;
public LogRecordWriter(TaskAttemptContext job) throws IOException {
//獲取檔案系統物件
fs = FileSystem.get(job.getConfiguration());
neos = fs.create(new Path(nelog));
otheros = fs.create(new Path(otherlog));
}
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String string = key.toString();
if (string.contains("neve")){
neos.writeBytes(string + "\r");
}else {
otheros.writeBytes(string + "\r");
}
}
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(neos);
IOUtils.closeStream(otheros);
}
}