1. 程式人生 > >mapreduce程式碼示例(借鑑)

mapreduce程式碼示例(借鑑)

1、資料去重

   "資料去重"主要是為了掌握和利用並行化思想來對資料進行有意義篩選統計大資料集上的資料種類個數從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及資料去重。下面就進入這個例項的MapReduce程式設計。

1.1 例項描述

  對資料檔案中的資料進行去重。資料檔案中的每行都是一個數據。

  樣例輸入如下所示:

     1)file1:

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

     2)file2:

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

     樣例輸出如下所示:

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d

1.2 設計思路

  資料去重最終目標是讓原始資料出現次數超過一次資料輸出檔案

出現一次我們自然而然會想到將同一個資料的所有記錄都交給一臺reduce機器,無論這個資料出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以資料作為key,而對value-list則沒有要求。當reduce接收到一個<key,value-list>時就直接將key複製到輸出的key中,並將value設定成空值

  在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚整合<key,value-list>後會交給reduce。所以從設計好的reduce輸入可以反推出map的輸出key應為資料,value任意。繼續反推,map輸出資料的key為資料,而在這個例項中每個資料代表輸入檔案中的一行內容,所以map階段要完成的任務就是在採用Hadoop預設的作業輸入方式之後,將value設定為key,並直接輸出(輸出中的value任意)。map中的結果經過shuffle過程之後交給reduce。reduce階段不會管每個key有多少個value,它直接將輸入的key複製為輸出的key,並輸出就可以了(輸出中的value被設定成空了)。

1.3 程式程式碼

     程式程式碼如下所示:

package com.hebut.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {

    //map將輸入中的value複製到輸出資料的key上,並直接輸出

    public static class Map extends Mapper<Object,Text,Text,Text>{

        private static Text line=new Text();//每行資料

        //實現map函式

        public void map(Object key,Text value,Context context)

                throws IOException,InterruptedException{

            line=value;

            context.write(line, new Text(""));

        }

    }

    //reduce將輸入中的key複製到輸出資料的key上,並直接輸出

    public static class Reduce extends Reducer<Text,Text,Text,Text>{

        //實現reduce函式

        public void reduce(Text key,Iterable<Text> values,Context context)

                throws IOException,InterruptedException{

            context.write(key, new Text(""));

        }

    }

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        //這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

        String[] ioArgs=new String[]{"dedup_in","dedup_out"};

     String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

     if (otherArgs.length != 2) {

     System.err.println("Usage: Data Deduplication <in> <out>");

     System.exit(2);

     }

     Job job = new Job(conf, "Data Deduplication");

     job.setJarByClass(Dedup.class);

     //設定MapCombineReduce處理類

     job.setMapperClass(Map.class);

     job.setCombinerClass(Reduce.class);

     job.setReducerClass(Reduce.class);

     //設定輸出型別

     job.setOutputKeyClass(Text.class);

     job.setOutputValueClass(Text.class);

     //設定輸入和輸出目錄

     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

     System.exit(job.waitForCompletion(true) ? 0 : 1);

     }

}

1.4 程式碼結果

     1)準備測試資料

     通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入檔案"dedup_in"資料夾(備註:"dedup_out"不需要建立。)如圖1.4-1所示,已經成功建立。

        

圖1.4-1 建立"dedup_in"                                   圖1.4.2 上傳"file*.txt"

     然後在本地建立兩個txt檔案,通過Eclipse上傳到"/user/hadoop/dedup_in"資料夾中,兩個txt檔案的內容如"例項描述"那兩個檔案一樣。如圖1.4-2所示,成功上傳之後。

     從SecureCRT遠處檢視"Master.Hadoop"的也能證實我們上傳的兩個檔案。

    檢視兩個檔案的內容如圖1.4-3所示:

圖1.4-3 檔案"file*.txt"內容

2)檢視執行結果

     這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"資料夾進行重新整理,這時會發現多出一個"dedup_out"資料夾,且裡面有3個檔案,然後開啟雙其"part-r-00000"檔案,會在Eclipse中間把內容顯示出來。如圖1.4-4所示。

圖1.4-4 執行結果

    此時,你可以對比一下和我們之前預期的結果是否一致。

2、資料排序

  "資料排序"是許多實際任務執行時要完成的第一項工作,比如學生成績評比資料建立索引等。這個例項和資料去重類似,都是原始資料進行初步處理,為進一步的資料操作打好基礎。下面進入這個示例。

2.1 例項描述

    對輸入檔案中資料進行排序。輸入檔案中的每行內容均為一個數字即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個代表原始資料在原始資料集中的位次第二個代表原始資料

    樣例輸入

    1)file1:

2

32

654

32

15

756

65223

    2)file2:

5956

22

650

92

    3)file3:

26

54

6

    樣例輸出

1    2

2    6

3    15

4    22

5    26

6    32

7    32

8    54

9    92

10    650

11    654

12    756

13    5956

14    65223

2.2 設計思路

  這個例項僅僅要求對輸入資料進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個預設的排序,而不需要自己再實現具體的排序呢?答案是肯定的。

  但是在使用之前首先需要瞭解它的預設排序規則。它是按照key值進行排序的,如果key為封裝int的IntWritable型別,那麼MapReduce按照數字大小對key排序,如果key為封裝為String的Text型別,那麼MapReduce按照字典順序對字串排序。

  瞭解了這個細節,我們就知道應該使用封裝int的IntWritable型資料結構了。也就是在map中將讀入的資料轉化成IntWritable型,然後作為key值輸出(value任意)。reduce拿到<key,value-list>之後,將輸入的key作為value輸出,並根據value-list元素個數決定輸出的次數。輸出的key(即程式碼中的linenum)是一個全域性變數,它統計當前key的位次。需要注意的是這個程式中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經能夠完成任務了。

2.3 程式程式碼

    程式程式碼如下所示:

package com.hebut.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    //map將輸入中的value化成IntWritable型別,作為輸出的key

    public static class Map extends

        Mapper<Object,Text,IntWritable,IntWritable>{

        private static IntWritable data=new IntWritable();

        //實現map函式

        public void map(Object key,Text value,Context context)

                throws IOException,InterruptedException{

            String line=value.toString();

            data.set(Integer.parseInt(line));

            context.write(data, new IntWritable(1));

        }

    }

    //reduce將輸入中的key複製到輸出資料的key上,

    //然後根據輸入的value-list中元素的個數決定key的輸出次數

    //用全域性linenum來代表key的位次

    public static class Reduce extends

            Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{

        private static IntWritable linenum = new IntWritable(1);

        //實現reduce函式

        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)

                throws IOException,InterruptedException{

            for(IntWritable val:values){

                context.write(linenum, key);

                linenum = new IntWritable(linenum.get()+1);

            }

        }

    }

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        //這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

        String[] ioArgs=new String[]{"sort_in","sort_out"};

     String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

     if (otherArgs.length != 2) {

     System.err.println("Usage: Data Sort <in> <out>");

         System.exit(2);

     }

     Job job = new Job(conf, "Data Sort");

     job.setJarByClass(Sort.class);

     //設定MapReduce處理類

     job.setMapperClass(Map.class);

     job.setReducerClass(Reduce.class);

     //設定輸出型別

     job.setOutputKeyClass(IntWritable.class);

     job.setOutputValueClass(IntWritable.class);

     //設定輸入和輸出目錄

     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

     System.exit(job.waitForCompletion(true) ? 0 : 1);

     }

}

2.4 程式碼結果

1)準備測試資料

    通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入檔案"sort_in"資料夾(備註:"sort_out"不需要建立。)如圖2.4-1所示,已經成功建立。

              

圖2.4-1 建立"sort_in"                                                  圖2.4.2 上傳"file*.txt"

    然後在本地建立三個txt檔案,通過Eclipse上傳到"/user/hadoop/sort_in"資料夾中,三個txt檔案的內容如"例項描述"那三個檔案一樣。如圖2.4-2所示,成功上傳之後。

    從SecureCRT遠處檢視"Master.Hadoop"的也能證實我們上傳的三個檔案。

檢視兩個檔案的內容如圖2.4-3所示:

圖2.4-3 檔案"file*.txt"內容

2)檢視執行結果

    這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"資料夾進行重新整理,這時會發現多出一個"sort_out"資料夾,且裡面有3個檔案,然後開啟雙其"part-r-00000"檔案,會在Eclipse中間把內容顯示出來。如圖2.4-4所示。

圖2.4-4 執行結果

3、平均成績

    "平均成績"主要目的還是在重溫經典"WordCount"例子,可以說是在基礎上的微變化版,該例項主要就是實現一個計算學生平均成績的例子。

3.1 例項描述

  對輸入檔案中資料進行就算學生平均成績。輸入檔案中的每行內容均為一個學生姓名和他相應的成績,如果有多門學科,則每門學科為一個檔案。要求在輸出中每行有兩個間隔的資料,其中,第一個代表學生的姓名第二個代表其平均成績

    樣本輸入

    1)math:

張三    88

李四    99

王五    66

趙六    77

    2)china:

張三    78

李四    89

王五    96

趙六    67

    3)english:

張三    80

李四    82

王五    84

趙六    86

    樣本輸出

張三    82

李四    90

王五    82

趙六    76

3.2 設計思路

    計算學生平均成績是一個仿"WordCount"例子,用來重溫一下開發MapReduce程式的流程。程式包括兩部分的內容:Map部分和Reduce部分,分別實現了map和reduce的功能。

    Map處理的是一個純文字檔案,檔案中存放的資料時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的資料是由InputFormat分解過的資料集,其中InputFormat的作用是將資料集切割成小資料集InputSplit,每一個InputSlit將由一個Mapper負責處理。此外,InputFormat中還提供了一個RecordReader的實現,並將一個InputSplit解析成<key,value>對提供給了map函式。InputFormat的預設值是TextInputFormat,它針對文字檔案,按行將文字切割成InputSlit,並用LineRecordReader將InputSplit解析成<key,value>對,key是行在文字中的位置,value是檔案中的一行。

    Map的結果會通過partion分發到Reducer,Reducer做完Reduce操作後,將通過以格式OutputFormat輸出。

    Mapper最終處理的結果對<key,value>,會送到Reducer中進行合併,合併的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是所有使用者定製Reducer類地基礎,它的輸入是key和這個key對應的所有value的一個迭代器,同時還有Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到檔案中。

3.3 程式程式碼

    程式程式碼如下所示:

package com.hebut.mr;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Score {

    public static class Map extends

            Mapper<LongWritable, Text, Text, IntWritable> {

        // 實現map函式

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            // 將輸入的純文字檔案的資料轉化成String

            String line = value.toString();

            // 將輸入的資料首先按行進行分割

            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");

            // 分別對每一行進行處理

            while (tokenizerArticle.hasMoreElements()) {

                // 每行按空格劃分

                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());

                String strName = tokenizerLine.nextToken();// 學生姓名部分

                String strScore = tokenizerLine.nextToken();// 成績部分

                Text name = new Text(strName);

                int scoreInt = Integer.parseInt(strScore);

                // 輸出姓名和成績

                context.write(name, new IntWritable(scoreInt));

            }

        }

    }

    public static class Reduce extends

            Reducer<Text, IntWritable, Text, IntWritable> {

        // 實現reduce函式

        public void reduce(Text key, Iterable<IntWritable> values,

                Context context) throws IOException, InterruptedException {

            int sum = 0;

            int count = 0;

            Iterator<IntWritable> iterator = values.iterator();

            while (iterator.hasNext()) {

                sum += iterator.next().get();// 計算總分

                count++;// 統計總的科目數

            }

            int average = (int) sum / count;// 計算平均成績

            context.write(key, new IntWritable(average));

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

        String[] ioArgs = new String[] { "score_in", "score_out" };

        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Score Average <in> <out>");

            System.exit(2);

        }

        Job job = new Job(conf, "Score Average");

        job.setJarByClass(Score.class);

        // 設定MapCombineReduce處理類

        job.setMapperClass(Map.class);

        job.setCombinerClass(Reduce.class);

        job.setReducerClass(Reduce.class);

        // 設定輸出型別

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        // 將輸入的資料集分割成小資料塊splites,提供一個RecordReder的實現

        job.setInputFormatClass(TextInputFormat.class);

        // 提供一個RecordWriter的實現,負責資料輸出

        job.setOutputFormatClass(TextOutputFormat.class

相關推薦

mapreduce程式碼示例(借鑑)

1、資料去重    "資料去重"主要是為了掌握和利用並行化思想來對資料進行有意義的篩選。統計大資料集上的資料種類個數、從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及資料去重。下面就進入這個例項的MapReduce程式設計。 1.1 例項描述   對資料檔案中的資料進行去重。資料檔案中的每行都是一

hadoop mapreduce讀取orcfile的java程式碼示例

orcfile在hive 0.11版本後提供支援,orcfile相比rcfile具有更高的資料壓縮比,在不使用任何壓縮演算法,僅僅使用orcfile儲存格式,資料量大小就能縮小一半以上。 下面以hive 0.13版本為例,列舉了mapreduce讀取orcfile的java

MapReduce序列化及分割槽的java程式碼示例

需求 統計每一個使用者(手機號)所耗費的總上行流量、下行流量,總流量,將統計結果按照總流量倒序排序   hadoop jar wordcount.jar cn.itcast.bigdata.mr.flowsum.FlowCount /wordcount/input /wo

hadoop MapReduce java示例

method prope import .lib mapper key maven sna artifact wordcount工作流程input-> 拆分Split->映射map->派發Shuffle->縮減reduce->outputhad

form表單提交資料的同時上傳檔案程式碼示例

                              form表單提交資料的同時在表單中上傳檔案程式碼示例    一

C語言解決螺旋矩陣演算法問題的程式碼示例_C 語言

趕集網校招就採用了螺旋輸出矩陣作為程式題,要求將矩陣螺旋輸出如: 2016425180442470.jpg 圖中6*6矩陣線條所示為輸出順序,如果輸出正確的話應該輸出1~36有序數字。我想的是這麼做的: #include <stdio.h> //#define LEN 1 //#define

Struts2(Interceptor篇):攔截器的實現原理以及程式碼示例

目錄 Interceptor 簡介 理解 Interceptor 概念 理解 Interceptor 原理 建立 Interceptor 監聽器 在pom.xml加入相關依賴 自定義 Interceptor 自定義一個實現了Interceptor介面的類,或者繼承抽象

再看go的interface程式碼示例

       程式碼: package main import "fmt" type Base interface { Input() int } type Dog struct { } func (p Dog) Input() int

簡單工廠模式的go程式碼示例

       簡單工廠模式很簡單,工廠負責生產物件, 來看下: package main import ( "fmt" ) type BaseIntf interface { Operate(int, int) int } t

02_RxJava轉換操作符程式碼示例

package com.gdc.rxjava; import java.util.List; import rx.Observable; import rx.Observer; import rx.Subscriber; import rx.functions.Func1; import r

03_RxJava 過濾型操作符(Filtering)程式碼示例

package com.gdc.rxjava; import java.util.concurrent.TimeUnit; import rx.Observable; import rx.Observable.OnSubscribe; import rx.functions.Func1; i

微信支付統一下單的坑跟回撥地址程式碼示例

 這裡我是把微信裡的回撥dome 拿出來重寫了一下,  在配置回撥地址的時候,要確定你的回撥連結地址一定要能夠訪問,  裡面註釋的比較多- -是我自己測試用的 也可以拿來作為參考,  配合上面一篇我寫的統一下單dome文章作為 結合來做,

python網路爬蟲(web spider)系統化整理總結(二):爬蟲python程式碼示例(兩種響應格式:json和html)

        上一篇部落格(入門知識篇),對爬蟲有了一個基本的瞭解,但是具體怎麼實現一個爬蟲程式呢?         一般情況下,我們在瀏覽器獲取資訊,是

使用Python處理Excel檔案的一些程式碼示例

筆記:使用Python處理Excel檔案的一些程式碼示例,以下程式碼來自於《Python資料分析基礎》一書,有刪改 #!/usr/bin/env python3 # 匯入讀取Excel檔案的庫,xlrd,其中的 open_workbook 為讀取工作簿 from xlrd import open_wo

go mysql事務程式碼示例

        在mysql中,可用begin, commit/rollback命令來操作,下面來看go程式碼操作: package main import ( "fmt" "database/sql" _ "github.com/

JWTs結合SpringCloud使用程式碼示例

文章目錄 什麼是JWT 在什麼時候使用JWTs JWTs結合SpringCloud使用 首先,需要建立一個獨立的gate服務 後臺服務工程新增過濾器 總結 參考

重溫C#委託,匿名方法,Lambda,泛型委託,表示式樹程式碼示例

帶你重溫C#委託,匿名方法,Lambda,泛型委託,表示式樹程式碼示例:     這些對老一代的程式設計師都是老生常談的東西,沒什麼新意,對新生代的程式設計師卻充滿著魅力。曾經新生代,好多都經過漫長的學習,理解,實踐才能掌握委託,表示式樹這些應用。今天我嘗試用簡單的方法敘述一下,讓

卷積神經網路CNN與基於MNIST的Python程式碼示例

卷積神經網路入門學(1) 原文地址:http://blog.csdn.net/hjimce/article/details/47323463 作者:hjimce 卷積神經網路演算法是n年前就有的演算法,只是近年來因為深度學習相關演算法為多層網路的訓練提供了新方法,然後現在

小程式如何實現多圖上傳、圖片預覽效果?(程式碼示例

wxml程式碼: <view class="weui-uploader__hd"> <view class="weui-uploader__title">點選可預覽選好的圖片</view> <view class="weui-

微信小程式如何實現下拉框效果?(程式碼示例

wxml程式碼: <view class='top-text'> 選擇接收班級</view> <!-- 下拉框 --> <view class='top-selected' bindtap='bindShowMsg'> <