02.Mapreduce例項——求平均值
02.Mapreduce例項——求平均值
實驗目的
1.準確理解Mapreduce求平均值的設計原理
2.熟練掌握Mapreduce求平均值程式的編寫
3.學會編寫Mapreduce求平均值程式程式碼解決問題
實驗原理
求平均數是MapReduce比較常見的演算法,求平均數的演算法也比較簡單,一種思路是Map端讀取資料,在資料輸入到Reduce之前先經過shuffle,將map函式輸出的key值相同的所有的value值形成一個集合value-list,然後將輸入到Reduce端,Reduce端彙總並且統計記錄數,然後作商即可。具體原理如下圖所示:
實驗環境
Linux Ubuntu 14.04
jdk-7u75-linux-x64
hadoop-2.6.0-cdh5.4.5
hadoop-2.6.0-eclipse-cdh5.4.5.jar
eclipse-java-juno-SR2-linux-gtk-x86_64
實驗內容
現有某電商關於商品點選情況的資料檔案,表名為goods_click,包含兩個欄位(商品分類,商品點選次數),分隔符“\t”,由於資料很大,所以為了方便統計我們只擷取它的一部分資料,內容如下:
- 商品分類商品點選次數
- 521275
- 5212093
- 5209293
- 5213238
- 52006462
- 5210928
- 5210943
- 521320
10. 5213234
11. 521329
12. 5213230
13. 5213245
14. 5213224
15. 520092615
16. 5213225
17. 5209013
18. 521326
19. 521360
20. 5209010
21. 52024347
要求使用mapreduce統計出每類商品的平均點選次數。
結果資料如下:
- 商品分類商品平均點選次數
- 52006462
- 520092615
- 52024347
- 5209011
- 5209293
- 5210935
- 5212093
- 521275
10. 5213223
11. 521360
實驗步驟
1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop。
- cd/apps/hadoop/sbin
- ./start-all.sh
2.在Linux本地新建/data/mapreduce4目錄。
- mkdir-p/data/mapreduce4
3.在Linux中切換到/data/mapreduce4目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce4/goods_click網址上下載文字檔案goods_click。
- cd/data/mapreduce4
- wgethttp://192.168.1.100:60000/allfiles/mapreduce4/goods_click
然後在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce4/hadoop2lib.tar.gz網址上下載專案用到的依賴包。
- wgethttp://192.168.1.100:60000/allfiles/mapreduce4/hadoop2lib.tar.gz
將hadoop2lib.tar.gz解壓到當前目錄下。
- tarzxvfhadoop2lib.tar.gz
4.首先在HDFS上新建/mymapreduce4/in目錄,然後將Linux本地/data/mapreduce4目錄下的goods_click檔案匯入到HDFS的/mymapreduce4/in目錄中。
- hadoopfs-mkdir-p/mymapreduce4/in
- hadoopfs-put/data/mapreduce4/goods_click/mymapreduce4/in
5.新建Java Project專案,專案名為mapreduce4。
在mapreduce4專案下新建包,包名為mapreduce。
在mapreduce包下新建類,類名為MyAverage。
6.新增專案所需依賴的jar包,右鍵點選mapreduce4,新建一個資料夾,名為hadoop2lib,用於存放專案所需的jar包。
將/data/mapreduce4目錄下,hadoop2lib目錄中的jar包,拷貝到eclipse中mapreduce4專案的hadoop2lib目錄下。
選中hadoop2lib目錄下所有jar包,並新增到Build Path中。
7.編寫Java程式碼並描述其設計思路。
Mapper程式碼
- publicstaticclassMapextendsMapper<Object,Text,Text,IntWritable>{
- privatestaticTextnewKey=newText();
- //實現map函式
- publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
- //將輸入的純文字檔案的資料轉化成String
- Stringline=value.toString();
- System.out.println(line);
- Stringarr[]=line.split("\t");
- newKey.set(arr[0]);
- 10. intclick=Integer.parseInt(arr[1]);
- 11. context.write(newKey,newIntWritable(click));
- 12. }
- 13. }
map端在採用Hadoop的預設輸入方式之後,將輸入的value值通過split()方法截取出來,我們把擷取的商品點選次數字段轉化為IntWritable型別並將其設定為value,把商品分類欄位設定為key,然後直接輸出key/value的值。
Reducer程式碼
- publicstaticclassReduceextendsReducer<Text,IntWritable,Text,IntWritable>{
- //實現reduce函式
- publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{
- intnum=0;
- intcount=0;
- for(IntWritableval:values){
- num+=val.get();//每個元素求和num
- count++;//統計元素的次數count
- }
- 10. intavg=num/count;//計算平均數
- 11.
- 12. context.write(key,newIntWritable(avg));
- 13. }
- 14. }
map的輸出<key,value>經過shuffle過程整合<key,values>鍵值對,然後將<key,values>鍵值對交給reduce。reduce端接收到values之後,將輸入的key直接複製給輸出的key,將values通過for迴圈把裡面的每個元素求和num並統計元素的次數count,然後用num除以count 得到平均值avg,將avg設定為value,最後直接輸出<key,value>就可以了。
完整程式碼
- packagemapreduce;
- importjava.io.IOException;
- importorg.apache.hadoop.conf.Configuration;
- importorg.apache.hadoop.fs.Path;
- importorg.apache.hadoop.io.IntWritable;
- importorg.apache.hadoop.io.NullWritable;
- importorg.apache.hadoop.io.Text;
- importorg.apache.hadoop.mapreduce.Job;
- importorg.apache.hadoop.mapreduce.Mapper;
10. importorg.apache.hadoop.mapreduce.Reducer;
11. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12. importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
13. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14. importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
15. publicclassMyAverage{
- 16. publicstaticclassMapextendsMapper<Object,Text,Text,IntWritable>{
- 17. privatestaticTextnewKey=newText();
- 18. publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
- 19. Stringline=value.toString();
- 20. System.out.println(line);
- 21. Stringarr[]=line.split("\t");
- 22. newKey.set(arr[0]);
- 23. intclick=Integer.parseInt(arr[1]);
- 24. context.write(newKey,newIntWritable(click));
- 25. }
- 26. }
- 27. publicstaticclassReduceextendsReducer<Text,IntWritable,Text,IntWritable>{
- 28. publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{
- 29. intnum=0;
- 30. intcount=0;
- 31. for(IntWritableval:values){
- 32. num+=val.get();
- 33. count++;
- 34. }
- 35. intavg=num/count;
- 36. context.write(key,newIntWritable(avg));
- 37. }
- 38. }
- 39. publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{
- 40. Configurationconf=newConfiguration();
- 41. System.out.println("start");
- 42. Jobjob=newJob(conf,"MyAverage");
- 43. job.setJarByClass(MyAverage.class);
- 44. job.setMapperClass(Map.class);
- 45. job.setReducerClass(Reduce.class);
- 46. job.setOutputKeyClass(Text.class);
- 47. job.setOutputValueClass(IntWritable.class);
- 48. job.setInputFormatClass(TextInputFormat.class);
- 49. job.setOutputFormatClass(TextOutputFormat.class);
- 50. Pathin=newPath("hdfs://localhost:9000/mymapreduce4/in/goods_click");
- 51. Pathout=newPath("hdfs://localhost:9000/mymapreduce4/out");
- 52. FileInputFormat.addInputPath(job,in);
- 53. FileOutputFormat.setOutputPath(job,out);
- 54. System.exit(job.waitForCompletion(true)?0:1);
- 55.
- 56. }
- 57. }
8.在MyAverage類檔案中,右鍵並點選=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。
9.待執行完畢後,進入命令模式下,在HDFS上/mymapreduce4/out中檢視實驗結果。
- hadoopfs-ls/mymapreduce4/out
- hadoopfs-cat/mymapreduce4/out/part-r-00000