MapReduce04 框架原理Shuffle
MapReduce 框架原理
1.InputFormat可以對Mapper的輸入進行控制
2.Reducer階段會主動拉取Mapper階段處理完的資料
3.Shuffle可以對資料進行排序、分割槽、壓縮、合併,核心部分。
4.OutPutFomat可以對Reducer的輸出進行控制
2 MapReduce工作流程
3 Shuffle機制(重點)
3.1 Shuffle機制
Map方法之後,Reduce方法之前的資料處理(洗排)過程稱為Shuffle。、
如果把ReduceTask設定成0,不進行ruduce,shuffle階段也會消失
MapTask程序對每一個<K,V>
呼叫一次map()方法
ReduceTask程序對每一組相同k的<k,v>
組呼叫一次reduce()方法。
3.2 Partition分割槽
問題
要求統計結果按照條件輸出到不同的檔案中,比如:統計結果135開頭的輸入到一個檔案、136開頭的輸入一個檔案。
預設Partitioner分割槽
下面程式碼是分割槽大於1的情況下預設的分割槽類,自定義的分割槽實際上替換的是這個。
public class HashPartitioner<K,V> extends Partitioner<K,V>{ public int getPartition(K key,V value,int numReduceTask){ //預設分割槽是根據key的hashCode對ReduceTasks個數取模得到的,使用者沒法控制key儲存到哪個分割槽。 //key.hasCode() & Integer.MAX.VALUE 用於控制key最大不超過Integer.MAX.VALUE retrun(key.hasCode() & Integer.MAX.VALUE)%numReduceTask; } }
自定義Partitioner分割槽
1.自定義類繼承Partitioner,重寫getPartition()方法
public class CustomPartitioner extends Partitioner<K,V>{
@Override
public int getPartition(K key,V value,int numReduceTask){
//控制分割槽程式碼邏輯
return partition;
}
}
2.在Job驅動中,設定分割槽類為自定義的Partitioner
job.setPartitionerClass(CustomPartitioner.class);
3.在Job驅動中,要根據自定義Partitioner的邏輯設定相應數量的ReduceTask
如果setNumReduceTasks=1,直接走else。
job.setNumReduceTasks(2);
自定義Partition分割槽案例
需求
將統計結果按照手機號開頭輸出到不同檔案(分割槽)中
輸入資料:D:\hadoop_data\input\inputpartition
檔案
期望輸出資料:手機號136、137、138、139開頭的分別放入一個檔案中,剩下的放入一個檔案中
需求分析
輸入資料
輸出資料
檔案1 136開頭的資料
檔案2 137開頭的資料
檔案3 138開頭的資料
檔案4 139開頭的資料
檔案5 其他
自定義分割槽
分割槽0 136
分割槽1 137
分割槽2 138
分割槽3 139
分割槽4 其他
設定使用自定義分割槽,指定ReduceTasks的數量為5
案例實現
1.使用之前序列化的程式碼
2.新增ProvicePartitioner類
ProvicePatitioner.class
package ranan.mapreduce.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//text是手機號
String phone = text.toString().substring(0,3);
int partition;
//防止空指標異常,常量寫在前面
if("136".equals(phone)){
partition=0;
}else if("137".equals(phone)){
partition=1;
}else if("138".equals(phone)){
partition=2;
}else if("139".equals(phone)){
partition=3;
}else {
partition=4;
}
return partition;
}
}
小技巧:字串比較相等時,把常量寫在前面可以防止空指標異常。
FlowDriver.class 新增一下程式碼
//設定使用自定義類
job.setPartitionerClass(ProvincePartitioner.class);
//設定ReduceTasks的個數
job.setNumReduceTasks(5); //有5個分割槽,可以設定5到5以上
//修改輸入路徑
輸出結果
總結
1.ReduceTask的數量>getPartition結果的數,則會產生幾個空的輸出檔案。
2.1<ReduceTask的數量<getPartition結果的數,有一部分分割槽資料沒有地方放,報錯。
3.ReduceTask的數量=1,最終只會產生一個輸出檔案。
4.分割槽號必須從0開始累加
3.3 WritableComparable排序
概述
MapTask和ReduceTask均會對資料按照key進行排序,該操作屬於Hadoop的預設行為。任何應用程式中的資料均會被排序,而不管邏輯上是否需要。
預設排序是按照字典順序排序,預設排序方法是快速排序。
對於MapTask,它會將處理的結果暫時放到環形緩衝區,當環形快取區使用率達到一定閾值後(80%),再對緩衝區中的資料進行一次快排。並將這些有序資料溢寫到磁碟上。當所有資料處理完畢後,它會對磁碟上所有檔案進行歸併排序。
對於ReduceTask,它從每個MapTask上遠端拷貝相應的資料檔案,如果檔案大小超過一定閾值,則合併後溢寫磁碟上,否則儲存在記憶體中。如果磁碟上檔案數目達到一定閾值,則進行一次歸併排序以生成一個更大檔案。所有資料拷貝完畢後,ReduceTask統一對記憶體和磁碟上的所有資料進行一次歸併排序。
自定義排序WritableComparable原理分析
bean物件做為key傳輸,需要實現WritableComparable介面重寫compareTo方法,就可
以實現排序。WritableComparable是繼承了Writable介面的
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
public class FlowBean implements WritableComparable <FlowBean> {
@Override
public int compareTo(FlowBean bean) {
int result;
//按照總流量大小,倒序排列
if (this.sumFlow >bean.getSumFlow()) {
result =-1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
}
1.WritableComparable 排序案例實操(全排序)
需求
根據序列化案例產生的結果檔案再此對總流量進行倒序排序
輸入資料D:\hadoop_data\input\inputpartition
檔案
第一次處理後的資料 part-r-00000,在這個檔案的基礎上,按總流量進行倒序排序。
期望輸出資料
一般需要進行兩次MapReduce,因為是按照key排序,第一次MapReduce的key是電話號碼,算出總流量,第二次MapReduce的key是FlowBean物件裡的總流量。
需求分析
需求:根據手機的總流量進行倒序排序
輸入資料
輸出資料
程式碼實現
FlowBean 類
package ranan.mapreduce.writableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 1、定義類實現writable介面
* 2、重寫序列化和反序列化方法
* 3、重寫空參構造
* 4、toString方法
*/
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; // 上行流量
private long downFlow; // 下行流量
private long sumFlow; // 總流量
// 空參構造
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean bean) {
int result;
//按照總流量大小,倒序排列
if (this.sumFlow >bean.getSumFlow()) {
result =-1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
}
FlowMapper 類
package ranan.mapreduce.writableComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//獲取一行
String line = value.toString();
//進行切割
String [] str = line.split("\t");
//封裝
outV.set(str[0]);
outK.setUpFlow(Long.parseLong(str[1]));
outK.setDownFlow(Long.parseLong(str[2]));
outK.setSumFlow();
context.write(outK,outV);
}
}
FlowReducer 類
package ranan.mapreduce.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean,Text,Text,FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text ,FlowBean>.Context context) throws IOException, InterruptedException {
for(Text value:values){
//這裡是不需要合併key的,輸出的key是手機號,輸出的value是FlowBean
context.write(value,key);
}
}
}
FlowDriver 類
package ranan.mapreduce.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 設定jar
job.setJarByClass(FlowDriver.class);
// 3 關聯mapper 和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 設定mapper 輸出的key和value型別
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5 設定最終資料輸出的key和value型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 設定資料的輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop_data\\input\\inputpartition"));
FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop_data\\output"));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
輸出結果
2.二次排序
需求:如果總流量相同,按上行流量從小到大排序
//寫法1
@Override
public int compareTo(FlowBean bean) {
int result;
//按照總流量大小,倒序排列
if (this.sumFlow >bean.sumFlow) {
return -1;
}else if (this.sumFlow < bean.sumFlow) {
return 1;
}else {
if(this.upFlow>bean.upFlow){
return 1;
}else if(this.upFlow<bean.upFlow){
return -1;
}
else {
return 0;
}
}
}
//寫法2
@Override
public int compareTo(FlowBean bean) {
int result;
//按照總流量大小,倒序排列
if (this.sumFlow == bean.sumFlow) {
//如果相同按上行流量從小到大排序
return (int)(this.upFlow - bean.upFlow);
}else {
return (int)(bean.sumFlow-this.sumFlow);
}
}
3.區內排序
需求:136 137 138 139 其他 分5個區,每個區按總流量降序排,相同按上行流量從小到大排序
用上面的程式碼其餘不變,增加類ProvincePartitioner.class
package ranan.mapreduce.writableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import ranan.mapreduce.partition.FlowBean;
public class ProvincePartitioner extends Partitioner<FlowBean,Text> {
@Override
public int getPartition(FlowBean flowBean,Text text,int numPartitions) {
//text是手機號
String phone = text.toString().substring(0,3);
int partition;
//防止空指標異常,常量寫在前面
if("136".equals(phone)){
partition=0;
}else if("137".equals(phone)){
partition=1;
}else if("138".equals(phone)){
partition=2;
}else if("139".equals(phone)){
partition=3;
}else {
partition=4;
}
return partition;
}
}
在FlowDriver掛載分割槽
//掛載分割槽
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
Combiner合併(可選)
提前對(a,1)(a,1) 進行合併成(a,2),幫助MapReduce先處理一點,提高效率。
說明:
1.Combiner是MR程式中Mapper和Reducer之外的一種元件
2.Combiner元件的父類是Reducer
3.和Reducer的區別在於執行的位置,Combiner是在每一個MapTask所在節點執行,Reduce是接受全域性所有Mapper的輸出結果。
4.Combiner的意義是對每一個MapTask的輸出進行區域性彙總,以減少網路傳輸量
5.Combiner能夠應用的前提是不能影響最終的業務邏輯,並且Combiner輸出的KV應該跟Reducer輸入的kv型別對應起來。
下圖是不可以使用的場景
3.4 自定義Combiner案例
需求
基於WordCount案例,統計過程中對每一個MapTask的輸出進行區域性彙總,以減小網路傳輸量,採用Combiner功能
資料輸入:D:\hadoop_data\input\combiner
//combiner.txt
xx rr
dd rr
yy rr
xx dd
期望結果
在Map階段處理資料
<rr,3>
<xx,2>
<yy,1>
<dd,2>
方案1
1.增加一個WordCountCombiner類繼承Reducer
2.在WordCountCombiner中 單詞彙總、將統計結果輸出
1.增加一個WordCountCombiner類繼承Reducer
package ranan.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
1.繼承Reducer在Combiner在Map階段,輸入的資料是Map傳遞過來的(Map的輸出),輸出給Reduce(Reduce的輸入)
*/
public class WordCountCombiner extends Reducer <Text,IntWritable,Text, IntWritable> {
//2.重寫reduce方法,遇見不同的key執行一次reduce
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum =0;
for (IntWritable num:values) {
sum += num.get(); //num轉化成int型別
}
outV.set(sum);
context.write(key,outV);
}
}
2.在job中進行配置,增加如下程式碼
job.setCombinerClass(WordCountCombiner.class);
3.執行結果,符合預期
4.補充
如果把ReduceTask設定成0,不進行ruduce,shuffle階段也會消失
方案2
我們發現WordCountReducer和WordCountCombiner實現的邏輯是一樣的,所以將WordCountReducer作為Combiner,在WordCountDriver驅動類中指定。
job.setCombinerClass(WordCountReducer.class);