MapReduce的Shuffle機制
1、MapReduce的shuffle機制
1.1、概述
MapReduce中,mapper階段處理的資料如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫shuffle.
Shuffle:資料混洗---------(核心機制:資料分割槽,排序,區域性聚合,快取,拉取,再合併排序)
具體來說,就是將MapTask輸出的處理資料結果,按照Partitioner元件制定的規則分發ReduceTask,並在分發的過程中,對資料按key進行分割槽和排序
1.2、主要流程
Shuffle是MapReduce處理流程中的一個核心,它的每一個處理步驟是分散在各個Maptask和reducetask節點上完成的,整體來看,分為3個操作:
1、分割槽partition(如果reduceTask只有一個或者沒有,那麼partition將不起作用。設定沒設定相當於沒有)
2、Sort根據key排序(MapReduce程式設計中sort是一定會做的,並且只能按照key排序,當然如果沒有reduce階段,那麼就不會對key排序)
3、Combiner進行區域性value的合併(Combiner是可選的元件,作用是為了提高任務的執行效率)
1.3、詳細流程
1、mapTask收集我們map()方法輸出的kv對,放在記憶體緩衝區kvbuffer(環形緩衝區:記憶體中的一種首尾相連的資料結構,kvbuffer包含資料區和索引區)中,在存資料的時候,會呼叫partitioner進行分割槽編號的計算,並存入元資料中
2、當記憶體緩衝區的資料達到100*0.8時,就會開始溢寫到本地磁碟檔案file.out,可能會溢位多次,則會有多個檔案,相應的緩衝區中的索引區資料溢位為磁碟索引檔案file.out.index
3、在溢寫前,會先根據分割槽編號排序,相同的分割槽的資料,排在一起,再根據map的key排序(快排)
4、多個溢寫檔案會被合併成大的溢位檔案(歸併排序)
5、在資料量大的時.候,可以對maptask結果啟用壓縮,將mapreduce.map.ouput.compress設為true,並使用
mapreduce.map.output.compress.codec設定使用的壓縮演算法,可以提高資料傳輸到reduce端的效率
6、reduceTask根據自己的分割槽號,去各個mapTask機器上取相應的結果分割槽資料
7、reduceTask會取到同一個分割槽的來自不同mapTask的結果檔案,reduceTask會將這些檔案再進行合併(歸併排序)
8、合併成r大檔案後,shuffle的過程也就結束了,後面進入reduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,呼叫使用者自定義的reduce()方法)
2、自定義Shuffle過程中的元件
1、自定義輸入
預設輸入類:TextInputFormat
自定義:
模仿 org.apache.hadoop.mapreduce.lib.input.LineRecordReader 和org.apache.hadoop.mapreduce.lib.input.TextInputFormat
1、自定義類繼承FileInputFormat
public class MyFileInputFormat extends FileInputFormat<Text, LongWritable>{
@Override
public RecordReader<Text, LongWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
//例項化一個
MyAllFileRecodReader reader = new MyAllFileRecodReader();
//split引數和context都是框架自動傳入的,把這兩個引數傳給reader進行處理,以便獲取相關資訊
reader.initialize(split, context);
return reader;
}
/**
* 給定的檔名可拆分嗎?返回false確保單個輸入檔案不會被分割。以便Mapper處理整個檔案。
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
2、自定義類實現RecordReader
public class MyFileRecodReader extends RecordReader<Text, LongWritable>{
//用於儲存檔案系統輸入流
private FSDataInputStream open = null;
//儲存檔案長度
private int fileSplitLength = 0;
/**
* 當前的MyAllFileRecodReader讀取到的一個key-value
*/
private Text key = new Text();
private LongWritable value = new LongWritable();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//通過InputSplit物件獲取檔案路徑
FileSplit fileSplit = (FileSplit)split;
Path path = fileSplit.getPath();
//獲取檔案長度
fileSplitLength = (int)fileSplit.getLength();
//通過context物件獲取到配置檔案資訊,通過配置檔案獲取到一個當前檔案系統
Configuration configuration = context.getConfiguration();
FileSystem fs = FileSystem.get(configuration);
//獲取檔案系統的一個輸入流
open = fs.open(path);
}
/**
* 已讀標記
* 如果為false,表示還沒有進行讀取
* 在需求中一個mapTask只處理一個小檔案,一個mapTask最終只需要讀取一次就完畢
* 如果一個檔案讀取完畢了,那麼就把isRead這個變數標記為true
*/
private boolean isRead = false;
/**
* 實現讀取規則:逐檔案讀取
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//如果沒有讀取過檔案就進入
if(!isRead){
//準備一個位元組陣列長度為檔案的長度
byte[] buffer = new byte[fileSplitLength];
//一次性把真個檔案讀入位元組陣列中
IOUtils.readFully(open, buffer);
//把讀取到的檔案傳給key
key.set(buffer, 0, fileSplitLength);
//設定已讀標記為true
isRead = true;
//返回讀取一個檔案成功標記
return true;
}else{
return false;
}
}
//獲取key的方法
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
//獲取當前value值
@Override
public LongWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* 獲取資料的處理進度的
*/
@Override
public float getProgress() throws IOException, InterruptedException {
//已讀為真返回1.0,沒有讀返回0
return isRead ? 1.0F : 0F;
}
@Override
public void close() throws IOException {
//關閉輸入流
IOUtils.closeQuietly(open);
}
2、自定義分割槽
需要: 1、繼承 partitioner
2、重寫getpartition()方法
3、在main方法中指定分割槽類 job.setPartitionclass()
package homework;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class Mypartition extends Partitioner<Student, Text> {
@Override
public int getPartition(Student key, Text arg1, int arg2) {
if(key.getType().equals("math")){
return 0;
}
if(key.getType().equals("english")){
return 1;
}
if(key.getType().equals("computer")){
return 2;
}else{
return 3;
}
}
}
3、自定義排序
需要 : 1、實現writableComparable
2、重新write()、readFields()、compareTo()方法
package homework;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Student implements WritableComparable<Student> {
private String type;
private String name;
private Double avg;
public Student() {
super();
}
public Student(String type, String name, Double avg) {
super();
this.type = type;
this.name = name;
this.avg = avg;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getAvg() {
return avg;
}
public void setAvg(Double avg) {
this.avg = avg;
}
@Override
public String toString() {
return type + "\t" + name + "\t" + avg ;
}
@Override
public void readFields(DataInput in) throws IOException {
this.type=in.readUTF();
this.name=in.readUTF();
this.avg=in.readDouble();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(type);
out.writeUTF(name);
out.writeDouble(avg);
}
@Override
public int compareTo(Student o) {
int temp=o.getType().compareTo(this.getType());
if(temp==0){
if(o.getAvg()>this.getAvg()){
return 1;
}else if(o.getAvg()<this.getAvg()){
return -1;
}else{
return 0;
}
}
return temp;
}
}
4、自定義分組
需要 : 1、繼承writableComparable
2、重寫compare()方法
3、指定分組類 job.setGroupingComparatorClass(MyGroup.class);
4、既有分割槽又有排序的時候,分組欄位一定在排序欄位中
package homework;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroup extends WritableComparator {
public MyGroup() {
super(Student.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Student aa=(Student)a;
Student bb=(Student)b;
return aa.getType().compareTo(bb.getType());
}
}
5、自定義輸出
1)模仿 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
public class MyMultipePathOutputFormat extends FileOutputFormat<Text, NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
//獲得當前的檔案系統傳給自定義的RecordWriter元件
Configuration configuration = job.getConfiguration();
FileSystem fs = FileSystem.get(configuration);
try {
//返回一個RecordWriter正在處理輸出資料的元件
return new MyMutiplePathRecordWriter(fs);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2)繼承RecordWriter 並實現write()方法
public class MyMutiplePathRecordWriter extends RecordWriter<Text, NullWritable>{
//宣告要輸出的兩個路徑
private DataOutputStream out_jige;
private DataOutputStream out_bujige;
public MyMutiplePathRecordWriter(FileSystem fs) throws Exception {
//建立系統輸出流
out_jige = fs.create(new Path("E:\\bigdata\\cs\\jige\\my_output_jige.txt"));
out_bujige = fs.create(new Path("E:\\bigdata\\cs\\bujige\\my_output_bujige.txt"));
}
/**
* 實現寫出方法,根據需要寫出的格式自定義
*/
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
//接受到的key格式為:course + "\t" + name + "\t" + avgScore
String keyStr = key.toString();
String[] split = keyStr.split("\t");
//獲取到平均分欄位
double score = Double.parseDouble(split[2]);
//沒一行資料加入個換行符
byte[] bytes = (keyStr + "\n").getBytes();
//如果平均分大於60就用DataOutputStream寫出到jige目錄
if(score >= 60){
out_jige.write(bytes, 0, bytes.length);
}else{//小於60分的寫道bujige目錄
out_bujige.write(bytes, 0, bytes.length);
}
}
/**
* 在close方法中關閉輸出流。
*/
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeQuietly(out_jige);
IOUtils.closeQuietly(out_bujige);
}
}