1. 程式人生 > 其它 >Mapreduce例項——去重

Mapreduce例項——去重

01.Mapreduce例項——去重

實驗目的

1.準確理解MapReduce去重的設計原理

2.熟練掌握MapReduce去重的程式編寫

3.學會自己編寫MapReduce去重程式碼解決實際問題

實驗原理

“資料去重”主要是為了掌握和利用並行化思想來對資料進行有意義的篩選。統計大資料集上的資料種類個數、從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及資料去重。

資料去重的最終目標是讓原始資料中出現次數超過一次的資料在輸出檔案中只出現一次。在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚整合<key,value-list>後交給reduce。我們自然而然會想到將同一個資料的所有記錄都交給一臺reduce機器,無論這個資料出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以資料作為key,而對value-list則沒有要求(可以設定為空)。當reduce接收到一個<key,value-list>時就直接將輸入的key複製到輸出的key中,並將value設定成空值,然後輸出<key,value>。

MaprReduce去重流程如下圖所示:

實驗環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

實驗內容

現有一個某電商網站的資料檔案,名為buyer_favorite1,記錄了使用者收藏的商品以及收藏的日期,檔案buyer_favorite1中包含(使用者id,商品id,收藏日期)三個欄位,資料內容以“\t”分割,由於資料很大,所以為了方便統計我們只擷取它的一部分資料,內容如下:

  1. 使用者id商品id收藏日期
  2. 1018110004812010-04-0416:54:31
  3. 2000110015972010-04-0715:07:52
  4. 2000110015602010-04-0715:08:27
  5. 2004210013682010-04-0808:20:30
  6. 2006710020612010-04-0816:45:33
  7. 2005610032892010-04-1210:50:55
  8. 2005610032902010-04-1211:57:35
  9. 2005610032922010-04-1212:05:29
  10. 2005410024202010-04-1415:24:12
  11. 2005510016792010-04-1419:46:04
  12. 2005410106752010-04-1415:23:53
  13. 2005410024292010-04-1417:52:45
  14. 2007610024272010-04-1419:35:39
  15. 2005410033262010-04-2012:54:44
  16. 2005610024202010-04-1511:24:49
  17. 2006410024222010-04-1511:35:54
  18. 2005610030662010-04-1511:43:01
  19. 2005610030552010-04-1511:43:06
  20. 2005610101832010-04-1511:45:24
  21. 2005610024222010-04-1511:45:49
  22. 2005610031002010-04-1511:45:54
  23. 2005610030942010-04-1511:45:57
  24. 2005610030642010-04-1511:46:04
  25. 2005610101782010-04-1516:15:20
  26. 2007610031012010-04-1516:37:27
  27. 2007610031032010-04-1516:37:05
  28. 2007610031002010-04-1516:37:18
  29. 2007610030662010-04-1516:37:31
  30. 2005410031032010-04-1516:40:14
  31. 2005410031002010-04-1516:40:16

要求用Java編寫MapReduce程式,根據商品id進行去重,統計使用者收藏商品中都有哪些商品被收藏。結果資料如下:

  1. 商品id
  2. 1000481
  3. 1001368
  4. 1001560
  5. 1001597
  6. 1001679
  7. 1002061
  8. 1002420
  9. 1002422
  10. 1002427
  11. 1002429
  12. 1003055
  13. 1003064
  14. 1003066
  15. 1003094
  16. 1003100
  17. 1003101
  18. 1003103
  19. 1003289
  20. 1003290
  21. 1003292
  22. 1003326
  23. 1010178
  24. 1010183
  25. 1010675

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop。

  1. cd/apps/hadoop/sbin
  2. ./start-all.sh

2.在Linux本地新建/data/mapreduce2目錄。

  1. mkdir-p/data/mapreduce2

3. (自行生成文字檔案,放到個人指定資料夾下)在Linux中切換到/data/mapreduce2目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce2/buyer_favorite1網址上下載文字檔案buyer_favorite1。

  1. cd/data/mapreduce2
  2. wgethttp://192.168.1.100:60000/allfiles/mapreduce2/buyer_favorite1

然後在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce2/hadoop2lib.tar.gz網址上下載專案用到的依賴包。

  1. wgethttp://192.168.1.100:60000/allfiles/mapreduce2/hadoop2lib.tar.gz

將hadoop2lib.tar.gz解壓到當前目錄下。

  1. tarzxvfhadoop2lib.tar.gz

4.首先在HDFS上新建/mymapreduce2/in目錄,然後將Linux本地/data/mapreduce2目錄下的buyer_favorite1檔案匯入到HDFS的/mymapreduce2/in目錄中。

  1. hadoopfs-mkdir-p/mymapreduce2/in
  2. hadoopfs-put/data/mapreduce2/buyer_favorite1/mymapreduce2/in

5.新建Java Project專案,專案名為mapreduce2。

在mapreduce2專案下新建包,包名為mapreduce。

在mapreduce包下新建類,類名為Filter。

6.新增專案所需依賴的jar包

右鍵專案,新建一個資料夾,命名為:hadoop2lib,用於存放專案所需的jar包。

將/data/mapreduce2目錄下,hadoop2lib目錄中的jar包,拷貝到eclipse中mapreduce2專案的hadoop2lib目錄下。

選中所有專案hadoop2lib目錄下所有jar包,並新增到Build Path中。

7.編寫程式程式碼,並描述其思路

資料去重的目的是讓原始資料中出現次數超過一次的資料在輸出檔案中只出現一次。我們自然想到將相同key值的所有value記錄交到一臺reduce機器,讓其無論這個資料出現多少次,最終結果只輸出一次。具體就是reduce的輸出應該以資料作為key,而對value-list沒有要求,當reduce接收到一個時,就直接將key複製到輸出的key中,將value設定為空。

Map程式碼

  1. publicstaticclassMapextendsMapper<Object,Text,Text,NullWritable>
  2. //map將輸入中的value複製到輸出資料的key上,並直接輸出
  3. {
  4. privatestaticTextnewKey=newText();//從輸入中得到的每行的資料的型別
  5. publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException
  6. //實現map函式
  7. {//獲取並輸出每一次的處理過程
  8. Stringline=value.toString();
  9. System.out.println(line);
  10. Stringarr[]=line.split("\t");
  11. newKey.set(arr[1]);
  12. context.write(newKey,NullWritable.get());
  13. System.out.println(newKey);
  14. }
  15. }

map階段採用Hadoop的預設的作業輸入方式,把輸入的value用split()方法擷取,截取出的商品id欄位設定為key,設定value為空,然後直接輸出<key,value>。

reduce端程式碼

  1. publicstaticclassReduceextendsReducer<Text,NullWritable,Text,NullWritable>{
  2. publicvoidreduce(Textkey,Iterable<NullWritable>values,Contextcontext)throwsIOException,InterruptedException
  3. //實現reduce函式
  4. {
  5. context.write(key,NullWritable.get());//獲取並輸出每一次的處理過程
  6. }
  7. }

map輸出的<key,value>鍵值對經過shuffle過程,聚成<key,value-list>後,會交給reduce函式。reduce函式,不管每個key 有多少個value,它直接將輸入的賦值給輸出的key,將輸出的value設定為空,然後輸出<key,value>就可以了。

完整程式碼

  1. packagemapreduce;
  2. importjava.io.IOException;
  3. importorg.apache.hadoop.conf.Configuration;
  4. importorg.apache.hadoop.fs.Path;
  5. importorg.apache.hadoop.io.IntWritable;
  6. importorg.apache.hadoop.io.NullWritable;
  7. importorg.apache.hadoop.io.Text;
  8. importorg.apache.hadoop.mapreduce.Job;
  9. importorg.apache.hadoop.mapreduce.Mapper;
  10. importorg.apache.hadoop.mapreduce.Reducer;
  11. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  15. publicclassFilter{
  16. publicstaticclassMapextendsMapper<Object,Text,Text,NullWritable>{
  17. privatestaticTextnewKey=newText();
  18. publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
  19. Stringline=value.toString();
  20. System.out.println(line);
  21. Stringarr[]=line.split("\t");
  22. newKey.set(arr[1]);
  23. context.write(newKey,NullWritable.get());
  24. System.out.println(newKey);
  25. }
  26. }
  27. publicstaticclassReduceextendsReducer<Text,NullWritable,Text,NullWritable>{
  28. publicvoidreduce(Textkey,Iterable<NullWritable>values,Contextcontext)throwsIOException,InterruptedException{
  29. context.write(key,NullWritable.get());
  30. }
  31. }
  32. publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{
  33. Configurationconf=newConfiguration();
  34. System.out.println("start");
  35. Jobjob=newJob(conf,"filter");
  36. job.setJarByClass(Filter.class);
  37. job.setMapperClass(Map.class);
  38. job.setReducerClass(Reduce.class);
  39. job.setOutputKeyClass(Text.class);
  40. job.setOutputValueClass(NullWritable.class);
  41. job.setInputFormatClass(TextInputFormat.class);
  42. job.setOutputFormatClass(TextOutputFormat.class);
  43. Pathin=newPath("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");
  44. Pathout=newPath("hdfs://localhost:9000/mymapreduce2/out");
  45. FileInputFormat.addInputPath(job,in);
  46. FileOutputFormat.setOutputPath(job,out);
  47. System.exit(job.waitForCompletion(true)?0:1);
  48. }
  49. }

8.在Filter類檔案中,右鍵並點選=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢後,進入命令模式下,在HDFS中/mymapreduce2/out檢視實驗結果。

  1. hadoopfs-ls/mymapreduce2/out
  2. hadoopfs-cat/mymapreduce2/out/part-r-00000