1. 程式人生 > >Hadoop: MapReduce2的幾個基本示例

Hadoop: MapReduce2的幾個基本示例

1) WordCount 

這個就不多說了,滿大街都是,網上有幾篇對WordCount的詳細分析

這二篇都寫得不錯, 特別幾張圖畫得很清晰

2) 去重處理(Distinct)

類似於db中的select distinct(x) from table , 去重處理甚至比WordCount還要簡單,假如我們要對以下檔案的內容做去重處理(注:該檔案也是後面幾個示例的輸入引數)

複製程式碼
2
8
8
3
2
3
5
3
0
2
7
複製程式碼

基本上啥也不用做,在map階段,把每一行的值當成key分發下去,然後在reduce階段回收上來就可以了.

注:裡面用到了一個自己寫的類HDFSUtil,可以在 

hadoop: hdfs API示例 一文中找到.

原理:map階段完成後,在reduce開始之前,會有一個combine的過程,相同的key值會自動合併,所以自然而然的就去掉了重複.

複製程式碼
 1 package yjmyzz.mr;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.NullWritable;
 6 import org.apache.hadoop.io.Text;
7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 import org.apache.hadoop.util.GenericOptionsParser;
13 14 import yjmyzz.util.HDFSUtil; 15 16 import java.io.IOException; 17 18 19 public class RemoveDup { 20 21 public static class RemoveDupMapper 22 extends Mapper<Object, Text, Text, NullWritable> { 23 24 public void map(Object key, Text value, Context context) 25 throws IOException, InterruptedException { 26 context.write(value, NullWritable.get()); 27 //System.out.println("map: key=" + key + ",value=" + value); 28 } 29 30 } 31 32 public static class RemoveDupReducer extends Reducer<Text, NullWritable, Text, NullWritable> { 33 public void reduce(Text key, Iterable<NullWritable> values, Context context) 34 throws IOException, InterruptedException { 35 context.write(key, NullWritable.get()); 36 //System.out.println("reduce: key=" + key); 37 } 38 } 39 40 public static void main(String[] args) throws Exception { 41 Configuration conf = new Configuration(); 42 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 43 if (otherArgs.length < 2) { 44 System.err.println("Usage: RemoveDup <in> [<in>...] <out>"); 45 System.exit(2); 46 } 47 48 //刪除輸出目錄(可選,省得多次執行時,總是報OUTPUT目錄已存在) 49 HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]); 50 51 Job job = Job.getInstance(conf, "RemoveDup"); 52 job.setJarByClass(RemoveDup.class); 53 job.setMapperClass(RemoveDupMapper.class); 54 job.setCombinerClass(RemoveDupReducer.class); 55 job.setReducerClass(RemoveDupReducer.class); 56 job.setOutputKeyClass(Text.class); 57 job.setOutputValueClass(NullWritable.class); 58 59 60 for (int i = 0; i < otherArgs.length - 1; ++i) { 61 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); 62 } 63 FileOutputFormat.setOutputPath(job, 64 new Path(otherArgs[otherArgs.length - 1])); 65 System.exit(job.waitForCompletion(true) ? 0 : 1); 66 } 67 68 69 }
複製程式碼

輸出:

複製程式碼
0
2
3
5
7
8
複製程式碼

3) 記錄計數(Count)

這個跟WordCount略有不同,類似於Select Count(*) from tables的效果,程式碼也超級簡單,直接拿WordCount改一改就行了

複製程式碼
 1 package yjmyzz.mr;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.Reducer;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 import org.apache.hadoop.util.GenericOptionsParser;
13 import yjmyzz.util.HDFSUtil;
14 
15 import java.io.IOException;
16 import java.util.StringTokenizer;
17 
18 
19 public class RowCount {
20 
21     public static class RowCountMapper
22             extends Mapper<Object, Text, Text, IntWritable> {
23 
24         private final static IntWritable one = new IntWritable(1);
25         private final  static Text countKey = new Text("count");
26 
27         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
28                 context.write(countKey, one);
29         }
30     }
31 
32     public static class RowCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
33         private IntWritable result = new IntWritable();
34 
35         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
36             int sum = 0;
37             for (IntWritable val : values) {
38                 sum += val.get();
39             }
40             result.set(sum);
41             context.write(key, result);
42         }
43     }
44 
45     public static void main(String[] args) throws Exception {
46         Configuration conf = new Configuration();
47         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
48         if (otherArgs.length < 2) {
49             System.err.println("Usage: RowCount <in> [<in>...] <out>");
50             System.exit(2);
51         }
52         //刪除輸出目錄(可選)
53         HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);
54 
55         Job job = Job.getInstance(conf, "word count");
56         job.setJarByClass(RowCount.class);
57         job.setMapperClass(RowCountMapper.class);
58         job.setCombinerClass(RowCountReducer.class);
59         job.setReducerClass(RowCountReducer.class);
60         job.setOutputKeyClass(Text.class);
61         job.setOutputValueClass(IntWritable.class);
62         for (int i = 0; i < otherArgs.length - 1; ++i) {
63             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
64         }
65         FileOutputFormat.setOutputPath(job,
66                 new Path(otherArgs[otherArgs.length - 1]));
67         System.exit(job.waitForCompletion(true) ? 0 : 1);
68     }
69 
70 
71 }
複製程式碼

輸出: count 11

注:如果只想輸出一個數字,不需要"count"這個key,可以改進一下:

複製程式碼
 1 package yjmyzz.mr;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.LongWritable;
 6 import org.apache.hadoop.io.NullWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13 import org.apache.hadoop.util.GenericOptionsParser;
14 import yjmyzz.util.HDFSUtil;
15 
16 import java.io.IOException;
17 
18 
19 public class RowCount2 {
20 
21     public static class RowCount2Mapper
22             extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
23 
24         public long count = 0;
25 
26         public void map(LongWritable key, Text value, Context context)
27                 throws IOException, InterruptedException {
28             count += 1;
29         }
30 
31         protected void cleanup(Context context) throws IOException, InterruptedException {
32             context.write(new LongWritable(count), NullWritable.get());
33         }
34 
35     }
36 
37     public static class RowCount2Reducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
38 
39         public long count = 0;
40 
41         public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
42                 throws IOException, InterruptedException {
43             count += key.get();
44         }
45 
46 
47         protected void cleanup(Context context) throws IOException, InterruptedException {
48             context.write(new LongWritable(count), NullWritable.get());
49         }
50 
51     }
52 
53     public static void main(String[] args) throws Exception {
54         Configuration conf = new Configuration();
55         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
56         if (otherArgs.length < 2) {
57             System.err.println("Usage: FindMax <in> [<in>...] <out>");
58             System.exit(2);
59         }
60 
61         //刪除輸出目錄(可選,省得多次執行時,總是報OUTPUT目錄已存在)
62         HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);
63 
64         Job job = Job.getInstance(conf, "RowCount2");
65         job.setJarByClass(RowCount2.class);
66         job.setMapperClass(RowCount2Mapper.class);
67         job.setCombinerClass(RowCount2Reducer.class);
68         job.setReducerClass(RowCount2Reducer.class);
69         job.setOutputKeyClass(LongWritable.class);
70         job.setOutputValueClass(NullWritable.class);
71 
72         for (int i = 0; i < otherArgs.length - 1; ++i) {
73             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
74         }
75         FileOutputFormat.setOutputPath(job,
76                 new Path(otherArgs[otherArgs.length - 1]));
77         System.exit(job.waitForCompletion(true) ? 0 : 1);
78     }
79 
80 
81 }
複製程式碼

這樣輸出結果就只有一個數字11了.

注意: 這裡context.write(xxx)只能寫在cleanup方法中, 該方法在Mapper和Reducer介面中都有, 在map方法及reduce方法執行完後,會觸發cleanup方法. 大家可以嘗試下,把context.write(xxx)寫在map和reduce方法中試試看,結果會出現多行記錄,而不是預期的僅1個數字.

4)求最大值(Max)

相關推薦

Hadoop: MapReduce2基本示例

1) WordCount  這個就不多說了,滿大街都是,網上有幾篇對WordCount的詳細分析 這二篇都寫得不錯, 特別幾張圖畫得很清晰 2) 去重處理(Distinct) 類似於db中的select distinct(x) fr

duilib中將xml封裝為控制元件簡單示例(簡單自定義控制元件,封裝基本控制元件合為1自定義控制元件)

使用duilib的時候,難免會有這樣的需求: 某一塊Container(Layout)以及裡面的佈局需要重複用,不想每次都複製貼上這麼多,要不然xml太大了; 通過繼承來自定義一個控制元件,比如CButtonUIEx之類的,想讓他像button一樣在xml中被識別; xml裡面的東西

軟件測試的基本原則

其中 排除 多次 排列 參與 基於 所有 業務 相關 我一直認為軟件測試是一件很有原則的工作,這個原則是最重要的,方法都應該在原則指導下進行。軟件測試的基本原則是站在用戶 的角度,對產品進行全面測試,盡早、盡可能多地發現 Bug,並負責跟蹤和分析產品中的問題,對不足之處提出

hadoop常用命令

hadoop官方文檔:http://hadoop.apache.org/docs/r1.2.1/file_system_shell.html1、登錄主節點,切換到hdfs用戶[[email protected]/* */~]#su - hdfs2、列出當前目錄有哪些子目錄,有哪些文件[[email

linux 容易混淆的基本命令

linux字符串比較:str01 = str02 兩者比較相同結果為真str01 != str02 兩者比較不同結果為真-n str 不為空為真-z str 結果為null 為真算術比較:a1 -eq a2 等於 a1 -ne a2 不等於a1 -gt a2 大於a1 -ge a2 大於等於a1 -

WebLogic(12C)——基本概念

分配 domain 基本 機器 eve 不依賴 影響 right term 轉http://blog.csdn.net/hanxuemin12345/article/details/46287597 目錄(?)[-] 域Domain 服務器Server 機器Mac

圖像處理中基本的處理方法c#代碼實現

位圖 edi windows系統 process 圖案 電視 間接 做了 同步 圖像是人類獲取和交換信息的主要來源,因此,圖像處理的應用領域必然涉及到人類生活和工作的方方面面。隨著人類活動範圍的不斷擴大,圖像處理的應用領域也將隨之不斷擴大。(1)航天和航空技術方面的應用 數

PHP時間戳的問題示例

北京時間 content date def 當前 時間戳 day 問題 獲得 <?php header("Content-type:text/html;charset=utf-8"); //設置北京時間為默認時區 date_default

計量經濟與時間序列_時間序列分析的基本概念(自相關函數,偏自相關函數等)

sig 永不 均值 blog 那種 屬於 class 觀察 自相關 1. 在時間序列分析中, 數學模型是什麽?數學公式又是什麽?數學推導過程又是什麽?... ...   一句話:用數學公式後者符號來表示現實存在的意義。數學是“萬金油”的科學,它是作為工作和分析方法運用到某

圖書管理系統的簡單示例

活動圖 body log 業務 圖書管理 活動 class pos 技術 圖書館業務用例圖

【Linux】linux常用基本命令 小白專區簡單易懂

ls -l 17. 命令補全 linu 刪除目錄 poweroff 環境 密碼 family 顯示日期的命令 date顯示日歷的命令 cal -s, --hctosys以硬件時鐘為準,校正系統時鐘hwclock,clock:顯示硬件時鐘 -w, --sy

使用Eclipse搭建JavaWeb開發環境的基本問題

resources user ace 程序開發 nav sep webapp 視圖 pre Eclipse搭建JavaWeb開發環境 eclipse是一個用於java程序開發的ide軟件,tomcat是一個運行javaweb應用的服務器軟件,使用eclipse開發ja

linux基本命令的用法

一. vim命令 .1.刪除當前游標所在行到檔案結尾所以內容   dG。 2.刪除一行   刪除一整行內容使用"dd"命令。刪除後下面的行會移上來填補空缺。 3.刪除換行符   在Vim中你可以把兩行合併為一行,也就是說兩行之間的換行符被刪除了:

Spring 事務——事務的基本屬性

事務屬性 上文的例子中,在宣告事務時,用到了@Transactional(readOnly = false, propagation = Propagation.REQUIRED)。 中間的引數readOnly,propagation我們稱為事務屬性。它就是對事務的基本配置。事務屬性有五

關於“知識共享”的基本概念

關於“知識共享”的幾個基本概念 目前,對於我們向全國高校投放魯賓遜微積分教材電子版的行動與實行“知識共享”許可協議之間的關係,有人仍然缺乏理解。 魯賓遜微積分教材電子版扉頁上印有“知識共享”標識,所以,投放行動(發行)是有根據的。     本文附件是一份“知識共享”文字

numpy基本用法

INPUT: print(np.empty((2,3))) print(np.zeros((2,3))) print(np.ones((2,3))) print(np.eye(2)) print(np.random.random((2,3))) OUTPUT: [[ 2.67

從查詢藍芽裝置到能夠相互通訊要經過基本步驟(本機做為伺服器)

從查詢藍芽裝置到能夠相互通訊要經過幾個基本步驟(本機做為伺服器): 1.設定許可權 在manifest中配置 <uses-permission android:name=“android.permission.BLUETOOTH”/+> <uses-permissi

kafka中的基本概念

Kafka架構是由producer(訊息生產者)、consumer(訊息消費者)、borker(kafka叢集的server,負責處理訊息讀、寫請求,儲存訊息,在kafka cluster這一層這裡,其實裡面是有很多個broker)、topic(訊息佇列/分類相當於佇列,裡面有生產者和消費者模型)、

圖靈機,Random Access Machine,和演算法的基本要素

圖靈機 學過計算機課程的人,大概第一節課老師就會講圖靈,圖靈也被成為計算機之父。他是英國電腦科學家、數學家、邏輯學家、密碼分析學家和理論生物學家。他還提出了一種數學模型,圖靈機模型;圖靈機(Turing Machine,TM)又稱確定型圖靈機,它是一種抽象的計

筆記-git中的基本概念

jpg pre 父節點 sha 推薦一個 必須 解釋 checkout 形式 這篇筆記旨在理解幾個核心的git基本概念,如果對git了解較少,可以先看git基本教程。 這裏推薦一個:廖雪峰-Git教程 commit(提交) 與 branch(分支) 版本號 commit是