
MapReduce: Job Submission Source Code Analysis


Author: Yin Zhengjie (尹正傑)

Copyright notice: this is original work. Reposting is not permitted; violators will be held legally responsible.

 

 

 

I. Environment Setup

1>. A comfortable IDE; choose whichever one you like best

  I recommend the following two IDEs; you can find their official sites with a quick search, or take a look at my earlier research notes:

    eclipse:https://www.cnblogs.com/yinzhengjie/p/8733302.html

    idea:https://www.cnblogs.com/yinzhengjie/p/9080387.html

(I particularly recommend IDEA; it works well, and many of the developers at my company use it too.)

2>. Write the WordCount code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
EMAIL:[email protected]
*/
package mapreduce.yinzhengjie.org.cn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse these objects across map() calls to avoid allocating on every record
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1. Read one line of input
        String line = value.toString();

        // 2. Split the line into words
        String[] words = line.split(" ");

        // 3. Emit each word with a count of 1
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
Contents of WordcountMapper.java
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
EMAIL:[email protected]
*/
package mapreduce.yinzhengjie.org.cn;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> value,
                          Context context) throws IOException, InterruptedException {

        // 1. Sum up the counts for this key
        int sum = 0;
        for (IntWritable count : value) {
            sum += count.get();
        }

        // 2. Emit the total
        context.write(key, new IntWritable(sum));
    }
}
Contents of WordcountReducer.java
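Because the framework sorts and groups the map output by key before calling reduce(), each reduce() call sees one word together with all of its 1s. With the test data from 1.txt below, for instance, the call for the key "is" receives the values [1, 1] and emits (is, 2).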
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
EMAIL:[email protected]
*/
package mapreduce.yinzhengjie.org.cn;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Point Hadoop at a local installation. Without this you may see the exception
        // "ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path".
        // Also note that winutils.exe must exist in the bin directory under HADOOP_HOME.
        System.setProperty("hadoop.home.dir", "D:/yinzhengjie/softwares/hadoop-2.7.3");

        // Load the configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the jar load path by locating the driver class
        job.setJarByClass(WordcountDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to complete
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
Contents of WordcountDriver.java
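As an aside, to run the same driver on a cluster rather than inside the IDE, you would package the classes into a jar and submit it with the hadoop jar command; the jar name and HDFS paths below are placeholders:

hadoop jar wordcount.jar mapreduce.yinzhengjie.org.cn.WordcountDriver /input /output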
Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
Test data in 1.txt
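For reference, with the default single reducer the job writes its result to one file, part-r-00000, in the output directory. For the test line above it would contain the following (keys are sorted in Text's byte order, so capitalized words come first, and punctuation stays attached to words because the mapper splits only on single spaces):

It	1
Kafka	1
and	2
apps.	1
building	1
companies.	1
data	1
fast,	1
fault-tolerant,	1
for	1
horizontally	1
in	2
is	2
of	1
pipelines	1
production	1
real-time	1
runs	1
scalable,	1
streaming	1
thousands	1
used	1
wicked	1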

3>. Configure the run parameters, as shown below
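The driver reads the paths from args[0] and args[1], so they are supplied as program arguments in the IDE's Run/Debug Configuration. A hypothetical example (the paths are placeholders, and the output directory must not already exist, or FileOutputFormat will reject the job):

D:/yinzhengjie/input/1.txt D:/yinzhengjie/output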

4>. Set a breakpoint (for example on the job.waitForCompletion(true) line) and click Debug to start debugging

 

 

II. Code Debugging Walkthrough

1>. Step into waitForCompletion()

2>. Step into the submit() method

 

3>. Step into the connect() method

   For a comparison of the old and new API property names, see the official documentation: http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
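What initialize(jobTrackAddr, conf) decides, in essence, is which ClientProtocolProvider wins: with mapreduce.framework.name set to "local" (the default) you get a LocalJobRunner, and with "yarn" you get a YARNRunner. A minimal sketch to inspect the value your configuration actually resolves to (the class name here is mine, not Hadoop's):

import org.apache.hadoop.conf.Configuration;

public class FrameworkCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "local" -> LocalClientProtocolProvider -> LocalJobRunner
        // "yarn"  -> YarnClientProtocolProvider  -> YARNRunner
        String framework = conf.get("mapreduce.framework.name", "local");
        System.out.println("mapreduce.framework.name = " + framework);
    }
}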

 


III. Summary

1>. Overview of the job submission call chain

waitForCompletion()
    submit();
        // 1. Establish a connection
        connect();
            // 1) Create the proxy used to submit the job
            new Cluster(getConfiguration());
                // (1) Determine whether to run locally, on YARN, or on a remote cluster
                initialize(jobTrackAddr, conf);
        // 2. Submit the job
        submitter.submitJobInternal(Job.this, cluster)
            // 1) Create the staging path used to push job data to the cluster
            Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
            // 2) Get the job id and create the job path
            JobID jobId = submitClient.getNewJobID();
            // 3) Copy the jar package to the cluster
            copyAndConfigureFiles(job, submitJobDir);
                rUploader.uploadFiles(job, jobSubmitDir);
            // 4) Compute the input splits and write the split plan files
            writeSplits(job, submitJobDir);
                maps = writeNewSplits(job, jobSubmitDir);
                    input.getSplits(job);
            // 5) Write the xml configuration file to the staging path
            writeConf(conf, submitJobFile);
                conf.writeXml(out);
            // 6) Submit the job and return its status
            status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
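To make step 4 concrete: inside input.getSplits(job), FileInputFormat derives the split size from the HDFS block size and the configured minimum/maximum split sizes, then cuts each input file into splits of that size. The following is a minimal sketch of that sizing formula only; the defaults mirror Hadoop 2.x, but the class is mine, not Hadoop source:

public class SplitSizeSketch {
    public static void main(String[] args) {
        // Defaults in Hadoop 2.x (the corresponding configuration properties are in the comments)
        long minSize = 1L;                   // mapreduce.input.fileinputformat.split.minsize
        long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize
        long blockSize = 128L * 1024 * 1024; // dfs.blocksize, 128 MB by default

        // The sizing rule FileInputFormat applies: max(minSize, min(maxSize, blockSize))
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

        // With the defaults, splitSize == blockSize, i.e. one map task per HDFS block.
        System.out.println("splitSize = " + splitSize + " bytes");
    }
}

After submitJobInternal() finishes, the job's staging directory typically contains job.jar, job.split, job.splitmetainfo, and job.xml, which is exactly what steps 3 through 5 above produce.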

2>. A flow chart I found online; it is drawn quite clearly, so I am keeping it here to help my own understanding later