1. 程式人生 > 其它 >MapReduce的執行流程和程式設計規範以及簡單的WordCount案例

MapReduce的執行流程和程式設計規範以及簡單的WordCount案例

技術標籤:hadoopmapReduce大資料hadoopmapreduce

▶ \blacktriangleright MapReduce程式設計規範

  • Mapper階段(以WordCount為例)

    1. 使用者自定義的類要繼承Mapper類
    2. 輸入的資料是KV對的形式(K:當前行數,V:當前行的內容)
    3. Mapper中的業務邏輯寫在map()方法中,也就是說要重寫map方法
    4. 輸出資料也是KV對的形式(K:每個單詞 V: 1 )
    5. map方法對每個K V呼叫一次
  • Reducer階段
    8i9=-0o

    1. 使用者自定義的類要繼承Reducer類
    2. 輸入型別詩句對應的時Mapper的輸出資料型別,也是KV
    3. Reducer的業務邏輯寫在reduce方法中
    4. ReduceTask程序對每一組相同K的的<K,V>組呼叫一側reduce方法
  • Driver階段

    相當於yarn叢集的客戶端,用來提交我們整個程式到yarn叢集,提交的是封裝了MapRdeuce程式相關執行引數的job物件


▶ \blacktriangleright 案例實操

  • 在F盤新建一個hello.txt

      內容:Hello World
      Hello Hadoop
      Hello MapReduce
    
  • 新建WordCountMapper.java


import org.apache.hadoop.io.IntWritable;
import org.apache.
hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { Text k = new Text(); IntWritable v = new IntWritable(); @Override protected
void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString();//行的內容 String[] words = line.split(" ");//空格拆分成單詞 for(String word : words){ k.set(word); v.set(1); context.write(k,v); } } }
  • 新建WordCountReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable> {
   /* 
    實際拿到的
    hadoop,1
    hello ,(1,1,1)
    mapreducer,1
    world ,1
    */
    int sum = 0;
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        for(IntWritable i : values){
            sum+=i.get();//結果轉成int型
        }
        context.write(key,new IntWritable(sum));
    }
}

  • 新建Driver.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();   //封裝成Job物件
        job.setMapOutputKeyClass(Text.class);//map輸出的key的型別
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);//reducer輸出的key的型別
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMapper.class);//設定返回的型別
        job.setReducerClass(WordCountReducer.class);

        FileInputFormat.setInputPaths(job,new Path("F://hello.txt"));//讀取檔案路徑
        FileOutputFormat.setOutputPath(job,new Path("F://helloOutput"));//結果輸出的路徑

        job.waitForCompletion(true);//完成之後退出
    }
}


▶ \blacktriangleright MapReduce的執行流程

內容:Hello World
    Hello Hadoop
    Hello MapReduce
  1. 分片

    分片過程中主要的任務就是確定資料大小。這個值一旦確定就可以依次將原始檔按值進行劃分,小於這個值就成為單獨的一個區,大於這個值就按這個值劃分,不足的部分成為單獨的一個區。hadoop2.x的中這個值是128MB,也就是說原始檔有200MB時,就分成2個區,一個區是0-128,剩下的是一個單獨的區

2.輸入格式化

 是將劃分好的區格式轉化成<key,value>形式的資料,其中key是行數,value是當前行的內容。
每解析出一個數據源,就交給Mapper處理一次
根據如上內容,解析出的資料來源應該是
        <1,Hello World>
        <2,Hello Hadoop>
        <3,Hello MapReduce>

3.Map過程

Mapper接收<key,value>形式的資料,並處理成<key,value>形式的資料

在WOrdCount中,Mapper會解析傳過來的key值,以空格字元為標誌,如果碰到空格就把之前累計的字串作為輸出的key值 ,1當作value的值

根據如上內容,輸出的內容是
        <Hello,1>
        <World,1>
        <Hello,1>
        <Hadoop,1>
        <Hello,1>
        <MapReduce,1>

4.Shuffle過程

主要是對Map輸出結果進行分割槽、排序、合併等處理並交給Reduce的過程

根據如上內容,處理之後的結果是
        <Hadoop,{1}> 
        <Hello,{1,1,1}>
        <MapReduce,{1}>
        <World,{1}>

5.Reduce過程

Reduce接收<key,{value list}>形式的資料流,形成<key,value>形式的資料輸出,輸出資料直接寫入HDFS

在WordCount中,Reducer會將相同的key的value list進行累加,然後得到單詞的總次數,然後輸出

根據如上內容,寫入的資料是
        <Hadoop,{1}>     -------------><hadoop,1>
        <Hello,{1,1,1}>-------------><Hello,3>
        <MapReduce,{1}>  -------------><MapReduce,1>
        <World,{1}>      -------------><World,1>
  1. 輸出

    MapReduce看框架自動把Reducer生成的<key,value>傳入write()方法,write()方法實現檔案的寫入

    在WordCount中,呼叫的是預設的文字寫入的方法