大資料之八 hadoop MapReduce-WordCount
前兩篇中,我們瞭解了MapReduce的執行流程及其架構實現,今天我們就在本地通過經典例項WordCount來了解一下MapReduce的程式設計實現
叢集配置
stop-dfs.sh
- 配置mapred-site.xml檔案
<!-- MapReduce的架構體系,這裡使用MapReduceV2,即YARN --> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>
- 配置yarn-site.xml檔案
<!-- NodeManager上執行的附屬服務。需配置成mapreduce_shuffle,才可執行MapReduce程式 --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> <!-- 啟用RM的高可用 --> </property> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster1</value> <!-- YARN對外提供的服務的id --> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> <!-- 實現RM高可用的節點id --> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>node01</value> <!-- rm1對應的真實節點 --> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>node02</value> <!-- rm2對應的真實節點 --> </property> <!--配置三臺zookeeper的位置資訊 --> <property> <name>yarn.resourcemanager.zk-address</name> <value>node02:2181,node03:2181,node04:2181</value> </property>
- 將配置完成的hadoop安裝包傳送到其他節點,覆蓋之前的安裝包,也可以先刪除之前的安裝包
scp -r hadoop-2.6.5 [email protected]:/opt/zgl/
scp -r hadoop-2.6.5 [email protected]:/opt/zgl
scp -r hadoop-2.6.5 [email protected]:/opt/zgl/
- 在node01 或 node02 上啟動HDFS
start-dfs.sh
- 在node01 或 node02 上啟動YARN,這裡我們在node01上啟動
satrt-yarn.sh
因為 HDFS 和 YARN 都是hadoop的元件之一,這裡步驟6,7可以用 start-sll.sh
- 啟動後使用jps檢視,成功的話node01節點上會啟動ResourceManager程序,node02,node03,node04上會啟動NodeManager程序。
[[email protected] ~]# jps
2390 NameNode
2726 DFSZKFailoverController
3111 Jps
2839 ResourceManager
2584 JournalNode
[[email protected] ~]# jps
2834 Jps
2706 NodeManager
2487 JournalNode
2393 DataNode
2571 DFSZKFailoverController
2268 QuorumPeerMain
2333 NameNode
- 這裡我們會發現Node02上並沒有啟動standby狀態的RM,這是因為系統預設就是不啟動的,這裡我們可以手動將其啟動
yarn-daemon.sh start resourcemanager
[[email protected] ~]# jps
3008 ResourceManager
2706 NodeManager
2487 JournalNode
2393 DataNode
2571 DFSZKFailoverController
2268 QuorumPeerMain
2333 NameNode
3055 Jps
- YARN也提供Web UI來顯示job相關資訊,瀏覽器輸入node01:8088。後面我們執行WordCount時,就可以在這裡檢視job狀態
eclipse配置
開啟Windows->Show View中的Map/Reduce Locations,右鍵選擇Edit Hadoop location… 在下圖對MapReduce進行配置(確保node02上已啟動RM) 新建一個java專案,將hadoop安裝包中share\hadoop目錄下 common,hdfs,tools,yarn,mapreduce五個資料夾中的jar包及其下lib資料夾中的jar包整合匯入專案並build path
將mapred-site.xml,yarn-site.xml檔案拷貝到source資料夾中 接下來,我們就可以開始寫程式碼了
WordCount
我們先來回顧一下MapReduce處理大資料集的過程 做一些準備工作 首先我們在本地建立兩個文字檔案file1.txt和file2.txt,使file1.txt內容為"Hello World\nBye World",file2.txt的內容為"Hello Hadoop\nBye Hadoop"。 在HDFS 上建立輸入資料夾 input,上傳本地檔案到叢集的input 目錄下
再來分析一下WordCount的執行過程
- 將檔案拆分成splits,由於測試用的檔案較小,所以我們寫兩個檔案,每個檔案為一個split,並將檔案按行分割形成<key,value>對。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所佔的位元組數
- 將分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<k2,v2>對
- 得到map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序分組,並執行Combine,將key至相同value值累加,得到Mapper的最終輸出結果
- Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<k3,v3>對,並作為WordCount的輸出結果 分析完執行過程,我們來看一下程式碼實現。 要程式設計實現MapReduce計算,只需要繼承Mapper類實現其map()方法,繼承Reduce類實現其reduce()方法,並在main()函式中對job進行設定
序列化 因為HDFS涉及到序列化的問題,Hadoop的基本資料型別都實現了一個Writable介面,而實現了這個介面的型別都支援序列化。在Mapper類和Reducer類中都使用Hadoop自帶的基本資料型別
map()
/**
* @param KEYIN
* →k1 表示每一行的起始位置(偏移量offset) LongWritable
* @param VALUEIN
* →v1 表示每一行的文字內容 Text
* @param KEYOUT
* →k2 表示每一行中的每個單詞 Text
* @param VALUEOUT
* →v2 表示每一行中的每個單詞的出現次數,固定值為1 IntWritable
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text myKey = new Text(); //設定Text型別的物件,用來封裝KEYOUT,即k2
IntWritable myValue = new IntWritable(1); //設定IntWritable型別的物件並固定其值為1,即v2
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println(key+"=========="+value); //輸出<k1,v1>值,這裡的key為offset
//使用hadoop自帶的工具類StringUtil進行分詞(按空格進行切割),相當於value.toString().split(" ")
String[] words = StringUtils.split(value.toString(), ' ');
//遍歷分完的詞,即k2,產生<k2,v2>,使用context進行輸出。context是個上下文物件
for (String word : words) {
myKey.set(word);
context.write(myKey,myValue);
}
}
}
reduce()
/**
* KEYIN 即K2 表示行中出現的單詞 Text
* VALUEIN 即V2 表示出現的單詞的次數 IntWritable
* KEYOUT 即K3 表示行中出現的不同單詞 Text
* VALUEOUT 即V3 表示行中出現的不同單詞的總次數 IntWritable
*/
public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
//因為reduce處理的是“相同”key的一組資料,這裡的key是真的相同,所以只要遍歷v2累加計數即可
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
//輸出<k3,v3>
context.write(key, new IntWritable(sum));
}
}
main()
public class WC {
public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
//讀取MapReduce配置資訊,包括HDFS。設定為true則從本地專案source資料夾中讀取配置檔案
Configuration conf = new Configuration(true);
//構建一個job並指定job名稱
Job job = Job.getInstance(conf,"WordCount");
//設定當前main函式所在類
job.setJarByClass(WC.class);
//設定本地jar包位置 (第三種執行模式使用,另兩種不用配置。執行模式在下面介紹)
job.setJar("d:/mapreduce/wc.jar");
//設定輸入路徑 args[0]表示引數手動輸入
FileInputFormat.setInputPaths(job, args[0]);
//設定輸出路徑 這裡路徑寫死,先判斷路徑下是否有檔案,有則刪除
Path outputPath = new Path("/output/wordcount");
FileSystem fs = outputPath.getFileSystem(conf);
if(fs.exists(outputPath)){
fs.delete(outputPath,true);
}
FileOutputFormat.setOutputPath(job, outputPath);
//設定Map class
job.setMapperClass(WCMapper.class);
//設定map輸出key、value的型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定reduce class
job.setReducerClass(WCReduce.class);
//設定reduce輸出key、value的型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定reduce task的個數
job.setNumReduceTasks(2);
//提交作業
job.waitForCompletion(true);
}
}
main函式中主要做了如下幾件事: 一是構建job,指定main函式所在類,指定輸入、輸出目錄; 二是指定自定義的Mapper類和Reducer類及其對應輸入輸出key,value的型別; 三是提交作業
接下來我們就可以執行程式碼,執行前我們先來看幾種執行模式
執行模式
1、local(在本地的eclipse上啟動多個執行緒來模擬map task,reduce task執行,並未啟動叢集,所以YARN的job web UI頁面不會有任務顯示) ,用於測試環境
修改mapred-site.xml 中的mapreduce.framework.name
,設定為local
<property>
<name>mapreduce.framework.name</name>
<value>local</value>
</property>
2、提交到叢集中執行,用於生產環境 在本地將程式碼打成jar包,提交到叢集。在叢集上執行hadoop jar + jar包路徑 + main()所在類的全類名 + 引數執行
[[email protected] mapreduce]# ls
wc.jar
[[email protected] mapreduce]# hadoop jar ./wc.jar com.hpe.mr.wc.WC /input/*
3、在本機上的eclipse中直接提交任務到叢集中執行,這裡我們使用這種方式 (1) 修改配置檔案 mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.app-submission.cross-platform</name>
<value>true</value> <!-- 跨平臺提交開啟 -->
</property>
(2) 將本地Application打成jar包,放在window下某一個位置,這裡放在D盤的mapreduce資料夾中
(3)右擊專案,Run as --> Run Configurations
(4)點選Arguments,在Program arguments中填寫引數,即我們在main()中寫的args[0]的值,點選執行
(5)我們在console視窗可以看到我們在map中定義的<k1,v1>的輸出
0==========hello hadoop
13==========bye hadoop
0==========hello world
12==========bye world`
(6)在node01:8088頁面可以看到任務狀態 (7)在HDFS上檢視output/wordcount 因為在main()中我們設定啟動兩個reduce task,所以最後輸出兩個檔案part-r-00000和part-r-00001。 這裡 bye world , world後面我不小心多加了個 ` ,所以最後結果是這樣