1. 程式人生 > >MapReuce之輸入類InputFormat

MapReuce之輸入類InputFormat

使用hadoop jar執行mapreduce任務時首先從hdfs中讀取資料將這些資料解析為inputsplit,然後再將inputsplit中的內容解析為一個一個的<k,v>鍵值對,這個過程就是有InputFormat的子類完成的。之前在MR例子中有一段程式碼job.setInputFormatClass(TextInputFormat.class);就是指定TextInputFormat來完成這項工作,這個類是hadoop預設的其實可以不寫。

         InputFormat是一個抽象類,類中有兩個抽象方法List<InputSplit> getSplits和RecordReader<K,V>createRecordReader,getSplit負責將hdfs資料解析為InputSplit,createRecordReader負責將每個InputSplit中的每一行解析為<k,v>鍵值對。

TextInputFormat

getSplits

         FileInputFormat繼承了InputFormat並實現了getSplits方法。

主要完成的功能是:

根據路徑解析hdfs資料,判斷檔案是否可以被切分。

計算splitSize,預設等於blockSize,128M

獲取每一個hdfs物件並進行遍歷並將結果放入List<InputSplit>中返回。

       Hadoop中一個block對應一個inputsplit,一個inputsplit對應一個map任務。

注意:

hadoop不會對小於128M的檔案進行切分,例如一個檔案1G那就是8個map任務,如果有1000個100kb的檔案則對應1000個map任務,這樣會造成效率下降。所以MapReduce不適合處理小檔案。

如果inputsplit和blocksize不一樣比如大於,那麼在解析為inputsplit時一個block就不夠用,此時框架就會去別的節點上讀取資料來構造inputsplit,這樣會產生網路消耗影響效率。

 

createRecordReader

         TextInputFormat繼承了FileInputFormat並實現了createRecordReader方法。此方法的返回值是抽象類RecordReader,而最終返回的是LineRecordReader,LineRecordReader實現了RecordReader並在實現的抽象方法中完成解析。

         主要完成的功能是:

                在initialize方法中獲取FileSplit物件並讀取每一行內容。

                獲取<k,v>鍵值對作為map任務的入參再呼叫map任務。

         框架每獲取一個<k,v>就會呼叫一次map任務。

 

至此我們可以通過下圖大概瞭解一下這幾個類的關係


NlineInputFormat

Hadoop中預設是一個block一個inputsplit,但是在程式碼中可以指定其他的inputFormat子類,NLineInputFormat可以設定指定檔案中多少行為一個inputsplit(一個map處理),下面的程式碼指定每3行一個inputsplit(一個map處理)。

org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

job.setInputFormatClass(NLineInputFormat.class);

NLineInputFormat.setNumLinesPerSplit(job,3);

 

KeyValueTextInputFormat

         記錄中有製表符(tab),以第一個製表符為分隔符,前面的作為key後面的作為value,若無製表符則全部為key,value為空。

job.setInputFormatClass(KeyValueTextInputFormat.class);

         同時也可以指定其他的字串為分隔符

         conf.setStrings(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");


CombineFileInputFormat

         以之前提到的wordcount例項中需要統計單詞出現的次數輸入類使用的是TextInputFormat,但是如果我有許多的小檔案那麼在執行mapreduce時split的數量就會很多。

如下圖,hdfs上有4個檔案對應的split數量也為4,map任務也為4

CombineFileInputFormat這個輸入類可以合併小檔案,下面來看一個例子。

package mapreduce;
 
import java.io.IOException;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
//import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * hdfs上的hello中的內容為
tiger pig
pig cat dog
dog bird cat
tiger house
bus bike bus car
 * @author think
 *
 */
public class WordCount {
 
    public static void main(String[] args) throws Exception {
        String inPath = args[0];
        Path outPath = new Path(args[1]);
        
        //1:hdfs configuration,get SystemFile Object
        Configuration conf = new Configuration();
        URI uri = new URI("/");// URI uri = new URI("hdfs://192.168.79.128:9000/");
        FileSystem fileSystem = FileSystem.get(uri, conf);
 
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
 
        // 2:job object
        String jobName = WordCount.class.getName();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(WordCount.class);
 
        // 3:輸入路徑
        FileInputFormat.setInputPaths(job, inPath);
 
        // 4:指定inputFormat的子類,可選,預設是TextInputFormat
        //job.setInputFormatClass(TextInputFormat.class);
        job.setInputFormatClass(CombineSmallFileInputFormat.class);
        
        // 5:指定mapper類,指定mapper的輸出<k2,v2>型別
        job.setMapperClass(MapTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
 
        // 6:指定reduce類,指定reduce的輸出<k3,v3>型別
        job.setReducerClass(ReduceTask.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
 
        // 7:指定輸出路徑
        FileOutputFormat.setOutputPath(job, outPath);
 
        // 8:指定outputformat子類
        job.setOutputFormatClass(TextOutputFormat.class);
 
        // 9:提交yarn執行
        job.waitForCompletion(true);
    }
    
    /**
     * Map 任務
     * @author think
     * LongWritable, Text, Text, LongWritable這4個引數依次代表map任務的輸入鍵值對<k1,v1>和輸出鍵值對<k2,v2>
     */
    public static class MapTask extends Mapper<LongWritable, Text, Text, LongWritable>
    {
        Logger logger = LoggerFactory.getLogger(WordCount.class);
        
        Text k2 = new Text();
 
        LongWritable v2 = new LongWritable();
        
        /**
         * 重寫map方法
         * context是一個mapper的內部類
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            //1:key為內容的位元組序數,value為內容
            String content = value.toString();
            System.out.println("內容:" + key.get() + " ," + content);
            logger.info("內容:" + key.get() + " ," + content);
            
            String[] arrs = content.split(",");
            for(String word : arrs)
            {
                k2.set(word);
                v2.set(1);
                context.write(k2, v2);
                logger.info("map:" + k2.toString() + "," + v2);
            }
        }
    }
    
    /**
     * Reduce 任務 
     * @author think
     * Text, LongWritable, Text, LongWritable這4個引數依次代表reduce任務的輸入鍵值對<k2,v2s>和輸出鍵值對<k3,v3>
     */
    public static class ReduceTask extends Reducer<Text, LongWritable, Text, LongWritable>
    {
        LongWritable v3 = new LongWritable();
        
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context content)
                throws IOException, InterruptedException {
            System.out.println("k2:" + k2.toString());
            long sum = 0;
            for(LongWritable v2 : v2s)
            {
                System.out.println("v2:" + v2);
                sum += v2.get();
            }
            v3.set(sum);
            content.write(k2, v3);
            System.out.println("k3,v3:" + k2.toString() + "," + v3);
        }
    }
    
    /**
     * 自定義處理小檔案的mapreduce輸入類
     * @author think
     *
     */
    public static class CombineSmallFileInputFormat extends CombineFileInputFormat<LongWritable, Text>{
 
        /**
         * createRecordReader建立一個讀取器,實現RecordReader方法
         * <LongWritable, Text>是map任務的輸入引數,和之前的一樣。入參感覺要隨map任務的業務而定 
         * 返回值是CombineFileRecordReader物件例項
         *         這個物件繼承了RecordReader
         *         生成例項需要三個引數
         *             第一個需要強轉成CombineFileSplit
         *             第二個是上下文
         *             第三個是我們自定義的一個類,這個類必須繼承RecordReader
         */
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException {
            return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSmallFileRecordReader.class);
        }
        
    }
    
    /**
     * 繼承RecordReader類的<k,v>和上面一樣都是<LongWritable, Text>
     * 實現RecordReader方法
     * @author think
     *
     */
    public static class CombineSmallFileRecordReader extends RecordReader<LongWritable, Text> {
 
        private LineRecordReader lrr;
        
        /**
         *     在解析多個小檔案時,每個小檔案都會呼叫上面的
         *     return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSmallFileRecordReader.class);
         * 所以建構函式的第三個引數index就是每個小檔案的序號,比如第一個,第二個......
         * 
         * 
         * @param split
         * @param context
         * @param index 檔案的序號
         * @throws IOException 
         * @throws Interrupted Exception 
         */
        public CombineSmallFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException
        {
            //1.通過反射機制例項化lrr
            this.lrr = ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());
            
            //2.為初始化方法構造引數
            /**
             * 引數fileSplit就是我們處理的眾多小檔案(word,word1..)所以是FileSplit,我們需要自行構造
             * 4個引數分別是路徑資訊(file),起始位置(偏移量start),長度(length),所在位置hosts,我們需要構建這4個引數
             * 4個引數均從split中獲取,index是檔案的序號
             */
            Path file = split.getPath(index);
            long start = split.getOffset(index);
            long length = split.getLength(index);
            String[] hosts = split.getLocations();
            InputSplit fileSplit = new FileSplit(file, start, length, hosts);
            
            //3.呼叫初始化方法
            this.lrr.initialize(fileSplit, context);
        }
        
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            
        }
 
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return lrr.nextKeyValue();
        }
 
        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return lrr.getCurrentKey();
        }
 
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return lrr.getCurrentValue();
        }
 
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lrr.getProgress();
        }
 
        @Override
        public void close() throws IOException {
            lrr.close();
        }
 
        
    }
}

下圖中顯示的是日誌,可以看到相比於輸入類使用TextInputFormat,使用CombineFileInputFormat的split和map任務數量都要少,之間也應該更快。CombineFileInputFormat合併了小檔案。


--------------------- 
作者:臭小優 
來源:CSDN 
原文:https://blog.csdn.net/ty4315/article/details/52809913 
版權宣告:本文為博主原創文章,轉載請附上博文連結!