1. 程式人生 > >Hadoop MapReduce輸入輸出類型

Hadoop MapReduce輸入輸出類型

imu finally configure 獲得 命名 pfile 計算 uil 大文件

一、輸入格式

  1、輸入分片split

      一個分片對應一個map任務;

      一個分片包含一個表(整個文件)上的若幹行,而一條記錄(單行)對應一行;

      分片包含一個以字節為單位的長度 和 一組存儲位置,分片不包含實際的數據;

      map處理時會用分片的大小來排序,優先處理最大的分片;

  hadoop中Java定義的分片為InputSplit抽象類:主要兩個方法,涉及分片長度,分片起始位置

    public abstract class InputSplit{
         public abstract long getLength() throws
IOException, InterruptedException; public abstract String[] getLocations() throws IOException, InterruptedException; }

  InputSplit不需要手動去處理它,它是由InputFormat生成;InputFormat負責產生輸入分片並將它們分割成記錄:

    public abstract class InputFormat<K, V> {
        public abstract List<InputSplit> getSplits( JobContext context) throws
IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }

  InputFormat抽象類定義的兩個方法:getSplits() 和 createRecordReader()

  運行作業的客戶端會調用getSplits()來計算分片,然後將它們發送到jobtracker,jobtracker會使用其存儲位置來調度map任務從而在tasktracker上來處理這個分片數據。在tasktracker上,map任務把輸入分片傳給InputFormat的getRecordReader()方法來獲得這個分片的RecordReader。RecordReader就是一個集合叠代器,map任務用一個RecordReader來生成記錄的鍵/值對,然後再傳遞給map函數。

  2、FileInputFormat類

    FileInputFormat類是所有指定數據源實現類的基類,它本身主要有兩個功能:a. 指定輸入文件位置;b. 輸入文件生成分片的實現代碼段,具體實現由子類完成;

    繼承圖:

    技術分享    

    設置輸入文件位置:

      FileInputFormat.addInputPath(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));

      或 FileInputFormat.setInputPaths(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));      

      可添加文件過濾器, FileInputFormat 中靜態方法:

        public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)

        文件添加時,默認就會有一個過濾器,過濾掉"." 和 "_"開頭的文件,會過濾掉隱藏文件;自定義的過濾器也是在默認過濾的基礎上過濾;

    切分的分片大小:

        一個split的大小計算:max( minimumSize, min( maximumSize, blockSize ));

        minimumSize默認為1,maximumSize默認為Long.MAX_VALUE;

        所以通常 blockSize 在 minimumSize和maximumSize之間,所以一般分片大小就是塊大小。

    設置不切分文件:

        兩種方法:

          a. 設置minimumSize的大小為Long.MAX_VALUE;

          b. 在實現FileInputFormat的子類時,重寫isSplitable()方法返回為false;

     

    在mapper中獲取文件分片信息:

        在mapper中可以獲取當前處理的分片的信息,可通過context.getInputSplit()方法來獲取一個split;當輸入的格式源於FileInputFormat時,該方法返回的InputSplit可以被強制轉換化一個FileSplit(繼承自InputSplit),可調用如下信息:

           a. getPath() Path/String 文件的路徑

           b. getStart() long

           c. getLength() long

    

     自定義一個輸入格式,把整個文件作為一條記錄: 

技術分享
// Example 7-2. An InputFormat for reading a whole file as a record
class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

//主要是實現RecordReader類
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing }
    }
}
View Code

       整個文件作為一條記錄的應用,把多個小文件合並為一個大文件:

技術分享
public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}
View Code

     文本輸入:

        a. TextInputFormat 行首偏移量:行內容

        b. KeyValueTextInputFormat  以tab劃分一行的key value

        c. NLineInputFormat  讓每個map收到定義的相同行數,每個分片只包含N行

     二進制輸入:

        Hadoop的MapReduce不只是可以處理文本信息,還可以處理二進制格式,通過會用以下幾個類:

          SequenceFileInputFormat,處理SequenceFile 和 MapFile的文件類型;

          SequenceFileAsTextInputFormat 是 SequenceFileInputFormat的擴展,它將SequenceFile的鍵值轉換為Text對象,這個轉化是通過鍵和值上調用toString()方法實現。

          SequenceFileAsBinaryInputFormat 也是SequenceFileInputFormat的擴展,它將SequenceFile的鍵值作為二進制對象。它們被封裝為BytesWritable對象,因而可以任意解釋這些字節數組。

      多輸入MultipleInputs:

        它可為每條輸入路徑指定InputForamt 和 Mapper:       

MutipleInputs.addInputPath(job , ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
MutipleInputs.addInputPath(job ,metofficeInputPath, TextInputFormat.class, MetofficeMaxTemperatureMapper.class);

//MutipleInputs還有一個重載,當只用一個Mapper時
public static void addInputPath(Job job, Path path, class<? extends InputFormat> inputFormatClass);  

        它取代了FileInputFormat.addInputPath() 和 job.setMapperClass()的調用。

二、輸出格式

繼承圖:  

    技術分享

  文體輸出TextOutputFormat:

  默認的輸出是文本輸出TextOutputFormat,它把每條記錄寫為文本行,它調用toString()方法把key value轉化為字符串。

  與之對應的輸入為KeyValueTextInputFormat;

  二進制輸出:與輸入對應。

多輸出:

  默認一個reducer生成一個輸出文件,命名為part-r-00000,part-r-00001等等;

  有時需要對輸出的文件名進行控制 或 讓每個redeucer輸出多個文件,可利用 MultipleOutputFormat 類;

  範例:按氣象站來區分氣象數據,各個氣象站輸出到不同的文件中:

    方法一:可利用每個reducer創建一個輸出文件的特點,通過設置多個分區,來輸出到各個文件,這樣做有兩點不好:

          a. 分區個數必須預先就知道;可能有空reducer,可能有的獲取不到氣象站信息導致值丟失;

          b. 每個reducer處理一個氣象站,可能需要過多的reducer,也會有嚴重的數據傾斜問題;

    方法二:使用 MutipleOutputs 類:

技術分享
public class PartitionByStationUsingMultipleOutputs extends Configured implements Tool {
    static class StationMapper extends Mapper<LongWritable, Text, Text, Text> {
        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            context.write(new Text(parser.getStationId()), value);
        }
    }

    static class MultipleOutputsReducer extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setMapperClass(StationMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setReducerClass(MultipleOutputsReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}
View Code

        輸出文件結果如下:          

          output/010010-99999-r-00027

          output/010050-99999-r-00013

          output/010100-99999-r-00015

          output/010280-99999-r-00014

  

Hadoop MapReduce輸入輸出類型