MapReduce: Writing Output Files to Multiple Directories

1. The Custom OutputFormat Class

MapReduce's default OutputFormat places all result files in a single directory that we specify. If we instead want to route records to different directories depending on some condition, we need to provide our own OutputFormat implementation and override its getRecordWriter method so that it returns a custom RecordWriter.

In the driver class, register the custom implementation by passing it to job.setOutputFormatClass.

The example below processes a set of shopping-review text data, writing the good reviews to a good-review directory and the bad reviews to a bad-review directory.
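For reference, a line of the input might look like the following (this sample is my own illustration, not from the original data set; the only assumptions the code relies on are that fields are tab-separated and that field index 9 is the status flag, 0 = good, 1 = neutral, 2 = bad):

1	100	2023-05-01	u001	p01	5	great quality	0	0	0
2	101	2023-05-02	u002	p01	1	arrived broken	0	0	2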

2. Implementing the Custom OutputFormat Class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Custom OutputFormat implementation: opens one stream per target
 * directory and hands both to a custom RecordWriter.
 */
public class MyOutputFormat extends FileOutputFormat<Text,NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // the TaskAttemptContext gives us access to the job Configuration
        Configuration configuration = context.getConfiguration();
        // get a FileSystem object from that configuration
        FileSystem fileSystem = FileSystem.get(configuration);
        // output path for the good-review file
        Path goodComment = new Path("file:///F:\\goodComment\\1.txt");

        // output path for the bad-review file
        Path badComment = new Path("file:///F:\\badComment\\1.txt");

        // open one output stream per target file
        FSDataOutputStream goodOut = fileSystem.create(goodComment);
        FSDataOutputStream badOut = fileSystem.create(badComment);

        return new MyRecordWriter(goodOut, badOut);
    }
}
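Note that both output paths above are hard-coded, so every task that runs this getRecordWriter would try to create the same two files. A minimal sketch of one possible fix (my own addition, not part of the original article) derives the file name from the task id; the part-<n>.txt naming is hypothetical:

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        // each task gets its own file, so concurrent tasks cannot collide
        int taskId = context.getTaskAttemptID().getTaskID().getId();
        Path goodComment = new Path("file:///F:\\goodComment\\part-" + taskId + ".txt");
        Path badComment = new Path("file:///F:\\badComment\\part-" + taskId + ".txt");
        return new MyRecordWriter(fileSystem.create(goodComment), fileSystem.create(badComment));
    }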

3. Implementing the Custom RecordWriter Class


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream goodStream;
    private FSDataOutputStream badStream;

    public MyRecordWriter() {
    }

    public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
        this.goodStream = goodStream;
        this.badStream = badStream;
    }

    /**
     * Override the write method.
     * write is where each record is actually written out; we inspect the key
     * to decide which directory the record belongs in.
     * goodStream: stream for the good-review output file
     * badStream: stream for the bad-review output file
     * @param key the record (k3), one full line of review data
     * @param value always NullWritable
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String[] split = key.toString().split("\t");
        //獲取評論狀態  0  好評  1   中評  2 差評
     //   split[9]
        //判斷評評論狀態,如果是小於等於1,都寫到好評檔案裡面去
        if(Integer.parseInt(split[9])<=1){
            //好評
            goodStream.write(key.getBytes());
            goodStream.write("\r\n".getBytes());
        }else{
            //差評
            badSteam.write(key.getBytes());
            badSteam.write("\r\n".getBytes());
        }
    }

    /**
     * Close both streams when the task finishes.
     * @param context the task attempt context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(badStream);
        IOUtils.closeStream(goodStream);
    }
}
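The write method above assumes every line has at least ten tab-separated fields and that field 9 parses as an integer; a malformed line would throw and fail the task. A defensive variant (my own sketch, not part of the original code) could skip such lines instead:

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String[] split = key.toString().split("\t");
        if (split.length <= 9) {
            return; // fewer than ten fields: skip the malformed line
        }
        int status;
        try {
            status = Integer.parseInt(split[9]);
        } catch (NumberFormatException e) {
            return; // non-numeric status field: skip the line
        }
        FSDataOutputStream out = status <= 1 ? goodStream : badStream;
        out.write(key.getBytes());
        out.write("\r\n".getBytes());
    }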

4. The Mapper Class


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // pass each line through unchanged: the whole line becomes the output
        // key, and MyRecordWriter decides which file it is written to
        context.write(value, NullWritable.get());
    }
}

5. The Driver


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyOutputMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "ownOutputFormat");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));


        job.setMapperClass(MyOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        job.setOutputFormatClass(MyOutputFormat.class);
        // because our FileOutputFormat subclass opens its own streams,
        // no data files appear in the directory below; the real output
        // paths are hard-coded inside MyOutputFormat
        MyOutputFormat.setOutputPath(job, new Path("file:///F:\\output"));

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
        System.exit(run);
    }

}
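Since every path uses the local file:/// file system, the job can be run directly from the IDE. After a successful run, good and neutral reviews should end up in F:\goodComment\1.txt, bad reviews in F:\badComment\1.txt, and F:\output should contain only the _SUCCESS marker written by the default output committer.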