1. 程式人生 > >Hadoop Mapreduce 工作機制

Hadoop Mapreduce 工作機制

mat 自定義 rabl one 溢出 實現原理 form rri kvm

一.Mapreduce 中的Combiner

技術分享圖片

技術分享圖片

package com.gec.demo;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    
private IntWritable sum=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } sum.set(count); context.write(key,sum); } }

在job類中聲明如下:

技術分享圖片

技術分享圖片

二.MapTask工作機制

主要的核心類:

讀:
FileInputFormat
TextInputFormat
createRecordReader
LineRecordReader
nextKeyValue

寫:
context.write
RecordWriter.write(k,value)
NewOutputCollector.write(key,value)
MapOutputCollector.collect(key,value,partitions)
MapOutputBuffer.collect(key,value,partitions)

技術分享圖片

核心map輸出源代碼分析類
NewOutputCollector類
構造器:
實例化MapOutputBuffer對象
調用MapOutputBuffer對象init方法
將MapOutputBuffer對象賦值給collector對象
解決分區值問題
//如果沒有自定義分區類,則默認使用HashPartitioner
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);

環形緩沖區實現原理

MapOutputBuffer實現緩沖區的核心實現

技術分享圖片

在這一頭存儲key和value,key和value依次排列,而那一頭存儲索引,向中間出發,當儲存的空間占比百分之八十的時候,則溢出,兩者的方向改變,分別開始從另外一頭開始存儲

技術分享圖片

如上圖,從赤道分別向不同方向出發

技術分享圖片

如上圖,到達溢出時,產生新赤道,又分別從新赤道往回走

init方法
1、分配溢出比
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
2、分配環形緩存區的大小
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
3、實例化快排對象
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
4、定義環形緩存區數組
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
5、開始化赤道
setEquator(0);
6、獲取key的比較器對象
comparator = job.getOutputKeyComparator();

7、是否定義combineCollector對象

8、啟動spillThread線程,監聽溢出比,觸發此 sortAndSpill()

技術分享圖片

技術分享圖片


對接Reducer類的方法:
reduce(Text key, Iterable<IntWritable> values, Context context)

ReduceTask工作機制

技術分享圖片

三.shuffer

shuffer緩存流程

技術分享圖片

技術分享圖片

Hadoop Mapreduce 工作機制