1. 程式人生 > 其它 >MapReduce 實現統計單數出現次數

MapReduce 實現統計單數出現次數

工程配置

  1. 在 windows 中配置 hadoop 及環境變數 HADOOP_
    下載 winutils.exe 放入 bin目錄中
    https://github.com/cdarlint/winutils
  2. 建立 maven 工程 引入依賴
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>2.14.1</version>
</dependency>
  1. 編寫 log4j 配置檔案
    參考:https://www.cnblogs.com/orginly/p/14847470.html

整體思路

仿照原始碼

Map 階段

  1. map()方法中把傳入的資料轉為 String 型別
  2. 根據空格切分出單詞
  3. 輸出<單詞,1>
package com.orginly.mapreduce.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 單詞記數
 * 繼承 Mapper 類
 * Mapper 類的泛型引數共4個 兩個key value
 * 第一對kv:map輸入引數型別 (LongWritable, Text 文字偏移量,一行文字內容)
 * 第二對kv:map輸出引數型別 (Text, IntWritable 單詞,1)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // 提升為成員變數避免每次執行 map 方法時都建立一次物件
    private final Text word = new Text();
    private final IntWritable intWritable = new IntWritable(1);

    /**
     * map 方法的輸入引數,一行文字就呼叫一次 map 方法
     *
     * @param key     文字偏移量
     * @param value   一行文字內容
     * @param context
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 1. 接收到文字內容,轉為String 型別
        String str = value.toString();
        // 2. 按照空格進行拆分單詞
        String[] words = str.split(" ");
        // 3. 輸出<單詞,1>
        for (String s : words) {
            word.set(s);
            context.write(word, intWritable);
        }

    }
}

Reduce 階段

  1. 總各個key(單詞)的個數,遍歷 value 資料進行累加
  2. 輸出 key 的總數
package com.orginly.mapreduce.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 繼承 Reducer 類
 * 有四個泛型,兩對 kv
 * 第一對kv要與 Mapper 輸出型別一致 (Text,IntWritable)
 * 第二對kv自己設計決定輸出結果資料是什麼型別
 */
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable total = new IntWritable();

    /**
     * 假設 map 方法 執行三次得到:hello,1 hello,1 hello,1
     * reduce 的 key => hello, values => <1,1,1>
     * <p>
     * 假設 map 方法得到 hello,1 hello,1 hello,1 hadoop,1 reduce,1 hadoop,1
     * reduce 方法何時呼叫:一組 key 相同的 kv 中 value 組成然後呼叫一次 reduce
     * 第一次:key => hello, values => <1,1,1>
     * 第一次:key => hadoop, values => <1,1>
     * 第三次:key => reduce, values => <1>
     *
     * @param key     方法輸出的key本案例中就是單詞
     * @param values  一組key相同的kv的value組成的集合
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 遍歷 key 對應的 values 進行累加
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 直接輸出當前 key 對應的 sum 值,結果就是單詞出現的總次數
        total.set(sum);
        context.write(key,total);
    }
}

Driver

  1. 獲取配置檔案物件,獲取 job 物件例項
  2. 指定程式 jar 的本地路徑
  3. 指定 Mapper / Reduce類
  4. 指定 Mapper 輸出的 kv 資料型別
  5. 指定 最終輸出的 kv 資料型別
  6. 指定 job 處理的原始資料路徑
  7. 指定 job 輸出結果路徑
  8. 提交作業
package com.orginly.mapreduce.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 封裝任務並提交執行
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 獲取配置檔案物件,獲取 job 物件例項
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCountDriver");
        // 2. 指定程式 jar 的本地路徑
        job.setJarByClass(WordCountDriver.class);
        // 3. 指定 Mapper / Reduce類
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        // 4. 指定 Mapper 輸出的 kv 資料型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. 指定最終輸出的 kv 資料型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. 指定 job 處理的原始資料路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 7. 指定 job 輸出結果路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 8. 提交作業
        boolean flag = job.waitForCompletion(true);// 等待完成 true為完成
        System.exit(flag ? 0 : 1);
    }

}

驗證程式

本地驗證

使用 IDEA 執行 Drive 中的 main() 方法
如果出現其他異常請檢視 https://www.cnblogs.com/orginly/p/15392871.html

  1. 先自動執行一次,此時會 args 下標異常
  2. 編輯執行配置新增引數
  3. 重新執行

Yarn 叢集驗證

  1. 把程式打成 jar 包,改名為 wordCount.jar 上傳到 Hadoop 叢集
# 上傳到伺服器
rz
# 重新命名
mv wordCount-1.0-SNAPSHOT.jar wordCount.jar
  1. 啟動 Hadoop 叢集(Hdfs,Yarn)
  2. 使用 Hadoop 命令提交任務執行

因為是叢集,原始檔不能存放在本地目錄,需要上傳至 HFDS 進行處理

hadoop jar wordCount.jar com.orginly.mapreduce.wc.WordCountDriver /mapReduce/wc.txt /wcoutput

執行成功


如果執行命令時出現版本過低提示,請安裝伺服器所使用的 jdk 版本進行 jar 的打包