1. 程式人生 > 其它 >MapReduce原理深入理解3----WordCount程式流程圖解、combiner(合併)程式示例、InputSplit切片詳解

MapReduce原理深入理解3----WordCount程式流程圖解、combiner(合併)程式示例、InputSplit切片詳解

MapReduce原理深入理解3----WordCount程式流程圖解、combiner(合併)程式示例、3、InputSplit切片詳解

1、WordCount示例程式處理流程圖解
2、combiner(合併)程式示例

combiner,發生在map階段,又叫做預聚合;

相當於map端的Reduce,因為combiner的邏輯程式碼和Reduce端的邏輯程式碼一樣

求max、min、sum都可以使用預聚合,avg不能使用預聚合

檔案words.txt

hadoop hive hbase spark flink
hadoop hive hbase spark flink
hadoop hive hbase spark flink
hadoop hive hbase spark flink
hadoop hive hbase spark flink
java scala python
java scala python
java scala python
java scala python
java scala python
(1)在Map階段,新增一個自定義Combiner階段
(2)在Driver階段中,配置map和配置Reduce之間,新增一個Combiner配置
    
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// 統計每個單詞出現的次數
public class Demo6WordCountCombiner {
    // Map階段
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * @param key     Map端輸入的key->偏移量
         * @param value   Map端輸入的value->一行資料
         * @param context MapReduce整個過程的上下文環境->可以獲取MapReduce程式執行時的一些引數、狀態,可以將Map的輸出傳送到Reduce
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 實現自己的map端邏輯
            String vStr = value.toString();
            // 按照空格進行切分,將每個單詞切分出來
            String[] words = vStr.split(" ");

            // 遍歷每一個單詞,構造成k-v格式
            /**
             * hadoop hive hbase spark flink
             * ====>
             * hadoop 1
             * hive 1
             * hbase 1
             * spark 1
             * flink 1
             */
            for (String word : words) {
                Text keyOut = new Text(word);
                IntWritable valueOut = new IntWritable(1);
                // 通過context將構建好的k-v傳送出去
                context.write(keyOut, valueOut);
            }

        }
    }

    // 自定義的Combiner
    //繼承Reducer,實現的邏輯和Reduce階段的邏輯一樣
    public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 實現自己的Reduce邏輯
            int sum = 0; // 儲存每個單詞的數量
            for (IntWritable value : values) {
                // 遍歷values迭代器
                sum += value.get();
            }

            // 將Reduce統計得到的結果輸出到HDFS
            context.write(key, new IntWritable(sum));
        }
    }

    // Reduce階段
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * @param key     Map端輸出的資料按照key進行分組過後的資料中的key,在這裡相當於每個單詞
         * @param values  Map端輸出的資料按照key進行分組過後,相同key的所有的value組成的集合(迭代器)
         * @param context MapReduce的上下文環境,主要用於輸出資料到HDFS
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 實現自己的Reduce邏輯
            int sum = 0; // 儲存每個單詞的數量
            for (IntWritable value : values) {
                // 遍歷values迭代器
                sum += value.get();
            }

            // 將Reduce統計得到的結果輸出到HDFS
            context.write(key, new IntWritable(sum));


        }
    }

    // Driver端(將Map、Reduce進行組裝)
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 建立配置檔案
        Configuration conf = new Configuration();

        // 建立一個Job例項
        Job job = Job.getInstance(conf);
        // 對Job進行一些簡單的配置
        job.setJobName("Demo6WordCountCombiner");
        // 通過class類設定執行Job時該執行哪一個類
        job.setJarByClass(Demo6WordCountCombiner.class);

        // 對Map端進行配置
        // 對Map端輸出的Key的型別進行配置
        job.setMapOutputKeyClass(Text.class);
        // 對Map端輸出的Value的型別進行配置
        job.setMapOutputValueClass(IntWritable.class);
        // 配置Map任務該執行哪一個類
        job.setMapperClass(MyMapper.class);

        // 設定Combiner
        job.setCombinerClass(MyCombiner.class);

        // 對Reduce端進行配置
        // 對Reduce端輸出的Key的型別進行配置
        job.setOutputKeyClass(Text.class);
        // 對Reduce端輸出的Value的型別進行配置
        job.setOutputValueClass(IntWritable.class);
        // 配置Reduce任務該執行哪一個類
        job.setReducerClass(MyReducer.class);

        // 配置輸入輸出路徑
        FileInputFormat.addInputPath(job, new Path("/wordCount/input"));
        // 輸出路徑不需要提前建立,如果該目錄已存在則會報錯
        // 通過HDFS的JavaAPI判斷輸出路徑是否存在
        Path outPath = new Path("/wordCount/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileOutputFormat.setOutputPath(job, outPath);

        // 等待job執行完成
        job.waitForCompletion(true);

        /**
         * 1、準備資料,將words.txt上傳至HDFS的/wordCount/input目錄下面
         * hdfs dfs -mkdir -p /wordCount/input
         * hdfs dfs -put words.txt /wordCount/input
         * 2、提交MapReduce任務
         * hadoop jar Hadoop-1.0.jar com.shujia.MapReduce.Demo6WordCountCombiner
         */

    }

}
3、InputSplit切片詳解
(1)在執行mapreduce之前,原始資料被分割成若干split,每個split作為一個map任務的輸入。
(2)當Hadoop處理很多小檔案(檔案大小小於hdfs block大小)的時候,由於FileInputFormat不會對小檔案進行劃分,所以每一個小檔案都會被當做一個split並分配一個map任務,會有大量的map task執行,導致效率底下;
(3)例如:一個1G的檔案,會被劃分成8個128MB的split,並分配8個map任務處理,而10000個100kb的檔案會被10000個map任務處理
(4)Map任務的數量
	一個InputSplit對應一個Map task
	InputSplit的大小是由Math.max(minSize, Math.min(maxSize,blockSize))決定
	單節點建議執行10—100個map task
	map task執行時長不建議低於1分鐘,否則效率低
特殊:
	block=128M,MR切片的溢位率為1.1,當溢位率<1.1,就不會被切片
	(具體演算法在FileInputFormat類中的getSplits)
	
舉例1:一個260M的檔案會被切成兩個切片:128M,132M,產生兩個map任務
		因為132/128<1.1,132不會被切片
		
舉例2:一個輸入檔案大小為140M,會有1個map task
		因為140/128<1.1,140不會被切片
4、RecordReader
(1)每一個InputSplit都有一個RecordReader,作用是把InputSplit中的資料解析成Record,即<k1,v1>。
(2)在TextInputFormat中的RecordReader是LineRecordReader,每一行解析成一個<k1,v1>。其中,k1表示偏移量,v1表示行文字內容