1. 程式人生 > >Hadoop(十二)MapReduce概述

Hadoop(十二)MapReduce概述

span 分享 相同 ioe rdquo line 進度 列數 程序

前言

  前面以前把關於HDFS集群的所有知識給講解完了,接下來給大家分享的是MapReduce這個Hadoop的並行計算框架。

一、背景

1)爆炸性增長的Web規模數據量

  技術分享

2)超大的計算量/計算復雜度

  技術分享

3)並行計算大趨所勢

  技術分享

二、大數據的並行計算

1)一個大數據若可以分為具有同樣計算過程的數據塊,並且這些數據塊之間不存在數據依賴關系,則提高處理速度最好的辦法就是並行計算

  技術分享

2)大數據並行計算

  技術分享

三、Hadoop的MapReduce概述

3.1、需要MapReduce原因

  技術分享

3.2、MapReduce簡介 

  1)產生MapReduce背景

    技術分享

  2)整體認識

    MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算,用於解決海量數據的計算問題
    MapReduce分成了兩個部分:
      1)映射(Mapping)對集合裏的每個目標應用同一個操作。即,如果你想把表單裏每個單元格乘以二,那麽把這個函數單獨地應用在每個單元格上的操作就屬於mapping。
      2)化簡(Reducing)遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裏一列數字的和這個任務屬於reducing。
        你向MapReduce框架提交一個計算作業時,它會首先把計算作業拆分成若幹個Map任務,然後分配到不同的節點上去執行,

        每一個Map任務處理輸入數據中的一部分,當Map任務完成後,它會生成一些中間文件,這些中間文件將會作為Reduce任務的輸入數據。
    Reduce任務的主要目標就是把前面若幹個Map的輸出匯總到一起並輸出。
    MapReduce的偉大之處就在於編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。

3.3、MapReduce編程模型

  1)MapReduce借鑒了函數式程序設計語言Lisp中的思想,定義了如下的Map和Reduce兩個抽象的編程接口。由用戶去編程實現:

    技術分享

    註意:Map是一行一行去處理數據的。

  2)詳細的處理過程

    技術分享

四、編寫MapReduce程序

4.1、數據樣式與環境

  1)環境   

    我使用的是Maven,前面 有我配置的pom.xml文件。

  2)數據樣式

    這是一個專利引用文件,格式是這樣的:

    專利ID:被引用專利ID

    1,2

    1,3

    2,3

    3,4

    2,4

4.2、需求分析

  1)需求

    計算出被引用專利的次數

  2)分析

    從上面的數據分析出,我們需要的是一行數據中的後一個數據。分析一下:

    在map函數中,輸入端v1代表的是一行數據,輸出端的k2可以代表是被引用的專利,在一行數據中所以v2可以被賦予為1。

    在reduce函數中,k2還是被引用的專利,而[v2]是一個數據集,這裏是將k2相同的鍵的v2數據合並起來。最後輸出的是自己需要的數據k3代表的是被引用的專利,v3是引用的次數。

    畫圖分析:

      技術分享

4.3、代碼實現

  1)編寫一個解析類,用來解析數據文件中一行一行的數據。

import org.apache.hadoop.io.Text;

public class PatentRecordParser {
    //1,2
    //1,3
    //2,3
    //表示數據中的第一列
    private String patentId;
    //表示數據中的第二列
    private String refPatentId;
    //表示解析的當前行的數據是否有效
    private boolean valid;

    public void parse(String line){
        String[]  strs = line.split(",");
        if (strs.length==2){
            patentId = strs[0].trim();
            refPatentId = strs[1].trim();
            if (patentId.length()>0&&refPatentId.length()>0){
                valid = true;
            }
        }
    }

    public void parse(Text line){
        parse(line.toString());
    }

    public String getPatentId() {
        return patentId;
    }

    public void setPatentId(String patentId) {
        this.patentId = patentId;
    }

    public String getRefPatentId() {
        return refPatentId;
    }

    public void setRefPatentId(String refPatentId) {
        this.refPatentId = refPatentId;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }
}

  2)編寫PatentReference_0011去實現真正的計算

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


public class PatentReference_0011 extends Configured implements Tool {

//-Dinput=/data/patent/cite75_99.txt
    public static class PatentMapper
            extends Mapper<LongWritable,Text,Text,IntWritable>{
        private PatentRecordParser parser = new PatentRecordParser();
        private  Text key = new Text();
        //把進入reduce的value都設置成1
        private IntWritable value = new IntWritable(1);

        //進入map端的數據,每次進入一行。
        //MapReduce都是具有一定結構的數據,有一定含義的數據。
        //進入時候map的k1(該行數據首個字符距離整個文檔首個字符的距離),v1(這行數據的字符串)
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValid()){
                this.key.set(parser.getRefPatentId());
                context.write(this.key,this.value);
            }
        }
    }

    public static class PatentReducer
            extends Reducer<Text,IntWritable,Text,IntWritable>{

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable iw:values){
                count+=iw.get();
            }
            context.write(key,new IntWritable(count));
            //註意:在map或reduce上面的打印語句是沒有辦法輸出的,但會記錄到日誌文件當中。
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        //構建作業所處理的數據的輸入輸出路徑
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));
        //構建作業配置
        Job job = Job.getInstance(conf,this.getClass().getSimpleName()+"Lance");//如果不指定取的名字就是當前類的類全名

        //設置該作業所要執行的類
        job.setJarByClass(this.getClass());

        //設置自定義的Mapper類以及Map端數據輸出時的類型
        job.setMapperClass(PatentMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //設置自定義的Reducer類以及輸出時的類型
        job.setReducerClass(PatentReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //設置讀取最原始數據的格式信息以及
        //數據輸出到HDFS集群中的格式信息
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //設置數據讀入和寫出的路徑到相關的Format類中
        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        //提交作業
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(
                ToolRunner.run(new PatentReference_0011(),args)
        );;
    }
}

  3)使用Maven打包好,上傳到安裝配置好集群客戶端的Linux服務器中

  4)運行測試

    技術分享

    執行上面的語句,註意指定輸出路徑的時候,一定是集群中的路徑並且目錄要預先不存在,因為程序會自動去創建這個目錄。

  5)然後我們可以去Web控制頁面去觀察htttp://ip:8088去查看作業的進度

  

喜歡就點“推薦”哦! 

Hadoop(十二)MapReduce概述