1. 程式人生 > >reduce個數到底和哪些因素有關

reduce個數到底和哪些因素有關

reduce的數目到底和哪些因素有關

1、我們知道map的數量和檔案數、檔案大小、塊大小、以及split大小有關,而reduce的數量跟哪些因素有關呢?

 設定mapred.tasktracker.reduce.tasks.maximum的大小可以決定單個tasktracker一次性啟動reduce的數目,但是不能決定總的reduce數目。

  conf.setNumReduceTasks(4);JobConf物件的這個方法可以用來設定總的reduce的數目,看下Job Counters的統計:

	Job Counters 
		Data-local map tasks=2
		Total time spent by all maps waiting after reserving slots (ms)=0
		Total time spent by all reduces waiting after reserving slots (ms)=0
		SLOTS_MILLIS_MAPS=10695
		SLOTS_MILLIS_REDUCES=29502
		Launched map tasks=2
		Launched reduce tasks=4

 確實啟動了4個reduce:看下輸出:

[email protected]:~/IdeaProjects/test/build/classes$ hadoop fs -ls  /user/diegoball/join_ou1123
11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
-rw-r--r--   1 diegoball supergroup        124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
-rw-r--r--   1 diegoball supergroup        214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003

 只有2個reduce在幹活。為什麼呢?

shuffle的過程,需要根據key的值決定將這條<K,V> (map的輸出),送到哪一個reduce中去。送到哪一個reduce中去靠呼叫預設的org.apache.hadoop.mapred.lib.HashPartitioner的getPartition()方法來實現。
HashPartitioner類:

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;

/** Partition keys by their {@link Object#hashCode()}. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

 numReduceTasks的值在JobConf中可以設定。預設的是1:顯然太小。
   這也是為什麼預設的設定中總啟動一個reduce的原因。

   返回與運算的結果和numReduceTasks求餘。

   Mapreduce根據這個返回結果決定將這條<K,V>,送到哪一個reduce中去。

key傳入的是LongWritable型別,看下這個LongWritable類的hashcode()方法:

 public int hashCode() {
    return (int)value;
  }

 簡簡單單的返回了原值的整型值。

 因為getPartition(K2 key, V2 value,int numReduceTask)返回的結果只有2個不同的值,所以最終只有2個reduce在幹活。
 

 HashPartitioner是預設的partition類,我們也可以自定義partition類 :

 package com.alipay.dw.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/**
 * Created by IntelliJ IDEA.
 * User: diegoball
 * Date: 11-3-10
 * Time: 下午5:26
 * To change this template use File | Settings | File Templates.
 */
public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences > 20051210)
            return 0;
        else
            return 1;
    }

    public void configure(JobConf arg0) {

    }
}

 僅僅需要覆蓋getPartition()方法就OK。通過:
conf.setPartitionerClass(MyPartitioner.class);
可以設定自定義的partition類。
同樣由於之返回2個不同的值0,1,不管conf.setNumReduceTasks(4);設定多少個reduce,也同樣只會有2個reduce在幹活。

由於每個reduce的輸出key都是經過排序的,上述自定義的Partitioner還可以達到排序結果集的目的:

11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
-rw-r--r--   1 diegoball supergroup      24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
-rw-r--r--   1 diegoball supergroup      10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003

 part-00000和part-00001是這2個reduce的輸出,由於使用了自定義的MyPartitioner,所有key小於20051210的的<K,V>都會放到第一個reduce中處理,key大於20051210就會被放到第二個reduce中處理。
每個reduce的輸出key又是經過key排序的,所以最終的結果集降序排列。


但是如果使用上面自定義的partition類,又conf.setNumReduceTasks(1)的話,會怎樣? 看下Job Counters:

	Job Counters 
		Data-local map tasks=2
		Total time spent by all maps waiting after reserving slots (ms)=0
		Total time spent by all reduces waiting after reserving slots (ms)=0
		SLOTS_MILLIS_MAPS=16395
		SLOTS_MILLIS_REDUCES=3512
		Launched map tasks=2
		Launched reduce tasks=1

  只啟動了一個reduce。
  (1)、 當setNumReduceTasks( int a) a=1(即預設值),不管Partitioner返回不同值的個數b為多少,只啟動1個reduce,這種情況下自定義的Partitioner類沒有起到任何作用。
  (2)、 若a!=1:
   a、當setNumReduceTasks( int a)裡 a設定小於Partitioner返回不同值的個數b的話:

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences < 20051210)
            return 0;
        if (nbOccurences >= 20051210 && nbOccurences < 20061210)
            return 1;
        if (nbOccurences >= 20061210 && nbOccurences < 20081210)
            return 2;
        else
            return 3;
    }

  同時設定setNumReduceTasks( 2)。

 於是丟擲異常:

  11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
java.io.IOException: Illegal partition for 20110116 (3)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
	at org.apache.hadoop.mapred.Child.main(Child.java:211) 

 某些key沒有找到所對應的reduce去處。原因是隻啟動了a個reduce。
 
   b、當setNumReduceTasks( int a)裡 a設定大於Partitioner返回不同值的個數b的話,同樣會啟動a個reduce,但是隻有b個redurce上會得到資料。啟動的其他的a-b個reduce浪費了。

   c、理想狀況是a=b,這樣可以合理利用資源,負載更均衡。

相關推薦

reduce個數到底哪些因素有關

reduce的數目到底和哪些因素有關 1、我們知道map的數量和檔案數、檔案大小、塊大小、以及split大小有關,而reduce的數量跟哪些因素有關呢?  設定mapred.tasktracker.reduce.tasks.maximum的大小可以決定單個tasktra

大資料08-reduce task個數到底哪些因素有關

1、我們知道map的數量和檔案數、檔案大小、塊大小、以及split大小有關,而reduce的數量跟哪些因素有關呢? 設定mapred.tasktracker.reduce.tasks.maximum的大小可以決定單個tasktracker一次性啟動reduce的數目,但是不能決定總的reduce

mapreduce中mapreduce個數

case when 生成 task 輸入 slots align reducer 進行 很多 一、 控制hive任務中的map數: 1. 通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的文件總個數,input的

MapReduce個數設定問題

看了Hadoop的API和官方文件,加上自己原創,大致分析如下: map和reduce是hadoop的核心功能,hadoop正是通過多個map和reduce的並行執行來實現任務的分散式平行計算,從這個觀點來看,如果將map和reduce的數量設定為1,那麼使用者的任務就

Hadoop MapReduce Job效能調優——修改MapReduce個數

MapReduce框架將檔案分為多個splits,併為每個splits建立一個Mapper,所以Mappers的個數直接由splits的數目決定。而Reducers的數目可以通過job.setNumReduceTasks()函式設定,預設情況只有一個Reducer。在真正的叢集環境下,如果預設,那麼所有的中

如何確定 Hadoop mapreduce個數--mapreduce數量之間的關係是什麼?

閱讀本文可以帶著下面問題:1.map和reduce的數量過多會導致什麼情況?2.Reduce可以通過什麼設定來增加任務個數?3.一個task的map數量由誰來決定?4.一個task的reduce數量由誰來決定?一般情況下,在輸入源是檔案的時候,一個task的map數量由splitSize來決定的,那麼spli

Hadoop MapReduce Job效能調優——MapReduce個數

 map task的數量即mapred.map.tasks的引數值,使用者不能直接設定這個引數。Input Split的大小,決定了一個Job擁有多少個map。預設input split的大小是64M(與dfs.block.size的預設值相同)。然而,如果輸入的資料量

hadoop中每個節點mapreduce個數的設定調優

hadoop中每個節點map和reduce個數的設定調優 2012-02-21 14:40:32|  分類:舉報|字號訂閱 map red.tasktracker.map.tasks.maximum 這個是一個task tracker中可同時執行的map的最大個數,預設值

MapReduce 個數的設定 (Hive優化)經典

1.    通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的檔案總個數,input的檔案大小,叢集設定的檔案塊大小(目前為128M, 可在hive中通過set dfs.block.size;命令檢視到,該引數不能自定義修改);2.    舉例: a)   

軟件架構的時間情感因素

架構;時間;情感質量屬性是軟件架構的一個重要方面,而質量屬性中也包含了時間和情感的因素。公元前1世紀,古羅馬禦用工程師、建築師Marcus Vitruvius Pollio在其《建築十書》中最早提出了建築的三要素“堅固、實用、美觀”。英文的表述為Firmitas,Utilitas, Venustas,通俗的說

哪些因素可以反映出北京手機App軟件開發公司的實力?

業務 免費 工作量 信任 nbsp 情況 發布 最終 手機app 機遇往往都是伴隨著挑戰,現在手機App開發公司層次不齊,很多公司在開發中遇到這樣的問題,App不穩定、出現BUG(閃退)、工期不能保證、無後期維護、收了款不知所蹤...... 客戶面對諸多的手機軟件開

大盤指數受哪些因素影響

相對 我們 baidu 專家 amp font 沒有 貨幣 心理學 大體上分為4個方面。第一個我感覺對一般股民影響最大的,就是國家政策。其中有分為財政政策和貨幣政策。財政政策比如有“一二五規劃”,房產限購等限制房政策等。貨幣政策就是加息或者增加銀行儲備金率等。因為大體出了這

寫一個帶文本菜單的程序,菜單項如下 (1) 取五個數 (2) 取五個數的平均值 (X) 退出。

highlight 五個 div true while blog inpu input 重新 問題: 寫一個帶文本菜單的程序,菜單項如下(1) 取五個數的和 (2) 取五個數的平均值(X) 退出。由用戶做一個選擇, 然後執行相應的功能。當用戶選擇退出時程

目前最新版本ActiveMQ 5.15.3 JDK版本有關的問題

有關 pac min cto 分享圖片 官網 src img lang java.lang.UnsupportedClassVersionError: org/apache/activemq/ActiveMQConnectionFactory : Unsupported

abstract不能哪些關鍵字共存

abstract 有意義 強制 str args 不讓 方法體 system 沒有 A:面試題1 一個抽象類如果沒有抽象方法,可不可以定義為抽象類?如果可以,有什麽意義? 可以 這麽做目的只有一個,就是不讓其他類創建本類對象,交給子類完成 B:面試題2 abstra

abstract不能哪些關鍵字共存 學習

類重寫 AC void print private 類對象 子類 final 有意 /* A:面試題1 一個抽象類如果沒有抽象方法,可不可以定義為抽象類?如果可以,有什麽意義? 可以 這麽做目的只有一個,就是不讓其他類創建本類對象,交給子類完成 B:面試題2 abs

個數組中兩個數為N,找出這兩個數字的下標

完整 進行 代碼 ray 由於 比較 返回 put else 分析,兩個數字的和為N。那麽這兩個數字是否是唯一的呢?輸出的下標是否是第一對出現的呢? 1,我們假設這兩個數字是唯一的 和是唯一的,那麽其中一個數字越大,另一個數字就越小。想到大小關系,我們就想到了排序。那麽首先

第四課:YarnMap/Reduce配置啟動原理講解

data 程序 jar sheet 編輯 datanode 示例 數據 說過 前三節課主要講了hdfs,hdfs就是一個分魚展的大硬盤 分:分塊 魚:冗余 展:動態擴展 接下來講雲計算,也可以理解為分布式計算,其設計原則: 移動計算,而不是移動數據 前面說過,hadoo

Newcoder 39 A.約數個數(水~)

Description 給個 n n n,求

個數另一個數組比較,如果第一個數組裡面有的就不添加了

可以把兩個陣列先合併成一個數組然後再去重複項 比如兩個陣列分明是    arry1,arry2.    把arry2中和arry1不同的項新增進來。   var  arry1=[{},{},{}];