1. 程式人生 > >MapReduce啟動的Map/Reduce子任務簡要分析

MapReduce啟動的Map/Reduce子任務簡要分析

對於Hadoop來說,是通過在DataNode中啟動Map/Reduce java程序的方式來實現分散式計算處理的,那麼就從原始碼層簡要分析一下hadoop中啟動Map/Reduce任務的過程。 首先,對於Map/Reduce端啟動的任務,都是通過一些引數來控制java opts的,mapreduce.map.java.opts,mapreduce.reduce.java.opts,這些引數都在MRJobConfig類中,拿map.java.opts舉例來說,org.apache.hadoop.mapred.MapReduceChildJVM類中使用了這個引數,用來構造Map或Reduce端的JVM,在下面方法中拿到了ChildJavaOpts:
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask)
  如果是MapTask,JavaOpts的獲得具體方法內容如下(Reduce端邏輯基本一致):
if (isMapTask) {
      userClasspath =
          jobConf.get(
              JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
              jobConf.get(
                  JobConf.MAPRED_TASK_JAVA_OPTS,
                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
          );
      adminClasspath =
          jobConf.get(
              MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);

    // Add admin classpath first so it can be overridden by user.
    return adminClasspath + " " + userClasspath;
  userClassPath按照下面的引數順序獲得:
mapreduce.map.java.opts, 
mapred.child.java.opts, 
DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m”;
  adminClassPath要獲得的引數的基本順序:
mapreduce.admin.map.child.java.opts,
DEFAULT_MAPRED_ADMIN_JAVA_OPTS ="-Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN ";
  最後,由於在JVM啟動的引數中,後面能夠覆蓋掉前面的,因此userClassPath的同個選項的設定是可以覆蓋adminClassPath,adminClassPath是不能保證所有引數不能被覆蓋。 從原始碼端向上走,跳轉到方法:
public static List<String> getVMCommand(InetSocketAddress taskAttemptListenerAddr, Task task, ID jvmID)
  從這個方法中可以看到一個隱藏的設定,如果在選項中使用了@[email protected],是可以被替換成具體的attemptID的。
String javaOpts = getChildJavaOpts(conf, task.isMapTask());
javaOpts = javaOpts.replace("@[email protected]", attemptID.toString());
  程式碼中也以GC為例子,在註釋中舉例說明用法:    
 // Add child (task) java-vm options.
    //
    // The following symbols if present in mapred.{map|reduce}.child.java.opts
    // value are replaced:
    // + @
[email protected]
is interpolated with value of TaskID. // Other occurrences of @ will not be altered. // // Example with multiple arguments and substitutions, showing // jvm GC logging, and start of a passwordless JVM JMX agent so can // connect with jconsole and the likes to watch child memory, threads // and get thread dumps. // // <property> // <name>mapred.map.child.java.opts</name> // <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@[email protected] \ // -Dcom.sun.management.jmxremote.authenticate=false \ // -Dcom.sun.management.jmxremote.ssl=false \ // </value> // </property> // // <property> // <name>mapred.reduce.child.java.opts</name> // <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@[email protected] \ // -Dcom.sun.management.jmxremote.authenticate=false \ // -Dcom.sun.management.jmxremote.ssl=false \ // </value> // </property> //
  一般情況下,我們在列印gc日誌時,需要用-Xloggc:指定具體的目錄,但是在MapReduce任務中你無法指定,因為可能兩個Map會在同一臺機器上執行,那樣就肯定會發生gc檔案覆蓋,而實用@[email protected]就可以避免這個問題。同樣適用於OOM導致的堆疊列印避免檔案意外被覆蓋。 還會預設增加一個引數,用來設定java的臨時資料夾(所有臨時檔案的建立都在這個資料夾,比如File.createTempFile):
    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
  主類為:
org.apache.hadoop.mapred.YarnChild
  一個YarnChild程序的最終完整命令為:
/usr/java/jdk1.7.0_11//bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx2048M -Djava.io.tmpdir=/home/data5/hdfsdir/nm-local-dir/usercache/xxx/appcache/application_1413206225298_36914/container_1413206225298_36914_01_000098/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/workspace/hadoop/logs/userlogs/application_1413206225298_36914/container_1413206225298_36914_01_000098 -Dyarn.app.container.log.filesize=209715200 -Dhadoop.root.logger=INFO,CLA org.apache.hadoop.mapred.YarnChild 192.168.7.26 21298 attempt_1413206225298_36914_r_000001_0 98
  從原始碼端來分析該命令是如何生成的:
  • $JAVA_HOME: /usr/java/jdk1.7.0_11/
  • 原始碼中寫死: /bin/java
  • mapreduce.admin.map.child.java.opts:如果不設定,使用-Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN;
  • mapreduce.map.java.opts: Xmx2048M;
  • 原始碼中新增:-Djava.io.tmpdir=/home/data5/hdfsdir/nm-local-dir/usercache/tong/appcache/application_1413206225298_36914/container_1413206225298_36914_01_000098/tmp;
  • org.apache.hadoop.mapred.MapReduceChildJVM.setLog4jProperties中設定log4j,包括日誌級別,日誌大小等:-Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/workspace/hadoop/logs/userlogs/application_1413206225298_36914/container_1413206225298_36914_01_000098 -Dyarn.app.container.log.filesize=209715200 -Dhadoop.root.logger=INFO,CLA;
  • 主類:org.apache.hadoop.mapred.YarnChild;
  • TaskAttempt的主機地址:192.168.7.xx;
  • TaskAttempt的主機埠:212xx;
  • TaskAttempt ID:attempt_1413206225298_36914_r_000001_0;
  • JVMID:98(這是幹啥的不太清楚);
最後將程序的正常和異常輸出重定向:
// Finally add the jvmID
    vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
    vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));