MapReduce: Writing Output Files to Multiple Directories
1: Custom OutputFormat class
By default, MapReduce's OutputFormat places all result files in the single directory we specify. If you instead want records that satisfy different conditions written to different directories, you need to implement your own OutputFormat and override its getRecordWriter method to return a custom RecordWriter.
In the driver class, register that implementation with job.setOutputFormatClass.
The example below processes a set of shopping-review text data and writes the good reviews and the bad reviews into a good-review directory and a bad-review directory respectively.
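The RecordWriter in part 3 splits each line on tabs and reads the tenth field (index 9) as the review status (0 = good, 1 = neutral, 2 = bad), so the input is assumed to be tab-separated text. A hypothetical input line (every field value here is invented purely for illustration) might look like:

1	2018-11-01	user001	order001	phone	599.0	5	shipped	paid	0

where the trailing 0 marks a good review.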
2: Implementing the custom OutputFormat class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Custom OutputFormat implementation.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The task context gives us the job Configuration
        Configuration configuration = context.getConfiguration();
        // Get a FileSystem object
        FileSystem fileSystem = FileSystem.get(configuration);
        // Output path for the good-review file
        Path goodComment = new Path("file:///F:\\goodComment\\1.txt");
        // Output path for the bad-review file
        Path badComment = new Path("file:///F:\\badComment\\1.txt");
        // Open the two output streams
        FSDataOutputStream fsDataOutputStream = fileSystem.create(goodComment);
        FSDataOutputStream fsDataOutputStream1 = fileSystem.create(badComment);
        MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream, fsDataOutputStream1);
        return myRecordWriter;
    }
}
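One design note on the code above: the two output paths are hard-coded, which works for a single local task, but every task attempt running this OutputFormat opens the same two files. Below is a minimal alternative sketch of getRecordWriter, not part of the original example, that derives the files from the output directory configured in the driver plus the task ID; the good/ and bad/ subdirectory names are invented for illustration:

@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    // Base directory passed to setOutputPath in the driver
    Path outDir = FileOutputFormat.getOutputPath(context);
    FileSystem fs = outDir.getFileSystem(conf);
    // One file per task, so parallel tasks do not overwrite each other
    String taskId = context.getTaskAttemptID().getTaskID().toString();
    FSDataOutputStream good = fs.create(new Path(outDir, "good/" + taskId));
    FSDataOutputStream bad = fs.create(new Path(outDir, "bad/" + taskId));
    return new MyRecordWriter(good, bad);
}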
3: Implementing the custom RecordWriter class
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream goodStream;
    private FSDataOutputStream badStream;

    public MyRecordWriter() {
    }

    public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
        this.goodStream = goodStream;
        this.badStream = badStream;
    }

    /**
     * write() is called once per record and writes the data out; the key decides
     * which file the record goes to.
     * goodStream: output file for good reviews
     * badStream: output file for bad reviews
     * @param key the record key (k3), i.e. one full input line
     * @param value NullWritable placeholder
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String[] split = key.toString().split("\t");
        // Field at index 9 holds the review status: 0 = good, 1 = neutral, 2 = bad.
        // Treat anything <= 1 (good and neutral) as a good review.
        if (Integer.parseInt(split[9]) <= 1) {
            // good review; write only the valid bytes of the Text
            // (getBytes() may return a backing array longer than the content)
            goodStream.write(key.getBytes(), 0, key.getLength());
            goodStream.write("\r\n".getBytes());
        } else {
            // bad review
            badStream.write(key.getBytes(), 0, key.getLength());
            badStream.write("\r\n".getBytes());
        }
    }

    /**
     * Release resources.
     * @param context the task attempt context
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(badStream);
        IOUtils.closeStream(goodStream);
    }
}
4: The custom Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit each input line unchanged as the key, with a NullWritable value
        context.write(value, NullWritable.get());
    }
}
5: The driver program
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyOutputMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "ownOutputFormat");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///F:\\input"));
        job.setMapperClass(MyOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(MyOutputFormat.class);
        // Because the output files are created directly in MyOutputFormat, no data
        // files appear in the directory specified below; it still has to be set,
        // since FileOutputFormat checks that an output path is configured.
        MyOutputFormat.setOutputPath(job, new Path("file:///F:\\output"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
        System.exit(run);
    }
}
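To try the job, the classes can be packaged into a jar and started with the hadoop launcher; the jar name below is only a placeholder, and with a default (non-cluster) configuration the file:/// paths mean the job runs through the local job runner:

hadoop jar mapreduce-demo.jar MyOutputMain

After a successful run, lines whose status field is 0 or 1 end up in F:\goodComment\1.txt, the remaining lines in F:\badComment\1.txt, and F:\output contains no data files, as noted in the driver comment.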