1. 程式人生 > 實用技巧 >Hadoop 學習筆記(十一)MapReduce 簡介

Hadoop 學習筆記(十一)MapReduce 簡介

MapReduce定義

Map Reduce 是一個分散式運算程式的程式設計框架,是使用者開發“基於 Hadoop 的資料分析” 應用的核心框架 ,Map Reduce 的核心功能是將使用者編寫的業務邏輯程式碼和自帶的預設元件,整合成完整的分散式應用程式,併發執行在 Hadoop 叢集上;

MapReduce 特點

1、Map Reduce 易於程式設計 它簡單實現了一些介面,就可以完成一個分散式程式,這個分散式程式可以分發到大量的廉價的PC機器上進行執行,也就是說:這裡寫一個分散式程式和寫一個普通的程式是一樣的,因此:使 Map Reduce 程式設計很是流行;
2、適合 PB 級以上海量資料的離線處理;可以實現成千臺伺服器叢集併發工作;提供資料處理能力;

3、不擅長實時資料計算:Map Reduce 無法像 Mysql 一樣在毫秒或秒級內返回結果;

4、不擅長流式計算;流式的資料計算的輸入是動態的,而 Map Reduce 的輸入是靜態的,不能動態變化,這是由於 Map Reduce 的設計特點決定了其輸入資料是靜態資料;
5、不擅長(DAG) 右向圖計算 多個應用程式存在依賴關係,後一個應用程式的輸入為前一個應用程式的輸出,在這種情況下,Map Redeuce 並不是不能做,而是使用後,每個 Map Reduce 的輸出結果都會寫入磁碟,導致大量的磁碟IO,從而使效能下降。

MapReduce 核心思想

1)分散式的運算程式往往需要分成至少2個階段。

2)第一個階段的MapTask併發例項,完全並行執行,互不相干。

3)第二個階段的ReduceTask併發例項互不相干,但是他們的資料依賴於上一個階段的所有MapTask併發例項的輸出。

4)MapReduce程式設計模型只能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。

總結:分析WordCount資料流走向深入理解MapReduce核心思想。

MapReduce 程序:

  • MrAppMaster:負責整個程式的過程排程和狀態協調;
  • MapTask 負責整個 Map 階段的資料處理流程;
  • ReduceTask 負責整個 Reduce 階段資料處理流程

MapReduce 官方 pi 程式演示

[hui@hadoop103 data]$ cd /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/
[hui@hadoop103 mapreduce]$ pwd
/opt/module/hadoop-2.7.2/share/hadoop/mapreduce
[hui@hadoop103 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.7.2.jar pi 5 5 

MapReduce 官方 wordcount 程式演示

資料準備:

[hui@hadoop103 wcinput]$ cat wc.input 
tianyi huichao lihua
zhangchen xiaoheng 
xinbo xinbo
gaoyang gaoyang yanjing yanjing 
[hui@hadoop103 wcinput]$ pwd
/opt/module/hadoop-2.7.2/wcinput
[hui@hadoop103 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.7.2.jar wordcount /wcinput/ /wcoutput/

結果就是一個統計單詞的頻次的資料結果;

Windows 系統 wordcount 程式

package org.wdh01.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map 階段處理
 * 
 * @author hui
 *
 */
//KEYIN LongWritable 輸入資料 Key
//VALUEIN 輸入資料的 Value
//KEYOUT 輸出資料型別
//VALUEOUT 輸出資料Value 型別
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // hui hu1
        // 1、|獲取一行
        String line = value.toString();
        // 2、切割單詞
        String[] words = line.split(" ");
        // 3、迴圈輸出
        for (String word : words) {

            k.set(word);
            context.write(k, v);
        }
    }
}
View Code
package org.wdh01.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//KEYIN, VALUEIN map 階段的輸出

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // 1、累加求和
        for (IntWritable value : values) {
            sum += value.get();
        }

        v.set(sum);
        // 2、寫出
        context.write(key, v);
    }
}
View Code
package org.wdh01.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 1、獲取 job 物件
        Job job = Job.getInstance(conf);
        // 2、設定 jar 存放位置通過 反射查詢
        job.setJarByClass(WordCountDriver.class);
        // 3、關聯 MR
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        // 4、設定 Map 階段輸出資料的 KV 型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5、設定 最終資料的 KV 型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6、設定程式的輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7、提交 job
        Boolean result = job.waitForCompletion(true);
    }
}
View Code

MapReduce 編碼貴伐

使用者編寫程式分為 3 個部分:Map 、Reduce 和 Driver
1、Mapper 階段
使用者編寫的 Mapper 要繼承自己的父類;
Mapper 階段的輸入型別時 KV 形式;
Mapper 的業務邏輯寫在 map() 方法中;
Mapper 的輸出使是 KV 形式;
map() 方法(MapTask 程序) 對每一個 KV 呼叫一次
2、Reduce 階段
使用者自己編寫的Reduce 程式要繼承自己的父類;
Reduce 程式的輸入對應的是 Map 程式的輸出,也是 KV 形式;
Reduec 的業務邏輯寫在 reduce() 方法中;
Reduce 對每個相同的 K 的 KV 資料只調用一次;
3、Driver 階段
相當於 YARN 叢集的客戶端,用於提交整個程式到 YARN 叢集,提交的是 封裝了 MR 帶有引數的 JOB 物件;