1. 程式人生 > >Hadoop動態調整Map Task記憶體資源大小

Hadoop動態調整Map Task記憶體資源大小

前言

我們都知道,在Hadoop中,一個Job的執行需要轉化成1個個的Task去執行,在Task中,有會有2個型別,一個為Map Task,另一個就是Reduce Task.當然,這不是最底層的級別,在Task內部,還可以再分為TaskAttempt,叫做任務嘗試,任務嘗試姑且不在本篇文章的論述範圍內.OK,針對每個Task,他當然會有他的資源使用量,廣義的來講,資源分為2個概念,1個是Memory 記憶體,另一個是Vcores,虛擬核數.這些資源的分配情況非常的關鍵,因為資源分少了,可能空閒叢集資源浪費了,又可能會導致oom記憶體不夠用的問題,假設你記憶體分小了,既然這樣,那我們把資源調大了是不就行了,當然會導致一個狼多羊少的問題,畢竟資源有限,你用的多了,別人就會用的少了.所以這裡就會衍生出一個問題,對於Job中的每個Task,我該如何去設定可使用的資源量呢,採用預設統一的map.memory.mb這樣的配置顯然不是一個好的解決辦法,其實讓人可以馬上聯想到的辦法就是能夠資料量的大小動態調整分配的資源量,這無疑是最棒的方案,下面就來簡單的聊聊這個方案.

資源調配的指標

什麼是資源調配的指標,通俗的講就是一個資源分配的參考值,依照這個值,我可以進行資源的動態分配,這也是非常符合正常邏輯思維的方式的.在這裡的標準值就是處理資料量的大小,所以調整的目標物件就是Map Task而不是Reduce Task.那麼這個資料可以怎麼拿到呢,稍微瞭解過Hadoop的人一定知道map的過程是如何拿到資料的,簡單的就是從inputSplit中拿到資料,而這個inputSplit當然會被保留在Map Task中.就是下面所示的程式碼中:

@SuppressWarnings("rawtypes")
public class MapTaskAttemptImpl extends TaskAttemptImpl {

  private final TaskSplitMetaInfo splitInfo;

  public MapTaskAttemptImpl(TaskId taskId, int attempt, 
      EventHandler eventHandler, Path jobFile, 
      int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
      TaskAttemptListener taskAttemptListener, 
      Token<JobTokenIdentifier> jobToken,
      Credentials credentials, Clock clock,
      AppContext appContext) {
    super(taskId, attempt, eventHandler, 
        taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
        jobToken, credentials, clock, appContext);
    this.splitInfo = splitInfo;
  }
在這個TaskSplitMetaInfo中就會有輸入資料長度的一個變數
  /**
   * This represents the meta information about the task split that the 
   * JobTracker creates
   */
  public static class TaskSplitMetaInfo {
    private TaskSplitIndex splitIndex;
    private long inputDataLength;
    private String[] locations;
    public TaskSplitMetaInfo(){
      this.splitIndex = new TaskSplitIndex();
      this.locations = new String[0];
    }
...
後面我們就用到這個關鍵變數.

需要調整的資源變數

上文中已經提過,目標資源調整的維度有2個,1個是記憶體,還有1個核數,資料量的大小一般會直接與記憶體大小相關聯,核數偏重於處理速度,所以我們應該調整的task的記憶體大小,也就是map.memory.mb這個配置項的值.這個配置項的預設值是1024M,就是下面這個配置:

public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
  public static final int DEFAULT_MAP_MEMORY_MB = 1024;
如果你在配置檔案中沒配的話,他走的就是預設值,個人感覺這個值還是有點偏大的,如果一個Job一不小心起了1000多個Task,那麼1T的記憶體就用掉了.OK,下面看下這個變數是儲存在哪個變數中的呢,進入到TaskAttemptImpl中你就可以發現了.
/**
 * Implementation of TaskAttempt interface.
 */
@SuppressWarnings({ "rawtypes" })
public abstract class TaskAttemptImpl implements
    org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
      EventHandler<TaskAttemptEvent> {

  static final Counters EMPTY_COUNTERS = new Counters();
  private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
  private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  protected final JobConf conf;
  protected final Path jobFile;
  protected final int partition;
  protected EventHandler eventHandler;
  private final TaskAttemptId attemptId;
  private final Clock clock;
  private final org.apache.hadoop.mapred.JobID oldJobId;
  private final TaskAttemptListener taskAttemptListener;
  private final Resource resourceCapability;
  protected Set<String> dataLocalHosts;
...
就是上面這個resourceCapability,在這裡就會有核數和記憶體2個資源指標.
  @Public
  @Stable
  public static Resource newInstance(int memory, int vCores) {
    Resource resource = Records.newRecord(Resource.class);
    resource.setMemory(memory);
    resource.setVirtualCores(vCores);
    return resource;
  }

  /**
   * Get <em>memory</em> of the resource.
   * @return <em>memory</em> of the resource
   */
  @Public
  @Stable
  public abstract int getMemory();
  
  /**
   * Set <em>memory</em> of the resource.
   * @param memory <em>memory</em> of the resource
   */
  @Public
  @Stable
  public abstract void setMemory(int memory);
到時就可以在這邊進行設定.

動態調整Map TaskAttempt記憶體大小

剛剛上文中已經提到過,想要根據處理資料量的大小來調整Map的記憶體大小,首先你要有一個標準值,比如1個G的資料量對應1個G的記憶體值,然後如果你這次來了512M的資料,那麼我就配512/1024(就是1個G)*1024M=512M,所以我最後分的記憶體就是512M,如果資料量大了,同理.這個標準值當然要做出可配,有使用方來決定.姑且用下面新的配置值來定義:

  public static final String MAP_MEMORY_MB_AUTOSET_ENABLED = "map.memory-autoset.enabled";
  public static final String DEFAULT_MEMORY_MB_AUTOSET_ENABLED = "false";

  public static final String MAP_UNIT_INPUT_LENGTH = "map.unit-input.length";
  public static final int DEFAULT_MAP_UNIT_INPUT_LENGTH = 1024 * 1024 * 1024;
然後在Map TaskAttempt中加上動態調整方法,如果你開啟了此項新功能,則會執行方法中的部分操作.
public class MapTaskAttemptImpl extends TaskAttemptImpl {

  private final TaskSplitMetaInfo splitInfo;

  public MapTaskAttemptImpl(TaskId taskId, int attempt, 
      EventHandler eventHandler, Path jobFile, 
      int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
      TaskAttemptListener taskAttemptListener, 
      Token<JobTokenIdentifier> jobToken,
      Credentials credentials, Clock clock,
      AppContext appContext) {
    super(taskId, attempt, eventHandler, 
        taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
        jobToken, credentials, clock, appContext);

    this.splitInfo = splitInfo;
    autoSetMemorySize();
  }
...

  private void autoSetMemorySize() {
    int memory;
    int unitInputLength;
    int unitMemorySize;
    boolean isMemoryAutoSetEnabled;
    Resource resourceCapacity;

    isMemoryAutoSetEnabled =
        Boolean.parseBoolean(conf.get(
            MRJobConfig.MAP_MEMORY_MB_AUTOSET_ENABLED,
            MRJobConfig.DEFAULT_MEMORY_MB_AUTOSET_ENABLED));
    //判斷是否開啟動態調整記憶體功能
    if (isMemoryAutoSetEnabled) {
      unitInputLength =
          conf.getInt(MRJobConfig.MAP_UNIT_INPUT_LENGTH,
              MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH);
      unitMemorySize =
          conf.getInt(MRJobConfig.MAP_MEMORY_MB,
              MRJobConfig.DEFAULT_MAP_MEMORY_MB);

      memory =
          (int) (Math.ceil(1.0 * splitInfo.getInputDataLength()
              / unitInputLength) * unitMemorySize);
    } else {
      memory =
          conf.getInt(MRJobConfig.MAP_MEMORY_MB,
              MRJobConfig.DEFAULT_MAP_MEMORY_MB);
    }

    //調整記憶體資源量
    resourceCapacity = getResourceCapability();
    resourceCapacity.setMemory(memory);
  }
在這裡我做了特別處理,為了使分配的記憶體大小符合2的冪次方,我用了向上取整的方法算倍數,這樣規範化一些.下面是單元測試案例
  @Test
  public void testMapTaskAttemptMemoryAutoSet() throws Exception {
    int memorySize;
    int adjustedMemorySize;
    Resource resourceCapacity;

    EventHandler eventHandler = mock(EventHandler.class);
    String[] hosts = new String[3];
    hosts[0] = "host1";
    hosts[1] = "host2";
    hosts[2] = "host3";
    TaskSplitMetaInfo splitInfo =
        new TaskSplitMetaInfo(hosts, 0, 2 * 1024 * 1024 * 1024l);

    TaskAttemptImpl mockTaskAttempt =
        createMapTaskAttemptImplForTest(eventHandler, splitInfo);

    resourceCapacity = mockTaskAttempt.getResourceCapability();
    memorySize = resourceCapacity.getMemory();

    // Disable the auto-set memorySize function
    // memorySize will be equal to default size
    assertEquals(MRJobConfig.DEFAULT_MAP_MEMORY_MB, memorySize);

    // Enable the auto-set memory function
    Clock clock = new SystemClock();
    ApplicationId appId = ApplicationId.newInstance(1, 1);
    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
    Path jobFile = mock(Path.class);
    JobConf jobConf = new JobConf();
    jobConf.set(MRJobConfig.MAP_MEMORY_MB_AUTOSET_ENABLED, "true");
    jobConf.set(MRJobConfig.MAP_MEMORY_MB,
        String.valueOf(MRJobConfig.DEFAULT_MAP_MEMORY_MB));
    jobConf.set(MRJobConfig.MAP_UNIT_INPUT_LENGTH,
        String.valueOf(MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH));

    TaskAttemptImpl taImpl =
        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splitInfo,
            jobConf, taListener, null, null, clock, null);

    resourceCapacity = taImpl.getResourceCapability();
    memorySize = resourceCapacity.getMemory();
    adjustedMemorySize =
        (int) (Math.ceil(1.0 * splitInfo.getInputDataLength()
            / MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH) * MRJobConfig.DEFAULT_MAP_MEMORY_MB);

    // Enable the auto-set function,the memorySize will be changed
    assertEquals(adjustedMemorySize, memorySize);
  }
測試已通過.這只是一個小小的解決方案,還沒有對Hadoop的其他元件造成的影響做評估,希望給大家帶來一些好的想法.此功能我已經向開源社群提交了patch,,後面會給出連結.

相關連結

相關推薦

Hadoop動態調整Map Task記憶體資源大小

前言我們都知道,在Hadoop中,一個Job的執行需要轉化成1個個的Task去執行,在Task中,有會有2個型別,一個為Map Task,另一個就是Reduce Task.當然,這不是最底層的級別,在Task內部,還可以再分為TaskAttempt,叫做任務嘗試,任務嘗試姑且

linux ps 按程序消耗記憶體資源大小排序

linux ps 關於sort的解釋 --sort spec specify sorting order. Sorting syntax is [+|-]key[,[+|-]key[,...]] Choose a multi-letter key from the STA

MFC控件自適應大小(EASYSIZE動態調整控件位置、大小

ESS ott .cpp title 指定 () baidu ali 同時 MFC控件自適應大小(EASYSIZE動態調整控件位置、大小) 轉自 https://www.cplusplus.me/1178.html 近日在code project網站瀏覽時,看到一篇關於

記錄:動態調整div大小

vco wid ont border 動態 space 元素 contain spa *{box-sizing:border-box;}此情,容器元素css屬性必含overflow:hidden,不然會出問題 $(window).resize(()=>{ $(‘#wi

ResizingArrayStack 動態調整陣列大小

package com.arithmetic; import java.util.Iterator; import java.util.NoSuchElementException; // 能夠動態調整陣列大小的實現 public class ResizingArrayStack<Item

【Unity】Unity資源池的動態載入釋放和記憶體優化處理

需求環境         在上一級的【解決方案】文章中,我們設計出了動態載入資源的業務流程,而這一節,我們就通過一些簡單的程式碼,來實現出業務流程中的效果。        吸取之前文章的經驗,如

必須要注意的 C++ 動態記憶體資源管理(六)——vector的簡單實現

十六.myVector分析         我們知道,vector類將其元素存放在連續的記憶體中。為了獲得可接受的效能,vetor預先分配足夠大的記憶體來儲存可能需要的更多元素。vector的每個新增元素的成員函式會檢查是否有空間容納更多的元素。如果有,成員

Eclipse中檢視及調整JVM堆記憶體大小

一、檢視堆記憶體 設定路徑:Windows > Pregerences > General 勾選右側視窗的show heap status一項 點選 ok,即可在eclipse右下方看到實時的heap情況 二、調整堆記憶體 進入ecli

Hadoop Map/Reduce記憶體限制

如何設定hadoop  Map/Reduce任務的記憶體限制? Parameter Type Meaning mapred.cluster.map.memory.mb set by admin, cluster-wide Cluster definitio

hadoop輸入分片計算(Map Task個數的確定)

1 public List<InputSplit> getSplits(JobContext job 2 ) throws IOException { 3 //getFormatMinSplitSize():始終返回1 4 //getMinSplitSize(job):獲取” mapre

Hadoop執行因為container的記憶體大小而拋錯

本文轉載自:http://blog.chinaunix.net/uid-25691489-id-5587957.html Hadoop執行中丟擲如下異常: <span style="font-size:18px;">Container [pid=41355,co

hadoop 分片與分塊,map task和reduce task的理解

 分塊:Block   HDFS儲存系統中,引入了檔案系統的分塊概念(block),塊是儲存的最小單位,HDFS定義其大小為64MB。與單磁碟檔案系統相似,儲存在 HDFS上的檔案均儲存為多個塊,不同的是,如果某檔案大小沒有到達64MB,該檔案也不會佔據整個塊空間。在分

hadoop記憶體heap大小的配置引數

1、tasktracker的heapsize的大小設定 <property>   <name>mapred.child.java.opts</name>   <value>-Xmx200m</value>   &l

Yarn調優之調整ResourceManager堆記憶體大小

ResourceManager掛了。檢視到active的ResourceManager日誌有如下內容: java.lang.OutOfMemoryError: Java heap space 故障的原因是RM的堆記憶體空間size不夠了。 檢視到活躍節點RM的最

動態程式碼設定Textview的字型大小,引用Dimen資源

xml檔案設定Textview字型大小,可直接設定android:textSize="@dimen/common_font_sw320dp_of_13" 但是通過動態程式碼設定時,通常會遇到引用Dim

Delphi 動態調整印表機紙張大小

資訊管理系統中經常要提供各種列印功能,例如報表列印、憑證列印以及發票列印。在這些列印過程中所需要紙張的大小往往是不一致的,例如,列印報表有可能使用A4 紙或A3 紙,列印憑證或發票可能需要將列印紙張設定成自定義大小。如果在同一臺印表機上列印這些內容,那麼就應該針對不同的列印內

Hadoop Yarn記憶體資源隔離實現原理——基於執行緒監控的記憶體隔離方案

注:本文以hadoop-2.5.0-cdh5.3.2為例進行說明。 Hadoop Yarn的資源隔離是指為執行著不同任務的“Container”提供可獨立使用的計算資源,以避免它們之間相互干擾。目前支援兩種型別的資源隔離:CPU和記憶體,對於這兩種型別的資源,Yarn

Tomcat調整java jvm記憶體大小

原因:因為本人做的專案出現如下異常 java.lang.OutOfMemoryError: GC overhead limit exceeded 所以嘗試調整tomcat記憶體,是否能夠解決問題不清楚,記錄下調整tomcat記憶體的方法 ============================ 環境:wind

C#建立高質量(清晰)縮圖——動態調整大小

System.Drawing.Rectangle rectDestination = new System.Drawing.Rectangle(0, 0, thumbWidth, thumbHeight); gr.DrawI

必須要注意的 C++ 動態記憶體資源管理(一)——視資源為物件

一.前言         所謂資源就是,一旦你用了它,將來必須還給系統。如果不這樣,糟糕的事情就會發生。C++ 程式中最常見使用的資源就是動態分配記憶體(如果你分配了記憶體卻忘記歸還它,就會導致記憶