1. 程式人生 > >MapReduce 詳解

MapReduce 詳解

宣告: https://blog.csdn.net/shujuelin/article/details/79119214
上面這篇部落格真的寫得很好,可以點選進去看

MapReduce是一個分散式運算程式的程式設計框架,是使用者開發"基於hadoop的資料分析應用"
MapReduce 核心功能是將使用者編寫的業務邏輯和自帶預設元件整合成一個完整的分散式運算程式,併發執行在一個hadoop叢集上

為什麼要MAPREDUCE
(1)海量資料在單機上處理因為硬體資源限制,無法勝任
(2)而一旦將單機版程式擴充套件到叢集來分散式執行,將極大增加程式的複雜度和開發難度
(3)引入mapreduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜性交由框架來處理

設想一個海量資料場景下的wordcount需求:

單機版:記憶體受限,磁碟受限,運算能力受限
分散式:
1、檔案分散式儲存(HDFS)
2、運算邏輯需要至少分成2個階段(一個階段獨立併發,一個階段匯聚)
3、運算程式如何分發
4、程式如何分配運算任務(切片)
5、兩階段的程式如何啟動?如何協調?
6、整個程式執行過程中的監控?容錯?重試?

可見在程式由單機版擴成分散式時,會引入大量的複雜工作。為了提高開發效率,可以將分散式程式中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而mapreduce就是這樣一個分散式程式的通用框架,其應對以上問題的整體結構如下:

1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask
分散式運算程式

1.分散式運算程式往往至少需要2個階段,完全並行
2.第一個階段的task併發例項各司其職互不相干,完全並行
3.第二個階段的task併發例項互不相干,但是他們的資料依賴上一個階段的task併發例項的輸出,這個依賴是全域性的依賴,每一個都依賴上一個階段的所有的併發例項的輸出
4.MapReduce是隻有兩個階段:Map階段和Reduce階段
如果你的程式很複雜,兩個階段搞不定,那麼就需要多個MapReduce程式,序列執行

第一個階段就是Map階段,第一個階段的執行例項就是 Map task
第二個階段就是Reduce階段,第二個階段的執行例項就是 Reduce task

如果需要統計檔案中每一個單詞出現的次數:

Map裡面的邏輯:
1.讀取資料
2.按行處理
3.按空格切分行內單詞
4.計數 HashMap(單詞, value+1)

等分給自己的資料片,全部讀完之後

5.把HashMap 輸出給下一個階段 – 將HashMap按照首字母範圍分成3個hashMap
6.將3個hashMap分別傳遞給3個 reduce task

若干複雜的細節問題:
1.你的Map task 如何進行任務分配
2.Reduce task 如何分配要處理的任務
3.Map task 和 Reduce task 之間如何銜接
4.如果某些Map task 執行失敗,如何處理
5.Map task 如果都要自己負責輸出資料的分割槽,那麼會很麻煩

如果要解決上面的問題(兩個階段的例項難以協調)
我們就需要一個主管 : mr application master


MapReduce程式設計例項 wordcount
由於使用MapReduce,所以之前的hdfsjar是不能滿足的,需要新的jar包.
mapReduceJars
這些jar包在hadoop的原始碼裡面的share檔案裡面都有.
需要的jar包:
hadoop-common-2.6.4.jar
然後還需要common裡面lib下的所有jar
hadoop-hdfs-2.6.4.jar
然後還需要hdfs裡面的lib下的所有的jar
還有mapReduce下面的jar包,但是不能包括 hadoop-mapreduce-examples-2.6.4.jar – 這個包裡面有已經寫好了的mapreduce例子
然後再家mapreduce的lib下的所有的jar
還需要yarn下的jar,不需要server
在這裡插入圖片描述

yarn下的lib所有的jar也需要

package com.thp.bigdata.wcdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * KEYIN : 預設情況下是mr框架所讀到的一行文字的起始偏移量  (預設是一行一行讀取資料,讀到一行就把起始偏移量傳遞到KEYIN裡面)   -- Long
 * VALUEIN : 預設情況下是mr框架所讀到一行文字的內容     --  String
 * KEYOUT : 是使用者自定義邏輯處理完成之後輸出資料中的key,   是我們自己來決定的  在此處是單詞  -- String
 * VALUEOUT : 是使用者自定義邏輯處理完成之後輸出資料的value    在此處是單詞次數  -- Integer
 * @author 湯小萌
 *
 */

/**
 * 我們要輸出的資料,需要經過網路傳輸,既然要經過網路傳輸,那麼這寫資料就需要經過序列化
 * 使用Serializable 進行序列化,比較冗餘,會將類的繼承資訊全部序列化
 * 但是在這裡我們只需要將物件裡面的資料進行序列化就可以
 * hadoop裡面有一個自己的序列化框架  --  更加精簡   Long - LongWritable  String - Text  Integer - IntWritable
 * @author 湯小萌
 *
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	/**
	 * map階段的業務邏輯就寫在自定義的map()方法中
	 * maptask會對每一行輸入資料呼叫一次我們自定義的方法
	 * key 是偏移量,對我進行業務邏輯的處理是沒有用的
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 將maptask傳遞給我們的文字內容轉換成String
		String line = value.toString();
		// 根據空格將這一行切分單詞
		String[] words = line.split(" ");
		// 輸出的是  <單詞, 1>
		for(String word : words) {
			// 將單詞作為key,將次數1作為value,以便於後續的資料分發,可以根據單詞分發,以便於相同的單詞會到想的reduce task
			context.write(new Text(word), new IntWritable(1));
		}
	}
}

package com.thp.bigdata.wcdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 輸入的型別要跟Map階段的輸出的型別對應
 * 
 * KEYOUT, VALUEOUT 自定義Reduce邏輯處理結果的輸出資料型別
 * KEYOUT 是單詞
 * VALUEOUT 是總次數
 * @author 湯小萌
 * 
 */
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * 入參 key : 是一組相同單詞kv對的key
	 * 同一組 <hello,1>  <hello,1>  <hello,1>  <hello,1>  <hello,1>
	 * key 指的是 hello , 只需要一個 因為一組的單詞是一樣的  ,
	 * values 就是迭代器了,可以迭代裡面的每一個 1 進行計數
	 */
	// 每一次呼叫reduce是完成了一組單詞的統計,下一次呼叫就是統計另一組的單詞的統計
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count = 0;
		for(IntWritable value : values) {
			count += value.get();  // 這個count 最後就是某一個單詞的彙總的值
		}
		context.write(key, new IntWritable(count));
	}
}

package com.thp.bigdata.wcdemo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * 驅動類
 * 相當於一個yarn叢集的客戶端
 * 需要在此封裝mapreduce程式的相關執行引數,指定jar包
 * 最後提交給yarn
 * @author 湯小萌
 *
 */
public class WordcountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		// 指定本程式的jar包所在的本地路徑
		job.setJarByClass(WordcountDriver.class);
		
		// 指定本業務job要使用的mapper/Reduce業務類
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReduce.class);
		
		// 指定mapper輸出資料的kv型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// 指定最終輸出的資料的kv型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 指定job的輸入原始檔案所在的目錄
		// 待處理檔案可以在多個目錄裡面
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		// 指定job的輸出結果
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 將job中配置的相關引數,以及job所用的的java類所在的jar包,提交給yarn去執行
		/*job.submit();*/
		boolean res = job.waitForCompletion(true); // 會等待程式處理完成之後,程式才退出
		System.exit(res ? 0 : 1);
		
	}
}

先啟動hdfs

start-dfs.sh

還必須啟動 yarn , 需要執行在yarn 上

start-yarn.sh

啟動完之後需要檢查一下是否啟動成功


一個完整的mapreduce程式在分散式執行時有三類例項程序:

1、MRAppMaster:負責整個程式的過程排程及狀態協調
2、mapTask:負責map階段的整個資料處理流程
2、mapTask:負責map階段的整個資料處理流程


MapReduce 的程式設計規範:
(1)使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(提交執行mr程式的客戶端)
(2)Mapper的輸入資料是KV對的形式(KV的型別可自定義)
(3)Mapper的輸出資料是KV對的形式(KV的型別可自定義)
(4)Mapper中的業務邏輯寫在map()方法中
(5)map()方法(maptask程序)對每一個<K,V>呼叫一次
(6)Reducer的輸入資料型別對應Mapper的輸出資料型別,也是KV
(7)Reducer的業務邏輯寫在reduce()方法中
(8)Reducetask程序對每一組相同k的<k,v>組呼叫一次reduce()方法
(9)使用者自定義的Mapper和Reducer都要繼承各自的父類
(10)整個程式需要一個Drvier來進行提交,提交的是一個描述了各種必要資訊的job物件


MAPREDUCE框架結構及核心執行機制

  • 結構
    一個完整的mapreduce程式在分散式執行時有三類例項程序:
    1、MRAppMaster:負責整個程式的過程排程及狀態協調
    2、mapTask:負責map階段的整個資料處理流程
    3、ReduceTask:負責reduce階段的整個資料處理流程

  • MR程式執行流程
    1.流程示意圖:
    在這裡插入圖片描述

2.流程解析
a. 一個mr程式啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述資訊,計算出需要的maptask例項數量,然後向叢集申請機器啟動相應數量的maptask程序
b. maptask程序啟動之後,根據給定的資料切片範圍進行資料處理,主體流程為:

利用客戶指定的inputformat來獲取RecordReader讀取資料,形成輸入KV對
將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到快取
將快取中的KV對按照K分割槽排序後不斷溢寫到磁碟檔案

c. MRAppMaster監控到所有maptask程序任務完成之後,會根據客戶指定的引數啟動相應數量的reducetask程序,並告知reducetask程序要處理的資料範圍(資料分割槽)
d. Reducetask程序啟動之後,根據MRAppMaster告知的待處理資料所在位置,從若干臺maptask執行所在機器上獲取到若干個maptask輸出結果檔案,並在本地進行重新歸併排序,然後按照相同key的KV為一個組,呼叫客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然後呼叫客戶指定的outputformat將結果資料輸出到外部儲存