MapReduce: Job Submission Source Code Analysis
Author: Yinzhengjie (尹正傑)
Copyright notice: This is an original work; please do not repost it, or legal responsibility will be pursued.
I. Environment Preparation
1>. A handy IDE; pick whichever one you like best
I recommend the following two IDEs. You can find their official sites yourself, and you can also look at my earlier research notes:
eclipse: https://www.cnblogs.com/yinzhengjie/p/8733302.html
idea: https://www.cnblogs.com/yinzhengjie/p/9080387.html (this is the one I recommend; it works very well, and many developers at my company use it too~)
2>. Write the Wordcount code
WordcountMapper.java:

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
EMAIL:[email protected]
*/
package mapreduce.yinzhengjie.org.cn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1. Get one line of input
        String line = value.toString();

        // 2. Split it into words
        String[] words = line.split(" ");

        // 3. Emit (word, 1) for each word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
WordcountReducer.java:

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
EMAIL:[email protected]
*/
package mapreduce.yinzhengjie.org.cn;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {

        // 1. Sum the counts for this key
        int sum = 0;
        for (IntWritable count : value) {
            sum += count.get();
        }

        // 2. Emit the total
        context.write(key, new IntWritable(sum));
    }
}
WordcountDriver.java:

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
EMAIL:[email protected]
*/
package mapreduce.yinzhengjie.org.cn;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Point Hadoop at a local installation. Without this you may get the exception
        // "ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path".
        // Also note that on Windows, the bin directory under HADOOP_HOME must contain winutils.exe.
        System.setProperty("hadoop.home.dir", "D:/yinzhengjie/softwares/hadoop-2.7.3");

        // Get the configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the jar load path
        job.setJarByClass(WordcountDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Wait for the job submission to finish and the job to complete
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
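Outside the IDE, the same driver can be packaged into a jar and submitted to a cluster. A minimal sketch, assuming the three classes above are packed into a hypothetical wordcount.jar and the input file has been uploaded to HDFS (both the jar name and the HDFS paths are examples, not values from the original post):

    hadoop jar wordcount.jar mapreduce.yinzhengjie.org.cn.WordcountDriver /input/1.txt /output

The System.setProperty("hadoop.home.dir", ...) line is only needed when debugging locally on Windows.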
1.txt test data:

Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
3>. Configure the relevant run parameters (see the sketch below)
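The screenshots for this step are not reproduced here. As a rough sketch: since WordcountDriver reads the input path from args[0] and the output path from args[1], the IDE run/debug configuration only needs two program arguments. The paths below are hypothetical examples, not the author's actual values:

    D:\yinzhengjie\data\1.txt D:\yinzhengjie\data\output

Note that the output directory must not already exist, or FileOutputFormat will refuse to start the job.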
4>. Set a breakpoint (for example, on the job.waitForCompletion(true) line in WordcountDriver) and click debug to start debugging
II. Code Debugging Process
1>. Step into: from the breakpoint, the debugger enters the waitForCompletion() method (sketched below)
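The original screenshot for this step is lost, but stepping into job.waitForCompletion(true) drops the debugger into the method below. This is an abridged, paraphrased sketch of org.apache.hadoop.mapreduce.Job#waitForCompletion based on the Hadoop 2.7.x source, not a verbatim copy:

    public boolean waitForCompletion(boolean verbose)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (state == JobState.DEFINE) {
            submit();    // only submit if the job has not been submitted yet
        }
        if (verbose) {
            monitorAndPrintJob();    // poll the job and print progress to the console
        } else {
            // poll quietly until the job completes
            int completionPollIntervalMillis =
                    Job.getCompletionPollInterval(cluster.getConf());
            while (!isComplete()) {
                try {
                    Thread.sleep(completionPollIntervalMillis);
                } catch (InterruptedException ie) {
                }
            }
        }
        return isSuccessful();
    }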
2>. Enter the submit() method (sketched below)
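Again the screenshot is lost; here is an abridged, paraphrased sketch of org.apache.hadoop.mapreduce.Job#submit from the Hadoop 2.7.x source. The two numbered comments match the summary in section III:

    public void submit()
            throws IOException, InterruptedException, ClassNotFoundException {
        ensureState(JobState.DEFINE);
        setUseNewAPI();    // translate any old-API (mapred.*) settings to the new API
        connect();         // 1. establish the connection (creates the Cluster proxy)
        final JobSubmitter submitter =
                getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            public JobStatus run() throws IOException, InterruptedException,
                    ClassNotFoundException {
                return submitter.submitJobInternal(Job.this, cluster);    // 2. submit the job
            }
        });
        state = JobState.RUNNING;
    }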
3>. Enter the connect() method (sketched below)
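connect() lazily builds the Cluster object that acts as the proxy for submitting the job. An abridged, paraphrased sketch from the Hadoop 2.7.x source; inside new Cluster(getConfiguration()), the constructor calls initialize(jobTrackAddr, conf), which consults the mapreduce.framework.name property to pick either the LocalJobRunner ("local") or the YARN runner ("yarn"):

    private synchronized void connect()
            throws IOException, InterruptedException, ClassNotFoundException {
        if (cluster == null) {    // the proxy is created only once
            cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                public Cluster run() throws IOException, InterruptedException,
                        ClassNotFoundException {
                    // 1) create the proxy for submitting the job; the constructor
                    //    calls initialize(jobTrackAddr, conf) to decide local vs. YARN
                    return new Cluster(getConfiguration());
                }
            });
        }
    }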
For a comparison of the old and new API property names, see the official documentation: http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html (for example, the deprecated mapred.job.tracker maps to mapreduce.jobtracker.address in the new API).
4>. through 7>. The original screenshots for these steps are missing; they continued stepping into submitter.submitJobInternal(Job.this, cluster), which creates the staging path, gets the job id, copies the jar to the cluster, computes the input splits, writes the job xml, and finally submits the job (see the summary in section III and the sketch below).
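To stand in for those missing screenshots, here is a heavily abridged, paraphrased sketch of org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal from the Hadoop 2.7.x source, keeping only the phases listed in the summary below; error handling and many details are omitted:

    JobStatus submitJobInternal(Job job, Cluster cluster)
            throws ClassNotFoundException, InterruptedException, IOException {
        Configuration conf = job.getConfiguration();

        // 1) create the staging path used to ship job resources to the cluster
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

        // 2) get the job id and derive the job submission directory from it
        JobID jobId = submitClient.getNewJobID();
        job.setJobID(jobId);
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());

        // 3) copy the jar and other resources into the submission directory
        //    (internally: rUploader.uploadFiles(job, jobSubmitDir))
        copyAndConfigureFiles(job, submitJobDir);

        // 4) compute the input splits and write the split plan files
        //    (new API path: writeNewSplits -> input.getSplits(job))
        int maps = writeSplits(job, submitJobDir);
        conf.setInt(MRJobConfig.NUM_MAPS, maps);

        // 5) write job.xml (the full configuration) to the staging path
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
        writeConf(conf, submitJobFile);    // internally: conf.writeXml(out)

        // 6) actually submit the job and return the submission status
        return submitClient.submitJob(
                jobId, submitJobDir.toString(), job.getCredentials());
    }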
III. Summary
1>. Overview of the job submission source code analysis
waitForCompletion()
    submit();
        // 1. Establish the connection
        connect();
            // 1) Create the proxy used to submit the job
            new Cluster(getConfiguration());
                // (1) Determine whether to run locally or on a remote YARN cluster
                initialize(jobTrackAddr, conf);
        // 2. Submit the job
        submitter.submitJobInternal(Job.this, cluster)
            // 1) Create the staging path used to ship job data to the cluster
            Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
            // 2) Get the job id and create the job path
            JobID jobId = submitClient.getNewJobID();
            // 3) Copy the jar to the cluster
            copyAndConfigureFiles(job, submitJobDir);
                rUploader.uploadFiles(job, jobSubmitDir);
            // 4) Compute the input splits and generate the split plan files
            writeSplits(job, submitJobDir);
                maps = writeNewSplits(job, jobSubmitDir);
                    input.getSplits(job);
            // 5) Write the xml configuration file to the staging path
            writeConf(conf, submitJobFile);
                conf.writeXml(out);
            // 6) Submit the job and return the submission status
            status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2>. A flow chart I found online; it is drawn quite clearly, so I saved it here to make it easier to understand later.