MapReduce框架學習(3)——Job的建立及配置
- 參考: JeffreyZhou的部落格園
- 《Hadoop權威指南》第四版
0
一個MR作業,包括三點:
- 輸入資料
- MR程式
- Job配置資訊
前面兩篇學習了資料格式和MR過程(map函式和reduce函式),那麼今天再講一下配置資訊,是怎麼把 資料 和 程式結合起來的。
3.1 程式碼
Job物件指定作業執行規範,我們可以用它來控制整個作業的執行。打算對照程式碼來講解,這樣可能要有邏輯一點,不至於講到最後都不知道自己的線索在哪。先貼上WordCount中的Job任務程式碼:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); // 配置檔案
// System.out.println("url:" + conf.get("fs.default.name")); // deprecated
// System.out.println("url:" + conf.get("fs.defaultFS"));
// 獲取一個作業
// Job job = new Job(conf, "word count"); // deprecated
Job job = Job.getInstance(conf,"wordcount"); // 用job的靜態方法
// 設定job所用的那些類(class)檔案在哪個jar包
job.setJarByClass(WordCount.class);
// 設定所用的map reduce類
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // 對每個節點的map輸出進行combine
job.setReducerClass(IntSumReducer.class); // 對所有節點的輸出進行reduce
// 設定資料輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定要處理的輸入、輸出路徑,
//此處輸入/出為固定檔案目錄
FileInputFormat.addInputPath(job, "input");
FileOutputFormat.setOutputPath(job, "output");
//此處為引數
// FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 將job提交給叢集執行
System.exit(job.waitForCompletion(true) ? 0 : 1);
那麼,現在對照上面的程式碼來分析作業提交到執行的整個過程吧。
3.2 建立Job
任務建立比較容易,其實就是new
一個例項,先建立一個配置檔案的物件,然後將配置檔案,以及作業名稱作為引數,構造一個Job物件就行了
Configuration conf = new Configuration(); // 配置檔案
Job job = Job.getInstance(conf,"wordcount"); // 用job的靜態方法
3.3 打包作業
我們在Hadoop叢集上執行這個作業時,要把程式碼打包成一個JAR檔案(Hadoop在叢集上釋出這個檔案),關於打包JAR,大概意思就是把程式執行所需要的包啊類啊啥的,全部形成一個壓縮包,但這個工作不用我們自己去一個個找,只要在Job物件的setJarByClass()
方法中傳遞一個類即可,Hadoop會利用這個類來查詢包含它的JAR檔案,進而找到相關的JAR檔案。
// 將job所用的那些類(class)檔案,打成jar包
job.setJarByClass(WordCount.class);
3.4 設定各個環節的函式
這個很好理解,上一篇博文分析了MR過程中的各個環節,這些環節都是可以自定義的,就是在Job裡面設定,將自定義的函式和具體作業聯絡起來。
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // 對每個節點的map輸出進行combine
job.setPartitionerClass(MyPartitioner.class); // 對每個節點的map輸出進行partition
job.setReducerClass(IntSumReducer.class); // 對所有節點的輸出進行reduce
3.5 設定輸入輸出資料型別
job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
3.6 設定輸入輸出檔案目錄
在設定輸入輸出檔案目錄時,可以選擇使用絕對目錄,就是直接在語句中寫入目錄;也可以使用引數輸入,即在執行程式時,再在控制檯輸入目錄。
// 指定要處理的輸入、輸出路徑,
//此處輸入/出為固定檔案目錄
FileInputFormat.addInputPath(job, "input");
FileOutputFormat.setOutputPath(job, "output");
//此處為引數
// FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
3.7 提交併執行作業
單個任務的執行
可以直接使用語句:
job.waitForCompletion(true)
waitForCompletion()方法提交作業並等待執行完成,該方法唯一的引數是一個標識,指示是否已生成詳細輸出,當標識為
true
(成功)時,作業會把其進度寫到控制檯。
–《Hadoop權威指南》第四版,28頁
多個任務執行
多個任務的話,就有多種組織形式,例如序列、並行、無關、組合。如下圖:
圖中,Job2和Job3將會等Job1執行完了再執行,且可以同時開始,而Job4必須等Job2和Job3同時結束後才結束。
這個組合,就可以採用這樣的程式碼來實現:
Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1"); //.. config Job1
Job job2 = new Job(conf, "job2"); //.. config Job2
Job job3 = new Job(conf, "job3"); //.. config Job3
Job job4 = new Job(conf, "job4"); //.. config Job4
//新增依賴關係
job2.addDependingJob(job1);
job3.addDependingJob(job1);
job4.addDependingJob(job2);
job4.addDependingJob(job3);
JobControl jc = new JobControl("jbo name");
jc.addJob(job1);
jc.addJob(job2);
jc.addJob(job3);
jc.addJob(job4);
jc.run();
3.x 後記
這裡講的其實偏向於作業的程式設計方面,但是在程式中把這些都設定好了,提交給集群后,又是怎樣的執行機制呢?這個就是關於application master
、map task
等方面的分析了。