1. 程式人生 > >大資料08-reduce task個數到底和哪些因素有關

大資料08-reduce task個數到底和哪些因素有關

1、我們知道map的數量和檔案數、檔案大小、塊大小、以及split大小有關,而reduce的數量跟哪些因素有關呢?

設定mapred.tasktracker.reduce.tasks.maximum的大小可以決定單個tasktracker一次性啟動reduce的數目,但是不能決定總的reduce數目。

conf.setNumReduceTasks(4);JobConf物件的這個方法可以用來設定總的reduce的數目,看下Job Counters的統計:

Job Counters 
        Data-local map tasks=2
        Total time spent by all maps waiting after reserving slots (ms)=0
        Total time spent by all reduces waiting after reserving slots (ms)=0
        SLOTS_MILLIS_MAPS=10695
        SLOTS_MILLIS_REDUCES=29502
        Launched map tasks=2
        Launched reduce tasks=4

確實啟動了4個reduce:看下輸出:

[email protected]:~/IdeaProjects/test/build/classes$ hadoop fs -ls  /user/diegoball/join_ou1123
11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
-rw-r--r--   1 diegoball supergroup        124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
-rw-r--r--   1 diegoball supergroup        214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003

只有2個reduce在幹活。為什麼呢?

shuffle的過程,需要根據key的值決定將這條<K,V> (map的輸出),送到哪一個reduce中去。送到哪一個reduce中去靠呼叫預設的org.apache.hadoop.mapred.lib.HashPartitioner的getPartition()方法來實現。

HashPartitioner類:

package org.apache.hadoop.mapred.lib;
 
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;
 
/** Partition keys by their {@link Object#hashCode()}. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
 
  public void configure(JobConf job) {}
 
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

numReduceTasks的值在JobConf中可以設定。預設的是1:顯然太小。
這也是為什麼預設的設定中總啟動一個reduce的原因。

返回與運算的結果和numReduceTasks求餘。

Mapreduce根據這個返回結果決定將這條<K,V>,送到哪一個reduce中去。

key傳入的是LongWritable型別,看下這個LongWritable類的hashcode()方法:

public int hashCode() {
    return (int)value;
  }

簡簡單單的返回了原值的整型值。

因為getPartition(K2 key, V2 value,int numReduceTask)返回的結果只有2個不同的值,所以最終只有2個reduce在幹活。

HashPartitioner是預設的partition類,我們也可以自定義partition類 :

package com.alipay.dw.test;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
 
/**
 * Created by IntelliJ IDEA.
 * User: diegoball
 * Date: 11-3-10
 * Time: 下午5:26
 * To change this template use File | Settings | File Templates.
 */
public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences > 20051210)
            return 0;
        else
            return 1;
    }
 
    public void configure(JobConf arg0) {
 
    }
}

僅僅需要覆蓋getPartition()方法就OK。通過:
conf.setPartitionerClass(MyPartitioner.class);
可以設定自定義的partition類。
同樣由於之返回2個不同的值0,1,不管conf.setNumReduceTasks(4);設定多少個reduce,也同樣只會有2個reduce在幹活。

由於每個reduce的輸出key都是經過排序的,上述自定義的Partitioner還可以達到排序結果集的目的:

11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
-rw-r--r--   1 diegoball supergroup      24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
-rw-r--r--   1 diegoball supergroup      10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003

part-00000和part-00001是這2個reduce的輸出,由於使用了自定義的MyPartitioner,所有key小於20051210的的<K,V>都會放到第一個reduce中處理,key大於20051210就會被放到第二個reduce中處理。
每個reduce的輸出key又是經過key排序的,所以最終的結果集降序排列。

但是如果使用上面自定義的partition類,又conf.setNumReduceTasks(1)的話,會怎樣? 看下Job Counters:

Job Counters 
        Data-local map tasks=2
        Total time spent by all maps waiting after reserving slots (ms)=0
        Total time spent by all reduces waiting after reserving slots (ms)=0
        SLOTS_MILLIS_MAPS=16395
        SLOTS_MILLIS_REDUCES=3512
        Launched map tasks=2
        Launched reduce tasks=1

同時設定setNumReduceTasks( 2)。

於是丟擲異常:

11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
java.io.IOException: Illegal partition for 20110116 (3)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
    at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
    at org.apache.hadoop.mapred.Child.main(Child.java:211)

某些key沒有找到所對應的reduce去處。原因是隻啟動了a個reduce。

b、當setNumReduceTasks( int a)裡 a設定大於Partitioner返回不同值的個數b的話,同樣會啟動a個reduce,但是隻有b個redurce上會得到資料。啟動的其他的a-b個reduce浪費了。

c、理想狀況是a=b,這樣可以合理利用資源,負載更均衡。