1. 程式人生 > >大資料第四天——MapReduce原理及IDEA Maven下WordCount例項兩種實現

大資料第四天——MapReduce原理及IDEA Maven下WordCount例項兩種實現

1.MapReduce概述

HDFS實現了分散式檔案儲存,儲存問題解決了,我們就需要考慮如何對資料進行處理,MapReduce是一個計算框架(程式設計模型),基於該計算框架,可以很容易的編寫資料處理程式,從而以較高的效率處理海量資料集。

MR框架對於程式設計師的最大意義在於,不需要掌握分散式計算程式設計,不需要考慮分散式程式設計裡可能存在的種種難題,比如任務排程和分配、檔案邏輯切塊、位置追溯、工作。這樣,程式設計師能夠把大部分精力放在核心業務層面上,大大簡化了分散式程式的開發和調試周期。

2.MapReduce的設計思想

MapReduce的設計思想簡單概括而言,就是“分而治之”。整個MapReduce階

段分為兩大部分,分別是Map、Reduce。

(1)map負責“分”,即把複雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:一是資料或計算的規模相對原任務要大大縮小;二是就近計算原則,即任務會分配到存放著所需資料的節點上進行計算;三是這些小任務可以平行計算,彼此間幾乎沒有依賴關係。

(2)reducer負責”合”,即對map階段輸出的處理結果進行彙總。

3.MapReduce組成架構

在MapReduce程式進行計算時,需要yarn進行資源管理,yarn主要分為ResourceManager和NodeManager兩部分。

ResourceManager

是一個全域性的資源管理器,ResourceManager控制整個叢集並管理程式向叢集其他DataNode進行資源的分配。

NodeManager是每個DataNode節點上的資源和工作管理員,一方面,它會定時地向ResourceManager彙報本節點上的資源使用情況以及執行狀態;另一方面,它接收並處理來自ResourceManager分配的任務。

4.WordCount第一種實現(jar)

1.啟動MapReduce(先啟動hadoop,命令:sh start-dfs.sh)

在啟動hadoop前提下,進入hadoop中的sbin目錄,執行:sh start-yarn.sh

通過jps檢視是否啟動:

如果有NodeManager和ResourceManager,則證明啟動成功。

2.IDEA的pom.xml檔案中需要的架包

<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-jobclient -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.1</version>
        </dependency>

3.封裝工具類-ClassUtil和MrUtil(自定義的封裝,不是必要的,這是為了減少程式碼)

/**
 * Title:    ClassUtil
 * Description:    抽象物件類
 *
 * @author dtt
 * @data 2018-09-06 23:14
 **/

public class ClassUtil {
    private Class classname;
    private Class okey;
    private Class ovalue;

    public ClassUtil(Class a,Class b,Class c){
        this.classname = a;
        this.okey = b;
        this.ovalue = c;
    }
    public Class getClassname() { return classname; }
    public Class getOkey() { return okey;  }
    public Class getOvalue() {  return ovalue; }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Title:    MrUtil
 * Description:    MapReduce工具類
 *
 * @author dtt
 * @data 2018-09-06 22:50
 **/
public class MrUtil {
    private Configuration conf;
    public MrUtil(){ this.conf = new Configuration(); }
    public void go(String jobname, Class mapreduce, ClassUtil map, ClassUtil reduce,String iPath,String oPath) throws Exception{
        // 建立job物件
        Job job = Job.getInstance(conf,jobname);
        // 設定執行job的類
        job.setJarByClass(mapreduce);
        // 設定 mapper 類
        job.setMapperClass(map.getClassname());
        // 設定 map 輸出的key value
        job.setMapOutputKeyClass(map.getOkey());
        job.setOutputValueClass(map.getOvalue());
        // 設定 reduce 類
        job.setReducerClass(reduce.getClassname());
        // 設定reduce 輸出的key value
        job.setOutputKeyClass(reduce.getOkey());
        job.setOutputValueClass(reduce.getOvalue());
        // 設定輸入輸出路徑
        FileInputFormat.setInputPaths(job,new Path(iPath));
        FileOutputFormat.setOutputPath(job,new Path(oPath));
        // 提交job
        boolean b = job.waitForCompletion(true);
        if(!b){
            System.out.println(jobname + " task fail!");
        } else{
            System.out.println(jobname + " task success");
        }
    }
}

4.程式碼

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Title:    WordCountMapper
 * Description:    wordCount mapper程式
 * @author dtt
 * @data 2018-09-06 10:20
 **/
/*
*Mapper<LongWritable, Text, Text, IntWritable>
* 其中的4個型別分別是:輸入key型別、輸入value型別、輸出key型別、輸出value型別。
* */
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    protected  void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException {
        //得到輸入的每一行資料
        String line = value.toString();
        //通過空格分割
        String[] words = line.split(" ");
        //迴圈遍歷 輸出
        for (String word : words) {
            context.write(new Text(word),new IntWritable(1));
        }
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Title:    WordCountReduce
 * Description:    wordcount reduce程式
 * @author dtt
 * @data 2018-09-06 10:28
 **/
/*
* Reducer<Text, IntWritable, Text, IntWritable>
* 4個型別分別指:輸入key的型別、輸入value的型別、輸出key的型別、輸出value的型別。
* */
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
    protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
        Integer count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Title:    WordCountMapReduce
 * Description:    wordcount 主程式
 * @author dtt
 * @data 2018-09-06 10:33
 **/
public class WordCountMapReduce {
    private static String myurl = "hdfs://自己的IP:9000";
    public static void main(String[] args) throws Exception {
        ClassUtil map = new ClassUtil(WordCountMapper.class,Text.class,IntWritable.class);
        ClassUtil reduce = new ClassUtil(WordCountReduce.class,Text.class,IntWritable.class);
        Class mr = WordCountMapReduce.class;
        MrUtil job = new MrUtil();
        job.go("wordcounnt",mr,map,reduce,myurl+"/wordcount/input",myurl+"/wordcount/output");
    }
}

5.兩種實現

1).jar包實現,點選IDEA右側的Maven Project。點選裡面的package。

 

在xshell中通過rz命令上傳word.jar到指定目錄

執行jar包:hadoop jar word.jar

2).直接執行。