1. 程式人生 > >大資料離線-MapReduce(上)--初識MapReduce

大資料離線-MapReduce(上)--初識MapReduce

本次主要介紹MapReduce,分為上篇,下篇兩個篇章

  • 上篇介紹MapReduce計算模型,MapReduce編碼規範及測試,程式執行模式。
  • 下篇介紹MapReduce序列化,MaoReduce排序初步,Mapreduce 的分割槽。

1. MapReduce計算模型

  • MapReduce是: 將使用者編寫的業務邏輯程式碼和自帶預設元件整合成一個完整的分散式運算程式,併發執行在Hadoop 叢集上。簡化平行計算的程式設計模型,降低了開發並行應用的入門門檻。
  • MapReduce思想:在生活中處處可見。或多或少都曾接觸過這種思想。MapReduce 的思想核心是“ 分而治之”, 適用於大量複雜的任務處理場景(大規模資料處理場景)。 即使是釋出過論文實現分散式計算的谷歌也只是實現了這種思想,而不是自己原創。

    • Map 負責“分”,即把複雜的任務分解為若干個“簡單的任務”來並行處理。 可以進行拆分的前提是這些小任務可以平行計算,彼此間幾乎沒有依賴關係
    • Reduce 負責“合”,即對 map 階段的結果進行全域性彙總。
  • 舉個栗子 我們要數圖書館中的所有書。你數 1 號書架,我數 2 號書架。這就是“ Map” 。我們人越多,數書就更快。現在我們到一起,把所有人的統計數加在一起。這就是“ Reduce” 。

  • 資料模型 Map: 對一組資料元素進行某種重複式的處理; Reduce: 對 Map 的中間結果進行某種進一步的結果整理。

    Map 和 Reduce 為程式設計師提供了一個清晰的操作介面抽象描述。 通過以上兩個程式設計介面,大家可以看出 MapReduce 處理的資料型別是key,value鍵值對

2. MapReduce編碼規範及測試

  • 程式設計規範 使用者編寫的程式分成三個部分: Mapper, Reducer, Driver(提交執行 mr 程式的客戶端)

    • Mapper 的輸入輸出都是資料是 KV 對的形式( KV 的型別可自定義),業務邏輯寫在map()方法中的,map()方法中(maptask)對每一個(k,v)呼叫一次
    • Reducer 的輸入資料型別對應 Mapper 的輸出資料型別,也是 KV, Reducer 的業務邏輯寫在 reduce()方法中, Reducetask 程序對每一組相同 k 的k,v組呼叫一次 reduce()方法
    • 使用者自定義的 Mapper 和 Reducer 都要繼承各自的父類
    • 整個程式需要一個 Drvier 來進行提交,提交的是一個描述了各種必要信 息的 job 物件
  • 例項程式碼

    • 需求:在一堆給定的文字檔案中統計輸出每一個單詞出現的總次數,準備檔案內容如下,兩份
hello hadoop HDFS hadoop
Stream Stocket Hello
hadoop HDFS Stocket
  • POM依賴:拷貝的時候注意POM中的提示
   <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
 //這裡需要自己定義,執行的主類路徑,請注意  <mainClass>cn.mapreduce.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>

                </configuration>
            </plugin>
        </plugins>
    </build>
  • Mapper類,實現業務邏輯,下面為引數解析:
引數解析: Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
KEYIN:表示mapper資料輸入的時候key的資料型別,在預設的讀取資料元件下,叫InputFormat,它的行為是一行一行的讀取待處理的資料一行,返回一行給我們的mr程式,這種情況下  keyin就表示每一行的起始偏移量 因此資料型別是Long

VALUEIN:表述mapper資料輸入的時候value的資料型別,在預設的讀取資料元件下 valuein就表示讀取的這一行內容  因此資料型別是String

KEYOUT 表示mapper資料輸出的時候key的資料型別  在本案例當中 輸出的key是單詞  因此資料型別是 String

VALUEOUT表示mapper資料輸出的時候value的資料型別  在本案例當中 輸出的key是單詞的次數  因此資料型別是 Integer

這裡所說的資料型別String Long都是jdk自帶的型別   在序列化的時候  效率低下 因此hadoop自己封裝一套資料型別
 *   long---->LongWritable
 *   String-->Text
 *   Integer--->Intwritable
 *   null-->NullWritable
package com.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

         //拿到當前行讀入行的內容,轉換為String型別
        String line = value.toString();

        //使用空格將當前行的內容切分
        String[] words = line.split(" ");

        //遍歷當行的單詞,出現一次就標記1 形式<單詞,1>
        for (String word : words) {

            //使用MapReduce的上下文物件,把mapper階段處理的資料傳送出去
            //作為reduce節點的輸入資料
            //用第一行舉個例子:hello hadoop HDFS NIO---->  <hello,1><hadoop,1><HDFS,1><hadoop,1>
            context.write(new Text(word),new IntWritable(1));
        }


    }
}
  • Reducer類,實現業務邏輯,下面為引數解析:
PS:說明
Reducer接收來的資料,會按照Key的字典進行排序
輸入資料:<hello,1><hadoop,1><HDFS,1><hadoop,1>
排序資料:<hadoop,1><hadoop,1><hello,1><HDFS,1>
按照key是否相同去呼叫reduce()方法,這組的key就是這個相同的key
把這一組所有的v,作為一個迭代器傳入我們reduce()方法中<hadoop,[1,1]>
package com.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        //定義一個計數器
        int count=0;

        //遍歷迭代器,把每一組的數量累加起來,就是單詞出現的次數
        for (IntWritable value : values) {
            count+=value.get();
        }

        //輸出最終結果
        context.write(key,new IntWritable(count));
    }
}
  • Driver類,實現業務邏輯,下面為引數解析:
package com.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    /**
     * 這個類就是mr程式執行時候的主類,本類中組裝了一些程式執行時候所需要的資訊
     * 比如:使用的是那個Mapper類  那個Reducer類  輸入資料在那 輸出資料在什麼地方
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //通過Job來封裝本次mr相關的資訊
        Configuration conf = new Configuration();
       //conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);

        //指定本次MapReduce,job jar執行的主類
        job.setJarByClass(WordCountDriver.class);

        //指定本次mapReduce執行的mapper,Reducer類分別是什麼
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //指定本次Mapper階段輸出的資料型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定最終的輸出型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定檔案的輸入和輸出位置,都是在Windows的路徑下
        FileInputFormat.setInputPaths(job,new "/WordCount/Input");
        FileOutputFormat.setOutputPath(job,new Path("/WordCount/Output"));

        //提交程式  並且監控列印程式執行情況
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

3. MapReduce程式執行模式

  • 本地執行

    1. 本地執行需要釋放WordCountDirver類中的這一行程式碼conf.set("mapreduce.framework.name","local");
    2. 更改檔案的路徑
 //指定檔案的輸入和輸出位置,都是在Windows的路徑下
        FileInputFormat.setInputPaths(job,new "D:/temp/Input");
        FileOutputFormat.setOutputPath(job,new Path("D:/temp/Output"));

3.執行程式碼,會看到在D:/temp/Output路徑下的檔案part-r-00000,內容如下

這裡寫圖片描述

  • Linux的hadoop叢集執行

    • . 註釋或者刪除程式碼 //conf.set("mapreduce.framework.name","local");
    • . 將檔案打包 使用maven打包.jar檔案,上傳到虛擬機器目錄
    • . 建立目錄 啟動叢集,在叢集中建立對應的目錄
 FileInputFormat.setInputPaths(job,new "/WordCount/Input");
 FileOutputFormat.setOutputPath(job,new Path("/WordCount/Output"));
  • . 建立檔案,上傳到/WordCount/Input目錄
  • . 執行程式碼 找到對應jar的安裝包,開啟最高許可權,使用命令啟動 hadoop jar 上傳檔案.jar