1. 程式人生 > >手動實現一個單詞統計MapReduce程序與過程原理分析

手動實現一個單詞統計MapReduce程序與過程原理分析

Hadoop MapReduce Java

[toc]


手動實現一個單詞統計MapReduce程序與過程原理分析

前言

我們知道,在搭建好hadoop環境後,可以運行wordcount程序來體驗一下hadoop的功能,該程序在hadoop目錄下的share/hadoop/mapreduce目錄中,通過下面的命令:

yarn jar $HADOOP_HOME/share/hadoop/mapreducehadoop-mapreduce-examples-2.6.4.jar wordcount inputPath outPath

即可對輸入文件執行單詞統計的計算。

那麽下面就通過手動寫一個wordcount的例子來加深對MapReduce的基本理解。

案例場景

假如有下面一個文本文件需要進行單詞統計:

$ cat hello
hello you
hello he
hello me

Note:該hello文件為李老師的經典文本文件。

下面就來演示MapReduce程序如何來對該文本文件進行計算,最後再依據此寫一個wordcount程序。

MapReduce計算分析

我們先來簡單分析一下MapReduce是如何處理上面的文本文件,然後才寫一個程序。

對於上面的一個文本文件,MapReduce程序分三個步驟進行處理:Map階段、Shuffle階段和Reduce階段。(三個階段的分析在代碼的註釋中也是非常詳細的解釋)

Map階段

上面的文本文件經過Map處理後會得到類似下面的結果:

<hello, 1>
<heelo, 1>
<hello, 1>
<you, 1>
<he, 1>
<me, 1>

shuffle階段

對Map階段的結果進行處理,會得到如下的結果:

<hello, [1, 1, 1]>
<you, [1]>
<he, [1]>
<me, [1]>

Reduce階段

經過reducer處理之後,結果如下:

<hello, 3>
<you, 1>
<he, 1>
<me, 1>

關於上面的過程分析,可以參考下面的幾張圖示以幫助理解:

圖示1:
技術分享圖片

圖示2:
技術分享圖片

圖示3:
技術分享圖片

程序思路分析

 * 整個的解題思路,使用map函數進行單詞的拆分,使用reduce函數進行匯總,中間進行shuffle
 * 要想讓我們的map函數和reduce函數進行接替運行,需要一個驅動程序
 * 代碼的思路:
 * 1、編寫一個類繼承Mapper,成為自定義的Mapper,主要業務邏輯就是復寫其中的map函數
 *  map
 *  首先要確定清楚Mapper類或者map函數的數據類型/類型參數--->泛型
 *  Mapper<K1, V1, K2, V2>
 * 2、編寫一個類繼承Reducer,成為自定義的Reducer,主要業務邏輯就是復寫其中的reduce函數
 *  reduce
 *  首先要確定清楚Reducer類或者reduce函數它的數據類型/類型參數--->泛型
 *  Reducer<K2, V2s, K3, V3>
 *
 * 需要我們用戶自定義的類型就是K2, V2, K3, V3
 * K1和V1一般情況下是固定的,只要數據格式確定,其類型就確定
 * 比如我們操作的是普通的文本文件,那麽K1=LongWritable,V1=Text
 * K1--->代表的是這一行記錄在整個文本中的偏移量,V1就是這一行文本的內容
 * (也就是說,K1和V1取決於我們要處理的是什麽文件)
 * 註意:與Hadoop的程序需要使用Hadoop提供的數據類型,而不能使用java中提供的數據類型

wordcount程序

程序代碼中有非常詳細的註釋,可以參考來進行理解。

WordCount.java

 package com.uplooking.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 統計hdfs://uplooking01:9000/input/mr/hello的單詞出現次數
 *
 * 整個的解題思路,使用map函數進行單詞的拆分,使用reduce函數進行匯總,中間進行shuffle
 * 要想讓我們的map函數和reduce函數進行接替運行,需要一個驅動程序
 * 代碼的思路:
 * 1、編寫一個類繼承Mapper,成為自定義的Mapper,主要業務邏輯就是復寫其中的map函數
 *  map
 *  首先要確定清楚Mapper類或者map函數的數據類型/類型參數--->泛型
 *  Mapper<K1, V1, K2, V2>
 * 2、編寫一個類繼承Reducer,成為自定義的Reducer,主要業務邏輯就是復寫其中的reduce函數
 *  reduce
 *  首先要確定清楚Reducer類或者reduce函數它的數據類型/類型參數--->泛型
 *  Reducer<K2, V2s, K3, V3>
 *
 * 需要我們用戶自定義的類型就是K2, V2, K3, V3
 * K1和V1一般情況下是固定的,只要數據格式確定,其類型就確定
 * 比如我們操作的是普通的文本文件,那麽K1=LongWritable,V1=Text
 * K1--->代表的是這一行記錄在整個文本中的偏移量,V1就是這一行文本的內容
 * (也就是說,K1和V1取決於我們要處理的是什麽文件)
 * 註意:與Hadoop的程序需要使用Hadoop提供的數據類型,而不能使用java中提供的數據類型
 */
public class WordCount {

    /**
     * 這裏的main函數就是用來組織map函數和reduce函數的
     * 最終mr的運行會轉變成一個個的Job
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // 構建Job所需的配置文件和jobName
        Configuration configuration = new Configuration();
        String jobName = "wordcount";
        // 1.創建一個job
        Job job = Job.getInstance(configuration, jobName);

        // 添加mr要運行的主函數所在的類,就是WordCount這個類
        job.setJarByClass(WordCount.class);

        // 2.設置mr的輸入參數
        // 設置計算的文件
        Path inputPath = new Path("hdfs://uplooking01:9000/input/mr/hello");
        FileInputFormat.setInputPaths(job, inputPath);
        // 指定解析數據源的Format類,即將輸入的數據解析為<K1, V1>的形式,然後再交由mapper函數處理
        job.setInputFormatClass(TextInputFormat.class);
        // 指定使用哪個mapper來進行計算
        job.setMapperClass(WordCountMapper.class);
        // 指定mapper結果的key的數據類型(即K2的數據類型),註意要與我們寫的Mapper中定義的一致
        job.setMapOutputKeyClass(Text.class);
        // 指定mapper結果的value的數據類型(即V2的數據類型),註意要與我們寫的Mapper中定義的一致
        job.setMapOutputValueClass(IntWritable.class);

        // 3.設置mr的輸出參數
        // 設置輸出的目錄
        Path outputPath = new Path("hdfs://uplooking01:9000/output/mr/wc");
        // 如果outputPath目錄存在,會拋出目錄存在異常,這裏先刪除,保證該目錄不存在
        outputPath.getFileSystem(configuration).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);
        // 指定格式化數據結果的Format類
        job.setOutputFormatClass(TextOutputFormat.class);
        // 指定使用哪個reducer來進行匯總
        job.setReducerClass(WordCountReducer.class);
        // 指定reduce結果的key的數據類型(即K3的數據類型),註意要與我們寫的Reducer中定義的一致
        job.setOutputKeyClass(Text.class);
        // 指定reduce結果的value的數據類型(即V3的數據類型),註意要與我們寫的Reducer中定義的一致
        job.setOutputValueClass(IntWritable.class);

        // 設置有幾個reducer來執行mr程序,默認為1個
        job.setNumReduceTasks(1);
        // 提交mapreduce job
        job.waitForCompletion(true);
    }
}

WordCountMapper.java

 package com.uplooking.bigdata.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 1、編寫一個類繼承Mapper,成為自定義的Mapper,主要業務邏輯就是復寫其中的map函數
 *  map
 *  首先要確定清楚Mapper類或者map函數的數據類型/類型參數--->泛型
 *  Mapper<K1, V1, K2, V2>
 *  K1:行的偏移量,如第998行
 *  V1:行的內容,如 hello you
 *  K2:輸出的數據的key值,如hello
 *  V2:輸出的數據的value值,如1
 *  註意,為了減少在網絡中傳輸的數據,map之後得到的結果還需要進行shuffle,將相同key的value匯總起來:
 *  如:
 *  map後的結果有:<hello, 1>, <hello, 1>, <hello, 1>, <you, 1>, <he, 1>, <me, 1>
 *  shuffle後的結果為:<hello, [1, 1, 1]>, <you, [1]>, <he, [1]>, <me, [1]>
 *  這樣相比原來map的結果,數據的量就少了許多
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // 先將每一行轉換為java的String類型
        String line = v1.toString();
        // 將行中的單詞以空格作為分隔符分離出來得到一個字符串數組
        String[] words = line.split(" ");
        // 定義輸出數據的變量k2和v2,類型分別為Text和IntWritable
        Text k2 = null;
        IntWritable v2 = null;
        // 統計單詞並寫入到上下文變量context中
        for (String word : words) {
            k2 = new Text(word);
            v2 = new IntWritable(1);
            context.write(k2, v2);
        }
    }
}

WordCountReducer.java

 package com.uplooking.bigdata.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 2、編寫一個類繼承Reducer,成為自定義的Reducer,主要業務邏輯就是復寫其中的reduce函數
 *  reduce
 *  首先要確定清楚Reducer類或者reduce函數它的數據類型/類型參數--->泛型
 *  Reducer<K2, V2s, K3, V3>
 *  K2:map輸出中的key值
 *  V2s:map輸出中根據本周key值shuffle後得到的可叠代列表
 *  如:<hello, [1, 1, 1]>, <you, [1]>, <he, [1]>, <me, [1]>
 *  K3:reduce輸出中的key值
 *  V3:reduce輸出中的value值
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context) throws IOException, InterruptedException {
        // 定義某個key值k2出現次數的變量
        int sum = 0;
        // 統計k2孤個數
        for (IntWritable item : v2s) {
            sum += item.get();
        }
        // 構建reduce輸出的k3和v3,類型分別為Text和IntWritable
        Text k3 = k2;
        IntWritable v3 = new IntWritable(sum);
        // 結果reduce結果寫入到上下文變量context中
        context.write(k2, v3);
    }
}

測試

將上面的程序打包成jar包,然後上傳到我們的hadoop服務器上,執行下面的命令:

 yarn jar wordcount.jar com.uplooking.bigdata.mr.wc.WordCount

這樣就可以使用在hadoop中使用我們自己寫的wodcount程序來進行MapReduce的計算。

任務執行結束後,通過下面的命令查看結果:

 $ hdfs dfs -cat /output/mr/wc/part-r-00000
18/03/03 13:59:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
he  1
hello   3
me  1
you 1

這樣就完成了從編寫MR程序到測試的完整過程。

手動實現一個單詞統計MapReduce程序與過程原理分析