Hadoop權威指南學習(三)——MapReduce應用開發
阿新 • • 發佈:2019-01-29
開發MapReduce程式,有一個特定流程:1.寫map和reduce函式,並經過單元測試;2. 編寫本地測試程式執行作業;3. 在叢集上執行,使用IsolationRunner在失敗的相同輸入資料上執行任務;4. 優化調整,任務剖析,Hadoop提供鉤子(hook)輔助分析。
1. 單元測試
2. 本地測試import static org.mockito.Mockito.*; // 使用mock建立模擬 public class MapperTest { @Test public void test() { Mapper mapper = new Mapper(); Test value ="..."; OutputCollector<Text, IntWriteable> output = mock(OutputCollector.class); mapper.map(null, value, output, null); verify(output).collect(new Test(".."), new IntWriteable(..)); // 缺失值測試 // verify(output, nerver).collect(any(Text.class), any(IntWriteable.class)); } }
public class Driver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // 配置jobConf, 輸入輸出路徑,map和reduce類 JobClient.runJob(conf); return 0; } } public class DriverTest { @Test public void test() { JobConf conf = new JobConf(); conf.set("fs.default.name", "file:///"); // 本地檔案系統 conf.set("mapred.job.tracker", "local"); // 本地執行器 FileSystem fs = FileSystem.getLocal(conf); fs.delete(output, true); // delete old output Driver driver = new Driver(); driver.setConf(conf); int res = driver.run(new String[]{...}); checkOutput(conf, output); // 逐行對比實際輸出與預期輸出 } }
3. 作業除錯(在叢集上執行:利用 hadoop jar xx.jar mainClass args執行)
System.err.println("error");// 輸出到日誌中,可通過Web UI檢視
reporter.setStatus("...");// 設定Task的status
reporter.incrCounter(...);// 設定Task的counter
任何到標準輸出或標準錯誤流的寫操作都直接寫到日誌相關檔案(Streaming方式標準輸出被用於map或reduce的輸出)
使用遠端偵錯程式:IsolationRunner
4. 作業調優
mapper數量,reducer數量,cominer,中間值壓縮,自定義序列,調整shuffle
5. MapReduce工作流
將一個問題分解成多個mapreduce作業來執行: 1. 可以將一個mapper實現的功能分割到不同的mapper中,使用Hadoop自帶的ChainMapper類庫將其連線成一個mapper,再結合ChainReducer; 2. 執行多個作業時,可使用現行的作業鏈或者有向無環圖(DAG)控制作業順序執行,如使用JobControl。