Hadoop基礎-MapReduce入門篇之編寫簡單的Wordcount測試程式碼
Hadoop基礎-MapReduce入門篇之編寫簡單的Wordcount測試程式碼
作者:尹正傑
版權宣告:原創作品,謝絕轉載!否則將追究法律責任。
本文主要是記錄一寫我在學習MapReduce時的一些瑣碎的學習筆記, 方便自己以後檢視。在呼叫API的時候,可能會需要maven依賴,新增依賴的包如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <projectxmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <parent> 6 <artifactId>HADOOP</artifactId> 7 <groupId>yinzhengjie.org.cn</groupId> 8 <version>1.0-SNAPSHOT</version> 9 </parent> 10 <modelVersion>4.0.0</modelVersion> 11 12 <artifactId>MapReduce</artifactId> 13 14 <dependencies> 15 <dependency> 16 <groupId>org.apache.hadoop</groupId> 17 <artifactId>hadoop-common</artifactId> 18 <version>2.6.0</version> 19 </dependency> 20 21 <dependency> 22 <groupId>org.apache.hadoop</groupId> 23 <artifactId>hadoop-client</artifactId> 24 <version>2.6.0</version> 25 </dependency> 26 27 <dependency> 28 <groupId>junit</groupId> 29 <artifactId>junit</artifactId> 30 <version>4.11</version> 31 <scope>test</scope> 32 </dependency> 33 34 </dependencies> 35 36 37 </project>
一.MapReduce定義
Mapreduce是一個分散式運算程式的程式設計框架,是使用者開發“基於hadoop的資料分析應用”的核心框架。
Mapreduce核心功能是將使用者編寫的業務邏輯程式碼和自帶預設元件整合成一個完整的分散式運算程式,併發執行在一個hadoop叢集上。
二.MapReduce優點
1>.MapReduce 易於程式設計。
它簡單的實現一些介面,就可以完成一個分散式程式,這個分散式程式可以分佈到大量廉價的PC機器上執行。也就是說你寫一個分散式程式,跟寫一個簡單的序列程式是一模一樣的。就是因為這個特點使得MapReduce程式設計變得非常流行。
2>.良好的擴充套件性。
當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴充套件它的計算能力。
3>.高容錯性。
MapReduce設計的初衷就是使程式能夠部署在廉價的PC機器上,這就要求它具有很高的容錯性。比如其中一臺機器掛了,它可以把上面的計算任務轉移到另外一個節點上執行,不至於這個任務執行失敗,而且這個過程不需要人工參與,而完全是由 Hadoop內部完成的。
4>.適合PB級以上海量資料的離線處理。
它適合離線處理而不適合線上處理。比如像毫秒級別的返回一個結果,MapReduce很難做到。
三.MapReduce缺點
MapReduce不擅長做實時計算、流式計算、DAG(有向圖)計算。
1>.實時計算。
MapReduce無法像Mysql一樣,在毫秒或者秒級內返回結果。
2>.流式計算。
流式計算的輸入資料是動態的,而MapReduce的輸入資料集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了資料來源必須是靜態的。
3>.DAG(有向圖)計算。
多個應用程式存在依賴關係,後一個應用程式的輸入為前一個的輸出。在這種情況下,MapReduce並不是不能做,而是使用後,每個MapReduce作業的輸出結果都會寫入到磁碟,會造成大量的磁碟IO,導致效能非常的低下。
四.MapReduce程序
一個完整的mapreduce程式在分散式執行時有三類例項程序:
1>.MrAppMaster:
負責整個程式的過程排程及狀態協調。
2>.MapTask:
負責map階段的整個資料處理流程。
3>.ReduceTask:
負責reduce階段的整個資料處理流程。
五.MapReduce程式設計規範
使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(提交執行mr程式的客戶端)
1>.Mapper階段
(1)使用者自定義的Mapper要繼承自己的父類
(2)Mapper的輸入資料是KV對的形式(KV的型別可自定義)
(3)Mapper中的業務邏輯寫在map()方法中
(4)Mapper的輸出資料是KV對的形式(KV的型別可自定義)
(5)map()方法(maptask程序)對每一個<K,V>呼叫一次
2>.Reducer階段
(1)使用者自定義的Reducer要繼承自己的父類
(2)Reducer的輸入資料型別對應Mapper的輸出資料型別,也是KV
(3)Reducer的業務邏輯寫在reduce()方法中
(4)Reducetask程序對每一組相同k的<k,v>組呼叫一次reduce()方法
3>.Driver階段
整個程式需要一個Drvier來進行提交,提交的是一個描述了各種必要資訊的job物件
六.Hadoop序列化
1>.為什麼要序列化?
一般來說,“活的”物件只生存在記憶體裡,關機斷電就沒有了。而且“活的”物件只能由本地的程序使用,不能被髮送到網路上的另外一臺計算機。 然而序列化可以儲存“活的”物件,可以將“活的”物件傳送到遠端計算機。
2>.什麼是序列化?
序列化就是把記憶體中的物件,轉換成位元組序列(或其他資料傳輸協議)以便於儲存(持久化)和網路傳輸。
反序列化就是將收到位元組序列(或其他資料傳輸協議)或者是硬碟的持久化資料,轉換成記憶體中的物件。
3>.為什麼不用Java的序列化?
ava的序列化是一個重量級序列化框架(Serializable),一個物件被序列化後,會附帶很多額外的資訊(各種校驗資訊,header,繼承體系等),不便於在網路中高效傳輸。所以,hadoop自己開發了一套序列化機制(Writable),精簡、高效。
4>.為什麼序列化對Hadoop很重要?
因為Hadoop在叢集之間進行通訊或者RPC呼叫的時候,需要序列化,而且要求序列化要快,且體積要小,佔用頻寬要小。所以必須理解Hadoop的序列化機制。
序列化和反序列化在分散式資料處理領域經常出現:程序通訊和永久儲存。然而Hadoop中各個節點的通訊是通過遠端呼叫(RPC)實現的,那麼RPC序列化要求具有以下特點:
(1)緊湊:緊湊的格式能讓我們充分利用網路頻寬,而頻寬是資料中心最稀缺的資
(2)快速:程序通訊形成了分散式系統的骨架,所以需要儘量減少序列化和反序列化的效能開銷,這是基本的;
(3)可擴充套件:協議為了滿足新的需求變化,所以控制客戶端和伺服器過程中,需要直接引進相應的協議,這些是新協議,原序列化方式能支援新的協議報文;
(4)互操作:能支援不同語言寫的客戶端和服務端進行互動;
(5).常用資料序列化型別
5>.常用的資料型別對應的hadoop資料序列化型別
七.MapReduce案例實操
1>.編寫mapper類
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/ 4 EMAIL:[email protected]q.com 5 */ 6 package mapreduce.yinzhengjie.org.cn; 7 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Mapper; 12 13 import java.io.IOException; 14 15 16 public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 17 18 Text k = new Text(); 19 IntWritable v = new IntWritable(1); 20 21 @Override 22 protected void map(LongWritable key, Text value, Context context) 23 throws IOException, InterruptedException { 24 25 // 1 獲取一行 26 String line = value.toString(); 27 28 // 2 切割 29 String[] words = line.split(" "); 30 31 // 3 輸出 32 for (String word : words) { 33 34 k.set(word); 35 context.write(k, v); 36 } 37 } 38 }
2>.編寫Reduce類
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/ 4 EMAIL:[email protected] 5 */ 6 package mapreduce.yinzhengjie.org.cn; 7 8 import java.io.IOException; 9 import org.apache.hadoop.io.IntWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Reducer; 12 13 public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 14 15 @Override 16 protected void reduce(Text key, Iterable<IntWritable> value, 17 Context context) throws IOException, InterruptedException { 18 19 // 1 累加求和 20 int sum = 0; 21 for (IntWritable count : value) { 22 sum += count.get(); 23 } 24 25 // 2 輸出 26 context.write(key, new IntWritable(sum)); 27 } 28 }
3>.編寫驅動類
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/ 4 EMAIL:[email protected] 5 */ 6 package mapreduce.yinzhengjie.org.cn; 7 8 import java.io.IOException; 9 import org.apache.hadoop.conf.Configuration; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.IntWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 17 public class WordcountDriver { 18 19 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 20 /** 21 * 配合Hadoop的環境變數,如果沒有配置可能會拋異常:“ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path”, 22 * 還有一件事就是你的HADOOP_HOME的bin目錄下必須得有winutils.exe 23 * 24 */ 25 System.setProperty("hadoop.home.dir", "D:\\yinzhengjie\\softwares\\hadoop-2.7.3"); 26 27 //獲取配置資訊 28 Configuration conf = new Configuration(); 29 Job job = Job.getInstance(conf); 30 31 //設定jar載入路徑 32 job.setJarByClass(WordcountDriver.class); 33 34 //設定map和Reduce類 35 job.setMapperClass(WordcountMapper.class); 36 job.setReducerClass(WordcountReducer.class); 37 38 //設定map輸出 39 job.setMapOutputKeyClass(Text.class); 40 job.setMapOutputValueClass(IntWritable.class); 41 42 //設定Reduce輸出 43 job.setOutputKeyClass(Text.class); 44 job.setOutputValueClass(IntWritable.class); 45 46 //設定輸入和輸出路徑 47 FileInputFormat.setInputPaths(job, new Path(args[0])); 48 FileOutputFormat.setOutputPath(job, new Path(args[1])); 49 50 //等待job提交完畢 51 boolean result = job.waitForCompletion(true); 52 53 System.exit(result ? 0 : 1); 54 } 55 }
八.本地測試
九.叢集測試