
Hadoop 2 MapReduce in Depth

This article walks through MapReduce in Hadoop 2.2.0 in detail. Please pay attention to the version: the source code may differ between Hadoop versions.

Here is the outline of this article:

1. Getting the source code
2. WordCount case study
3. Client source code analysis
4. Recap
5. Mapper in detail
  5.1. Map input
  5.2. Map output
  5.3. Map summary
6. Reducer in detail
7. Conclusion

If there are any mistakes, please bear with me; criticism and corrections are welcome.

Please respect the author's work; when reposting, please cite the blog address:

https://www.cnblogs.com/hongten/p/hongten_hadoop_mapreduce.html

 

1. Getting the source code

You can download HBase:

Hbase: hbase-0.98.9-hadoop2-bin.tar.gz

It contains the jar files and the source code of Hadoop 2.2.0.

 

2. WordCount case study

Before digging into the details, let's look at an example. Suppose a file has the following content:

hello hongten 1
hello hongten 2
hello hongten 3
hello hongten 4
hello hongten 5
......
......

Every line of the file contains one hello, one hongten, and an increasing number at the end of the line.

We want to count how many times each word appears in this file (you can find many similar examples online).

First we need to generate this file; you can use the following Java code to do so:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class GenerateWord {

    public static void main(String[] args) throws Exception {

        double num = 12000000;

        StringBuilder sb = new StringBuilder();
        for (int i = 1; i < num; i++) {
            sb.append("hello").append(" ").append("hongten").append(" ").append(i).append("\n");
        }

        File writename = new File("/root/word.txt");
        writename.createNewFile();
        BufferedWriter out = new BufferedWriter(new FileWriter(writename));
        out.write(sb.toString());
        out.flush();
        out.close();
        System.out.println("done.");
    }
}

 

On the Linux machine, compile GenerateWord.java:

javac GenerateWord.java

After compilation, a GenerateWord.class file is produced; then run:

java GenerateWord

After a while, the file is generated (roughly 252 MB).

 

Next, let's write the map, reduce, and client implementations for counting the words.

Project structure

There are three Java files in total.

The client

First we define a Configuration and a Job, then call the various job setters; nothing is submitted until job.waitForCompletion() is invoked.

You can think of the client as holding the configuration needed for distributed execution and finally performing the submit action.

package com.b510.hongten.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        // read the configuration files
        Configuration conf = new Configuration();
        // create the job
        Job job = Job.getInstance(conf);

        // Create a new Job
        job.setJarByClass(WordCount.class);

        // Specify various job-specific parameters
        job.setJobName("wordcount");

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // job.setInputPath(new Path("/usr/input/wordcount"));
        // job.setOutputPath(new Path("/usr/output/wordcount"));

        FileInputFormat.addInputPath(job, new Path("/usr/input/wordcount1"));

        Path output = new Path("/usr/output/wordcount");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }

        FileOutputFormat.setOutputPath(job, output);

        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);

    }
}

 

The custom Mapper

package com.b510.hongten.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

}

 

The custom Reducer

package com.b510.hongten.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

}

 

Run and check the results

cd /home/hadoop-2.5/bin/

--create the test input directory
./hdfs dfs -mkdir -p /usr/input/wordcount1

--put the test file into the input directory
./hdfs dfs -put /root/word.txt /usr/input/wordcount1

--run the test
./hadoop jar /root/wordcount.jar com.b510.hongten.hadoop.WordCount

--download the result files from HDFS
./hdfs dfs -get /usr/output/wordcount/* ~/

--view the last 5 lines of the file
tail -n5 /root/part-r-00000

 

Execution result
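As a rough indication of what to expect (derived from the generator code above rather than from an actual run), part-r-00000 contains tab-separated key/count pairs: every number from 1 to 11999999 appears once with count 1, and the two words appear once per input line, roughly:

hello   11999999
hongten 11999999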

 

From the YARN client we can see how long the job ran.

It started at 11:47:46 and finished at 11:56:48, 9 min 2 s in total. (This was run inside a virtual machine on my own computer; on a real cluster it should be much faster.)

Number of records: 12000000 - 1

 

3. Client source code analysis

After the distributed job has been configured on the client, we finally execute:

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

So what does the waitForCompletion() method actually do?

//we pass verbose=true
public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
        //the submit action
      submit();
    }
    //verbose=true
    if (verbose) {
        //monitor the job and print its progress information
        //this is why we see a lot of output on the client while the distributed job runs
        //with verbose=false we would not see that job output
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    //return whether the job succeeded
    return isSuccessful();
  }

The most important call in this method is submit(), which submits the distributed job, so let's step into submit().

public void submit() 
        throws IOException, InterruptedException, ClassNotFoundException {
   ensureState(JobState.DEFINE);
   //use the new API; I am on the Hadoop 2.2.0 API, which differs from the older API
   setUseNewAPI();
   //connect to the cluster; the cluster responds and assigns a job ID
   connect();
   final JobSubmitter submitter = 
       getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
   status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
     public JobStatus run() throws IOException, InterruptedException, 
     ClassNotFoundException {
         //submit the job
         /*
         Internal method for submitting jobs to the system. 

         The job submission process involves: 
         1. Checking the input and output specifications of the job. 
         2. Computing the InputSplits for the job. 
         3. Setup the requisite accounting information for the DistributedCache of the job, if necessary. 
         4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system. 
         5. Submitting the job to the JobTracker and optionally monitoring it's status. 
         */
         //this method does 5 things:
         //1. check the input and output specifications
         //2. compute the input splits for the job
         //3/4. upload the resource files (DistributedCache info, job jar and configuration)
         //5. submit the job and monitor its status
         //note that in 2.x there is no JobTracker any more.
         //JobTracker is no longer used since M/R 2.x. 
         //This is a dummy JobTracker class, which is used to be compatible with M/R 1.x applications.
       return submitter.submitJobInternal(Job.this, cluster);
     }
   });
   state = JobState.RUNNING;
   LOG.info("The url to track the job: " + getTrackingURL());
  }

So we need to step into submitter.submitJobInternal() to see its implementation.

//this method does 5 things:
//1. check the input and output specifications
//2. compute the input splits for the job
//3/4. upload the resource files (DistributedCache info, job jar and configuration)
//5. submit the job and monitor its status
//note that in 2.x there is no JobTracker any more.
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);
    
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                     job.getConfiguration());
    //configure the command line options correctly on the submitting dfs
    Configuration conf = job.getConfiguration();
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    //set the job ID
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      copyAndConfigureFiles(job, submitJobDir);
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      //write the split information; this is the method we care most about :))
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      //only at this point is the job actually submitted
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

What we care about here is:

int maps = writeSplits(job, submitJobDir);

Step into the writeSplits() method:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
    //the configuration can be obtained from the job
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        //call the new split method; we are on Hadoop 2.x,
        //so the new split method is used
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
        //the old split method
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

Since we are on 2.x, writeNewSplits() is the method that gets used.

@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
    //the configuration can be obtained from the job
  Configuration conf = job.getConfiguration();
  //obtain the input format via reflection
  //by default this returns TextInputFormat
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  // ==  1  ==
  
  //the input format computes the splits
  List<InputSplit> splits = input.getSplits(job);                  // ==  2  ==
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

Look at '== 1 =='. This is where the input format is obtained; step into job.getInputFormatClass():

@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass() 
   throws ClassNotFoundException {
    //if INPUT_FORMAT_CLASS_ATTR (mapreduce.job.inputformat.class) is not set in the configuration,
    //TextInputFormat is returned;
    //if it is set, the configured class is returned
    //in other words: the default is TextInputFormat
  return (Class<? extends InputFormat<?,?>>) 
    conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}

So the system's default input format is TextInputFormat.
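If some other input format is wanted, it can simply be configured on the client; a minimal sketch (KeyValueTextInputFormat is only an example here, it is not part of the WordCount job above):

// hypothetical client-side configuration; without this call, getInputFormatClass()
// falls back to TextInputFormat.class as shown above
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);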

Look at '== 2 =='. Here the input format computes the splits, so let's step into the getSplits() method:

public List<InputSplit> getSplits(JobContext job) throws IOException {
    //minSize = Math.max(1, 1L)=1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));      // == A ==
    //maxSize = Long.MAX_VALUE
    long maxSize = getMaxSplitSize(job);                                         // == B ==

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //get the list of input files
    List<FileStatus> files = listStatus(job);
    //iterate over the files
    for (FileStatus file: files) {
      //the files are processed one by one,
      //and the splits are computed per file
      Path path = file.getPath();
      //file length
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
            //get the FileSystem from the path
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          //get the block locations of the whole file
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //check whether the file can be split
        if (isSplitable(job, path)) {
            //it can be split
          //get the file's block size
          long blockSize = file.getBlockSize();
          //split size: splitSize = blockSize
          //by default, the split size equals the block size
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);          // == C == 

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            //index of the block that contains this offset
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);     // == D ==
            //detailed split information
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                     blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
            //not splittable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

Look at '== A =='. getFormatMinSplitSize() returns 1 and getMinSplitSize() returns 1L.

protected long getFormatMinSplitSize() {
    return 1;
  }

public static long getMinSplitSize(JobContext job) {
    //if SPLIT_MINSIZE (mapreduce.input.fileinputformat.split.minsize) is set in the configuration, that value is used;
    //otherwise the default 1L is returned
    //we did not configure it, so 1L is returned here
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

Look at '== B =='. getMaxSplitSize() returns Long.MAX_VALUE (we did not configure SPLIT_MAXSIZE).

public static long getMaxSplitSize(JobContext context) {
    //if SPLIT_MAXSIZE (mapreduce.input.fileinputformat.split.maxsize) is set in the configuration, that value is used;
    //otherwise the default Long.MAX_VALUE is returned
    //we did not configure it, so Long.MAX_VALUE is returned here
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
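Both limits can be adjusted from the client if needed; a hedged sketch (the 64 MB / 256 MB values are arbitrary examples, not something used in the WordCount job above):

// hypothetical client-side tuning of the two limits read above
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // mapreduce.input.fileinputformat.split.minsize
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // mapreduce.input.fileinputformat.split.maxsize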

Look at '== C =='. Without any such configuration, the split size equals the block size.

//minSize=1
//maxSize=Long.MAX_VALUE
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
    //Math.min(maxSize, blockSize) -> Math.min(Long.MAX_VALUE, blockSize) -> blockSize
    //Math.max(minSize, blockSize) -> Math.max(1, blockSize) -> blockSize
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
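To make the formula concrete, here is a small illustrative calculation (assuming the 128 MB default block size; the 64 MB case is purely hypothetical):

// default case: nothing configured
long blockSize = 128L * 1024 * 1024;   // 134217728 bytes
long minSize   = 1L;                   // Math.max(getFormatMinSplitSize(), getMinSplitSize(job))
long maxSize   = Long.MAX_VALUE;       // getMaxSplitSize(job)
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
// => splitSize == 134217728, i.e. split size == block size

// hypothetical: if SPLIT_MAXSIZE were set to 64 MB, Math.min(maxSize, blockSize)
// would pick 64 MB and each 128 MB block would be covered by two splits instead of one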

Look at '== D =='. Here the block index is looked up from the offset.

protected int getBlockIndex(BlockLocation[] blkLocations, 
        long offset) {
    //find the index of the block that contains this offset
    for (int i = 0 ; i < blkLocations.length; i++) {
        // is the offset inside this block?
        if ((blkLocations[i].getOffset() <= offset) &&
        (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
            return i;
        }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                     " is outside of file (0.." +
                     fileLength + ")");
}

 

4. Recap

In plain language, what happens above can be illustrated by the following diagram:

The system's default block size is 128 MB, and when nothing else is configured the split size equals the block size.

Type 1: the block is 45 MB, smaller than the system default of 128 MB.

Split info: path, 0, 45, [3, 8, 10]

Split info: file location path, offset 0, split size 45, block location info [3, 8, 10], meaning the file (block) is stored on datanode3, datanode8 and datanode10 of the HDFS file system.

 

Type 2: the block is 128 MB, exactly the system default, so it is not broken into two blocks; same as Type 1.

 

Type 3: the file is 414 MB, larger than the system default of 128 MB, so when we upload it to HDFS the system cuts it into blocks of 128 MB each, one after another, until only 30 MB remains, which becomes a block of its own. Each split record then consists of the file location path, an offset, a split size, and the block location info. We call this list of records the file's split list.

Once the system has the file's split list, the list is handed to the distributed framework, which then processes each split.
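As a concrete illustration of the Type 3 case (numbers assumed: a 414 MB file with the default 128 MB split size; the SPLIT_SLOP factor of 1.1 in getSplits() decides when the loop stops), the resulting split list would look roughly like:

split 1: path, offset 0 MB,   size 128 MB, hosts of block 1
split 2: path, offset 128 MB, size 128 MB, hosts of block 2
split 3: path, offset 256 MB, size 128 MB, hosts of block 3
split 4: path, offset 384 MB, size 30 MB,  hosts of block 4   (the leftover tail)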

 

  

5. Mapper in detail

5.1. Map input

The map task opens an input stream from HDFS and seeks to its split's position; except for the first split, every split starts reading and processing data from its second line.

The run() method lives in org.apache.hadoop.mapred.MapTask:

//org.apache.hadoop.mapred.MapTask
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
      //on the client we can set the number of reducers,
      //e.g. job.setNumReduceTasks(10);
      //if there are no reducers, there is only a map phase,
      if (conf.getNumReduceTasks() == 0) {
          //and this branch is taken
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        //as soon as there is a reduce phase,
        mapPhase = getProgress().addPhase("map", 0.667f);
        //a sort phase is added as well
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    //whether to use the new API
    if (useNewApi) {
        //we are using the new mapper
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

Stepping into runNewMapper(), we can see the map task's overall flow:

1. Initialize the input

2. Call org.apache.hadoop.mapreduce.Mapper.run()

3. Update the status

4. Close the input

5. Close the output

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
    //get the task context
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                getTaskID(),
                                                                reporter);
  // make a mapper
  // construct the mapper via reflection
  // this gives us the mapper class we wrote
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);        // == AA ==
  // make the input format
  // obtain the input format via reflection
  // through the input format we can get at the file's split list here
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);   // == BB ==
  // rebuild the input split
  //one map task corresponds to one split, i.e. one split per map task
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);
  //a NewTrackingRecordReader() is created here
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, taskContext);                          // == CC ==
  
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  
  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output = 
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  //create a map context object
  //the input object is passed in here
  //so what is the relationship between MapContext, NewTrackingRecordReader and LineRecordReader?
  //all three classes contain nextKeyValue(), getCurrentKey() and getCurrentValue() methods
  //when we call nextKeyValue() on the MapContext, it calls nextKeyValue() on the NewTrackingRecordReader, which ultimately calls nextKeyValue() on the LineRecordReader
  //in other words, LineRecordReader is the one that actually does the work
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
  mapContext = 
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
        input, output, 
        committer, 
        reporter, split);                                                // == EE ==

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      mapperContext = 
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
      //=============================
      // the mapper's overall flow:
      // 1. initialize the input
      // 2. call org.apache.hadoop.mapreduce.Mapper.run()
      // 3. update the status
      // 4. close the input
      // 5. close the output
      //=============================
      //initialize the input
    input.initialize(split, mapperContext);                           // == FF ==
    //then call the mapper's run() method, i.e. org.apache.hadoop.mapreduce.Mapper.run()
    mapper.run(mapperContext);                                       //  == GG ==
    //end of the map phase
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    //close the input
    input.close();
    input = null;
    //close the output
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

Look at '== AA =='. Since we already set our custom mapper on the client, the system returns the mapper class we defined.

//on the client we called job.setMapperClass(MyMapper.class);
//to register our custom mapper class, so this returns the mapper we wrote
@SuppressWarnings("unchecked")
public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
   throws ClassNotFoundException {
  return (Class<? extends Mapper<?,?,?,?>>) 
    conf.getClass(MAP_CLASS_ATTR, Mapper.class);
}

Look at '== BB =='. As mentioned above, the system defaults to the TextInputFormat input format.

//the system default is TextInputFormat
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass() 
   throws ClassNotFoundException {
  return (Class<? extends InputFormat<?,?>>) 
    conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}

 

Look at '== CC =='. This returns a RecordReader object.

NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
        TaskReporter reporter,
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
        throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);

  List <Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
        .getPath(), taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesInPrev = getInputBytes(fsStats);
  //on the client the input format computed the splits;
  //in the map phase, the input format creates an
  //org.apache.hadoop.mapreduce.RecordReader<KEYIN, VALUEIN>
  this.real = inputFormat.createRecordReader(split, taskContext);      // == DD ==
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}

 

Look at '== DD =='. Here a LineRecordReader (line reader) is created directly. We will come back to it later, because this line reader is the one that really does the work.

//org.apache.hadoop.mapreduce.lib.input.TextInputFormat
@Override
public RecordReader<LongWritable, Text> 
  createRecordReader(InputSplit split,
                     TaskAttemptContext context) {
  String delimiter = context.getConfiguration().get(
      "textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  //a line reader is created here
  return new LineRecordReader(recordDelimiterBytes);
}

 

Look at '== EE ==', where the map context is created.

//the reader here is the org.apache.hadoop.mapreduce.RecordReader
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                      RecordReader<KEYIN,VALUEIN> reader,
                      RecordWriter<KEYOUT,VALUEOUT> writer,
                      OutputCommitter committer,
                      StatusReporter reporter,
                      InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
}

Having seen this, what exactly is the relationship between MapContext, NewTrackingRecordReader and LineRecordReader?

It comes down to a few methods the three classes have in common:

nextKeyValue()

getCurrentKey()

getCurrentValue()

When we call nextKeyValue() on the MapContext, it calls nextKeyValue() on the NewTrackingRecordReader, which in turn ends up calling nextKeyValue() on the LineRecordReader.

In other words, LineRecordReader is the one that ultimately does the work.
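A simplified sketch of that delegation (counter updates and error handling omitted; the method bodies below are paraphrased, not the exact Hadoop source):

// org.apache.hadoop.mapreduce.task.MapContextImpl (simplified)
public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();           // reader is the NewTrackingRecordReader created in runNewMapper()
}

// org.apache.hadoop.mapred.MapTask.NewTrackingRecordReader (simplified)
public boolean nextKeyValue() throws IOException, InterruptedException {
    boolean result = real.nextKeyValue();   // real is the LineRecordReader from '== DD =='
    // ...the real source also updates the input record / byte counters here...
    return result;
}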

 

Look at '== FF ==', the input initialization.

//input initialization
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    //start offset of the split
    start = split.getStart();
    //end offset of the split
    end = start + split.getLength();
    //file location
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    //open the HDFS file
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
      isCompressedInput = true;    
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        if (null == this.recordDelimiterBytes){
          in = new LineReader(cIn, job);
        } else {
          in = new LineReader(cIn, job, this.recordDelimiterBytes);
        }

        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        if (null == this.recordDelimiterBytes) {
          in = new LineReader(codec.createInputStream(fileIn, decompressor),
              job);
        } else {
          in = new LineReader(codec.createInputStream(fileIn,
              decompressor), job, this.recordDelimiterBytes);
        }
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      if (null == this.recordDelimiterBytes){
        in = new LineReader(fileIn, job);
      } else {
        in = new LineReader(fileIn, job, this.recordDelimiterBytes);
      }

      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    //if this is not the first split (i.e. from the second split onwards), the first line is normally skipped
    //and reading starts from the second line
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

How should we understand the following code?

if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }

The diagram below makes this clear.

After a file is uploaded to HDFS it is cut into many blocks, each with a fixed size, so when the file is cut it can happen that the last line of one block ends up spread across two blocks.

e.g. the last line of Block1 should originally have been 'hello hongten 5',

but because of the block size limit, that text is split into two parts, 'hello hong' and 'ten 5'.

Now there is more than one split, so when Block2 is read, reading starts from its second line, i.e. from 'hello hongten 6'; and when Block1 is read, it also reads the first line of Block2, i.e. 'ten 5'.

This is how data integrity across split boundaries is guaranteed.

Look at '== GG ==', the call to org.apache.hadoop.mapreduce.Mapper.run():

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        //this ultimately calls LineRecordReader.nextKeyValue();
        // the data is read line by line,
        // i.e. read one line, call map() once
      while (context.nextKeyValue()) {
          //ultimately calls LineRecordReader.getCurrentKey() and LineRecordReader.getCurrentValue()
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

So what does nextKeyValue() in LineRecordReader do?

public boolean nextKeyValue() throws IOException {
    if (key == null) {
        //the key is the byte offset, a LongWritable by default
      key = new LongWritable();
    }
    //assign the key
    key.set(pos);
    if (value == null) {
        //the value is a Text by default
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    //one extra line is always read here; now we know why
    while (getFilePosition() <= end) {
        //assign the value
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
      //already assigned in nextKeyValue(), so just return it
    return key;
  }

  @Override
  public Text getCurrentValue() {
    //already assigned in nextKeyValue(), so just return it
    return value;
  }

 

5.2. Map output

 

@SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    //..... other code omitted
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    //if there is no reduce phase
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        //we defined a reducer on the client, so this branch is taken
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
    //..... other code omitted
  }

What happens inside NewOutputCollector?

@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  //create a collector container
  collector = createSortingCollector(job, reporter);   // == OO1 ==
  //number of partitions = number of reduce tasks
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
      //more than one partition
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);    // == OO2 ==
  } else {
      //only one partition, so everything goes to partition 0 (partitions - 1)
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
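When partitions > 1 and no partitioner was set on the job, getPartitionerClass() falls back to HashPartitioner, whose logic is essentially the following (shown for reference; our WordCount job keeps the default single reducer, so the anonymous partitioner above is the one actually used):

// org.apache.hadoop.mapreduce.lib.partition.HashPartitioner (the default)
public int getPartition(K key, V value, int numReduceTasks) {
    // map the key's hash into the range [0, numReduceTasks)
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}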

Look at '== OO1 ==', the call to createSortingCollector() that creates the collector container.

@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector<KEY, VALUE> collector
    = (MapOutputCollector<KEY, VALUE>)
     ReflectionUtils.newInstance(
                      job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                      MapOutputBuffer.class, MapOutputCollector.class), job);
  LOG.info("Map output collector class = " + collector.getClass().getName());
  MapOutputCollector.Context context =
                         new MapOutputCollector.Context(this, job, reporter);
  //initialize the collector
  collector.init(context);
  //return the collector
  return co