技術改變生活,商業成就夢想
1.1 MapReduce是什麼
Hadoop MapReduce是一個軟體框架,基於該框架能夠容易地編寫應用程式,這些應用程式能夠執行在由上千個商用機器組成的大叢集上,並以一種可靠的,具有容錯能力的方式並行地處理上TB級別的海量資料集。這個定義裡面有著這些關鍵詞,
一是軟體框架,二是並行處理,三是可靠且容錯,四是大規模叢集,五是海量資料集。
1.2 MapReduce做什麼
MapReduce擅長處理大資料,它為什麼具有這種能力呢?這可由MapReduce的設計思想發覺。MapReduce的思想就是“分而治之”。
(1)Mapper負責“分”,即把複雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:
一是資料或計算的規模相對原任務要大大縮小;二是就近計算原則,即任務會分配到存放著所需資料的節點上進行計算;三是這些小任務可以平行計算,彼此間幾乎沒有依賴關係。
(2)Reducer負責對map階段的結果進行彙總。至於需要多少個Reducer,使用者可以根據具體問題,通過在mapred-site.xml配置檔案裡設定引數mapred.reduce.tasks的值,預設值為1。
一個比較形象的語言解釋MapReduce:
我們要數圖書館中的所有書。你數1號書架,我數2號書架。這就是“Map”。我們人越多,數書就更快。
現在我們到一起,把所有人的統計數加在一起。這就是“Reduce
”。
1.3 MapReduce工作機制
MapReduce的整個工作過程如上圖所示,它包含如下4個獨立的實體:
實體一:客戶端,用來提交MapReduce作業。
實體二:JobTracker,用來協調作業的執行。
實體三:TaskTracker,用來處理作業劃分後的任務。
實體四:HDFS,用來在其它實體間共享作業檔案。
二、Hadoop中的MapReduce框架
一個MapReduce作業通常會把輸入的資料集切分為若干獨立的資料塊,由Map任務以完全並行的方式去處理它們。
框架會對Map的輸出先進行排序,然後把結果輸入給Reduce任務。通常作業的輸入和輸出都會被儲存在檔案系統中,整個框架負責任務的排程和監控,以及重新執行已經關閉的任務。
通常,MapReduce框架和分散式檔案系統是執行在一組相同的節點上,也就是說,計算節點和儲存節點通常都是在一起的。這種配置允許框架在那些已經存好資料的節點上高效地排程任務,這可以使得整個叢集的網路頻寬被非常高效地利用。
2.1 MapReduce框架的組成
(1)JobTracker
JobTracker負責排程構成一個作業的所有任務,這些任務分佈在不同的TaskTracker上(由上圖的JobTracker可以看到2 assign map 和 3 assign reduce)。你可以將其理解為公司的專案經理,專案經理接受專案需求,並劃分具體的任務給下面的開發工程師。
(2)TaskTracker
TaskTracker負責執行由JobTracker指派的任務,這裡我們就可以將其理解為開發工程師,完成專案經理安排的開發任務即可。
2.2 MapReduce的輸入輸出
MapReduce框架運轉在<key,value>鍵值對上,也就是說,框架把作業的輸入看成是一組<key,value>鍵值對,同樣也產生一組<key,value>鍵值對作為作業的輸出,這兩組鍵值對有可能是不同的。
一個MapReduce作業的輸入和輸出型別如下圖所示:可以看出在整個流程中,會有三組<key,value>鍵值對型別的存在。
2.3 MapReduce的處理流程
這裡以WordCount單詞計數為例,介紹map和reduce兩個階段需要進行哪些處理。單詞計數主要完成的功能是:統計一系列文字檔案中每個單詞出現的次數,如圖所示:
(1)map任務處理
(2)reduce任務處理
三、第一個MapReduce程式:WordCount
WordCount單詞計數是最簡單也是最能體現MapReduce思想的程式之一,該程式完整的程式碼可以在Hadoop安裝包的src/examples目錄下找到。
WordCount單詞計數主要完成的功能是:統計一系列文字檔案中每個單詞出現的次數;
3.1 初始化一個words.txt檔案並上傳HDFS
首先在Linux中通過Vim編輯一個簡單的words.txt,其內容很簡單如下所示:
Hello Edison Chou Hello Hadoop RPC Hello Wncud Chou Hello Hadoop MapReduce Hello Dick Gu
通過Shell命令將其上傳到一個指定目錄中,這裡指定為:/testdir/input
3.2 自定義Map函式
在Hadoop 中, map 函式位於內建類org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函式位於內建類org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。
我們要做的就是覆蓋map 函式和reduce 函式,首先我們來覆蓋map函式:繼承Mapper類並重寫map方法
/** * @author Edison Chou * @version 1.0 * @param KEYIN * →k1 表示每一行的起始位置(偏移量offset) * @param VALUEIN * →v1 表示每一行的文字內容 * @param KEYOUT * →k2 表示每一行中的每個單詞 * @param VALUEOUT * →v2 表示每一行中的每個單詞的出現次數,固定值為1 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split(" "); for (String word : spilted) { context.write(new Text(word), new LongWritable(1L)); } }; }
Mapper 類,有四個泛型,分別是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面兩個KEYIN、VALUEIN 指的是map 函式輸入的引數key、value 的型別;後面兩個KEYOUT、VALUEOUT 指的是map 函式輸出的key、value 的型別;
從程式碼中可以看出,在Mapper類和Reducer類中都使用了Hadoop自帶的基本資料型別,例如String對應Text,long對應LongWritable,int對應IntWritable。這是因為HDFS涉及到序列化的問題,Hadoop的基本資料型別都實現了一個Writable介面,而實現了這個介面的型別都支援序列化。
這裡的map函式中通過空格符號來分割文字內容,並對其進行記錄;
3.3 自定義Reduce函式
現在我們來覆蓋reduce函式:繼承Reducer類並重寫reduce方法
/** * @author Edison Chou * @version 1.0 * @param KEYIN * →k2 表示每一行中的每個單詞 * @param VALUEIN * →v2 表示每一行中的每個單詞的出現次數,固定值為1 * @param KEYOUT * →k3 表示每一行中的每個單詞 * @param VALUEOUT * →v3 表示每一行中的每個單詞的出現次數之和 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); }; }
Reducer 類,也有四個泛型,同理,分別指的是reduce 函式輸入的key、value型別(這裡輸入的key、value型別通常和map的輸出key、value型別保持一致)和輸出的key、value 型別。
這裡的reduce函式主要是將傳入的<k2,v2>進行最後的合併統計,形成最後的統計結果。
3.4 設定Main函式
(1)設定輸入目錄,當然也可以作為引數傳入
public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";
(2)設定輸出目錄(輸出目錄需要是空目錄),當然也可以作為引數傳入
public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";
(3)Main函式的主要程式碼
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先刪除輸出路徑的已有生成檔案 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定對輸入資料進行格式化處理的類(可以省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定義的Mapper類 job.setMapperClass(MyMapper.class); // 1.3:指定map輸出的<K,V>型別(如果<k3,v3>的型別與<k2,v2>的型別一致則可以省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分割槽(可以省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:設定要執行的Reducer的數量(可以省略) job.setNumReduceTasks(1); // 1.6:指定自定義的Reducer類 job.setReducerClass(MyReducer.class); // 1.7:指定reduce輸出的<K,V>型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定對輸出資料進行格式化處理的類(可以省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交作業 boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } }
在Main函式中,主要做了三件事:一是指定輸入、輸出目錄;二是指定自定義的Mapper類和Reducer類;三是提交作業;匆匆看下來,程式碼有點多,但有些其實是可以省略的。
(4)完整程式碼如下所示
View Code
3.5 執行吧小DEMO
(1)除錯檢視控制檯狀態資訊
(2)通過Shell命令檢視統計結果
四、使用ToolRunner類改寫WordCount
Hadoop有個ToolRunner類,它是個好東西,簡單好用。無論在《Hadoop權威指南》還是Hadoop專案原始碼自帶的example,都推薦使用ToolRunner。
4.1 最初的寫法
下面我們看下src/example目錄下WordCount.java檔案,它的程式碼結構是這樣的:
public class WordCount { // 略... public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // 略... Job job = new Job(conf, "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); } }
WordCount.java中使用到了GenericOptionsParser這個類,它的作用是將命令列中引數自動設定到變數conf中。舉個例子,比如我希望通過命令列設定reduce task數量,就這麼寫:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
上面這樣就可以了,不需要將其硬編碼到java程式碼中,很輕鬆就可以將引數與程式碼分離開。
4.2 加入ToolRunner的寫法
至此,我們還沒有說到ToolRunner,上面的程式碼我們使用了GenericOptionsParser幫我們解析命令列引數,編寫ToolRunner的程式設計師更懶,它將 GenericOptionsParser呼叫隱藏到自身run方法,被自動執行了,修改後的程式碼變成了這樣:
public class WordCount extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { Job job = new Job(getConf(), "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }
看看這段程式碼上有什麼不同:
(1)讓WordCount繼承Configured並實現Tool介面。
(2)重寫Tool介面的run方法,run方法不是static型別,這很好。
(3)在WordCount中我們將通過getConf()獲取Configuration物件。
可以看出,通過簡單的幾步,就可以實現程式碼與配置隔離、上傳檔案到DistributeCache等功能。修改MapReduce引數不需要修改java程式碼、打包、部署,提高工作效率。
4.3 重寫WordCount程式
public class MyJob extends Configured implements Tool { public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... } }; } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... }; } // 輸入檔案路徑 public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt"; // 輸出檔案路徑 public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount"; @Override public int run(String[] args) throws Exception { // 首先刪除輸出路徑的已有生成檔案 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(getConf(), "WordCount"); // 設定輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 設定自定義Mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 設定自定義Reducer job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 設定輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int res = ToolRunner.run(conf, new MyJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } }
--------------------- 本文來自 burpee 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/burpee/article/details/78769161?utm_source=copy