1. 程式人生 > 實用技巧 >MapReduce程式設計例項(轉)

MapReduce程式設計例項(轉)

備註(轉自)https://www.jianshu.com/p/1e4c976f5ecc

動手實戰學習MapReduce程式設計例項

前言

這裡放一個我學習MapReduce的程式設計例項專案吧,本來是想把這些分開寫成多篇文章的,能夠詳細敘述我學習過程中感想。但無奈,時間不夠,只好在Github上建立了該專案,在程式碼中由較為詳細的註釋,我想也足夠了吧。
josonle/MapReduce-Demo
該專案有些題目是參考了網上幾篇部落格,但程式碼實現是本人實現的。其次,所謂的MapReduce學習流程是參照老師上課所講的PPT上的流程【某985大資料課程PPT】,我想老師以這樣的流程授課肯定是有道理的。專案中也放了老師提供的幾個參考Demo檔案。


目錄(目錄不可用,見諒。專案中也付了這篇文件)

MapReduce程式設計例項

1.自定義物件序列化

需求分析

需要統計手機使用者流量日誌,日誌內容例項:

flowdata.log

要把同一個使用者的上行流量、下行流量進行累加,並計算出綜合 。例如上面的13897230503有兩條記錄,就要對這兩條記錄進行累加,計算總和,得到:13897230503,500,1600,2100

報錯:Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.17.10:9000/workspace/flowStatistics/output, expected: file:///

解決:1、將core-site.xml 和hdfs-site.xml拷貝到專案裡去就可以,原因是訪問遠端的HDFS 需要通過URI來獲得FileSystem
    2、在專案中,Configuration物件設定fs.defaultFS 【推薦這個,**大小寫別拼錯,我就是拼錯了找了半天**】

        String namenode_ip = "192.168.17.10";
        String hdfs = "hdfs://"+namenode_ip+":9000";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfs);

解答

一、正常處理即可,不過在處理500 1400 這種時靈活變通一下即可
public static class FlowMapper extends Mapper<Object, Text, Text, Text>{
        
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            Text phone = new Text(strs[0]);
            Text flow = new Text(strs[1]+"\t"+strs[2]);
            context.write(phone, flow);
        }
    }
    
    public static class FlowReducer extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
            int upFlow = 0;
            int downFlow = 0;
            
            for (Text value : values) {
                String[] strs = value.toString().split("\t");
                upFlow += Integer.parseInt(strs[0].toString());
                downFlow += Integer.parseInt(strs[1].toString());
            }
            int sumFlow = upFlow+downFlow;
            
            context.write(key,new Text(upFlow+"\t"+downFlow+"\t"+sumFlow));
        }
    }

二、自定義一個實現Writable介面的可序列化的物件Flow,包含資料形式如 upFlow downFlow sumFlow
public static class FlowWritableMapper extends Mapper<Object, Text, Text, FlowWritable> {
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            Text phone = new Text(strs[0]);
            FlowWritable flow = new FlowWritable(Integer.parseInt(strs[1]),Integer.parseInt(strs[2]));
            context.write(phone, flow);
        }
    }
    public static class FlowWritableReducer extends Reducer<Text, FlowWritable, Text, FlowWritable>{
        public void reduce(Text key,Iterable<FlowWritable> values,Context context) throws IOException, InterruptedException {
            int upFlow = 0;
            int downFlow = 0;
            
            for (FlowWritable value : values) {
                upFlow += value.getUpFlow();
                downFlow += value.getDownFlow();
            }
            
            context.write(key,new FlowWritable(upFlow,downFlow));
        }
    }
    
    public static class FlowWritable implements Writable{
        private int upFlow,downFlow,sumFlow;

        public FlowWritable(int upFlow,int downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow+downFlow;
        }
        
        public int getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }

        public int getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }

        public int getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(int sumFlow) {
            this.sumFlow = sumFlow;
        }
        // writer和readFields方法務必實現,序列化資料的關鍵
        @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            upFlow = in.readInt();
            downFlow = in.readInt();
            sumFlow = in.readInt();
        }

        @Override
        public String toString() {
            // TODO Auto-generated method stub
            return upFlow+"\t"+downFlow+"\t"+sumFlow;
        }
    }

注意: 要根據具體情況在job中設定Mapper、Reducer類及輸出的key、value型別
具體見程式碼

2.資料去重

需求分析

需求很簡單,就是把檔案中重複資料去掉。比如說統計類似如下檔案中不包含重複日期資料的日期

2017-02-14 1
2016-02-01 2
2017-07-10 3
2016-02-26 4
2015-01-19 5
2016-04-29 6
2016-05-10 7
2015-11-20 8
2017-05-23 9
2014-02-26 10

解答思路

只要搞清楚了MR的流程這個就很簡單,reducer的輸入類似<key3,[v1,v2,v3...]>,這個地方輸入的key3是沒有重複值的。所以利用這一點,Mapper輸出的key儲存日期資料,value置為空即可 【這裡可以使用NullWritable型別】

還有就是,不一定是日期去重,去重一行資料也是如此,key儲存這一行資料即可

public static class DateDistinctMapper extends Mapper<Object, Text, Text, NullWritable> {       
        public void map(Object key, Text value, Context context ) 
                throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            Text date = new Text(strs[0]);//取到日期作為key
            context.write(date, NullWritable.get());
        }
    }
  
public static class DateDistinctReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
    
        public void reduce(Text key, Iterable<NullWritable> values, Context context) 
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

3.資料排序、二次排序

需求分析

這一類問題很多,像學生按成績排序,手機使用者流量按上行流量升序,下行流量降序排序等等

  1. 日期計數升序排序

  2. 日期計數降序排序

    //日期 日期出現的次數
    2015-01-27   7
    2015-01-28   3
    2015-01-29   7
    2015-01-30   6
    2015-01-31   7
    2015-02-01   15
    2015-02-02   10
    2015-02-03   9
    2015-02-04   12
    2015-02-05   14
    
  1. 手機使用者流量按上行流量升序,下行流量降序排序

解答思路

MapReduce是預設會對key進行升序排序的,可以利用這一點實現某些排序

  • 單列排序
    • 升序還是降序排序
    • 可以利用Shuffle預設對key排序的規則;
    • 自定義繼承WritableComparator的排序類,實現compare方法
  • 二次排序
    • 實現可序列化的比較類WritableComparable<T>,並實現compareTo方法(同樣可指定升序降序)
日期按計數升序排序
public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {
        private IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            num.set(Integer.parseInt(strs[1]));
            // 將次數作為key進行升序排序
            context.write(num, new Text(strs[0]));
            System.out.println(num.get()+","+strs[0]);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // 排序後再次顛倒k-v,將日期作為key
                System.out.println(value.toString()+":"+key.get());
                context.write(value, key);
            }
        }
    }
日期按計數降序排序

實現自定義的排序比較器,繼承WritableComparator類,並實現其compare方法

public static class MyComparator extends WritableComparator {
        public MyComparator() {
            // TODO Auto-generated constructor stub
            super(IntWritable.class, true);
        }

        @Override
        @SuppressWarnings({ "rawtypes", "unchecked" }) // 不檢查型別
        public int compare(WritableComparable a, WritableComparable b) {
            // CompareTo方法,返回值為1則降序,-1則升序
            // 預設是a.compareTo(b),a比b小返回-1,現在反過來返回1,就變成了降序
            return b.compareTo(a);
    }

所使用的Mapper、Reducer同上面升序排序的,其次,要在main函式中指定自定義的排序比較器

job.setSortComparatorClass(MyComparator.class);

手機使用者流量按上行流量升序,下行流量降序排序

同第一個例項類似,要自定義物件序列化,同時也要可比較,實現WritableComparable介面,並實現CompareTo方法

我這裡是將之前統計好的使用者流量資料作為輸入資料

public static class MySortKey implements WritableComparable<MySortKey> {
        private int upFlow;
        private int downFlow;
        private int sumFlow;

        public void FlowSort(int up, int down) {
            upFlow = up;
            downFlow = down;
            sumFlow = up + down;
        }

        public int getUpFlow() {
            return upFlow;
        }
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
        public int getDownFlow() {
            return downFlow;
        }
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
        public int getSumFlow() {
            return sumFlow;
        }
        public void setSumFlow(int sumFlow) {
            this.sumFlow = sumFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            upFlow = in.readInt();
            downFlow = in.readInt();
            sumFlow = in.readInt();
        }

        @Override
        public int compareTo(MySortKey o) {
            if ((this.upFlow - o.upFlow) == 0) {// 上行流量相等,比較下行流量
                return o.downFlow - this.downFlow;// 按downFlow降序排序
            } else {
                return this.upFlow - o.upFlow;// 按upFlow升序排
            }
        }

        @Override
        public String toString() {
            // TODO Auto-generated method stub
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }

    public static class SortMapper extends Mapper<Object, Text, MySortKey, Text> {
        Text phone = new Text();
        MySortKey mySortKey = new MySortKey();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] lists = value.toString().split("\t");
            phone.set(lists[0]);
            mySortKey.setUpFlow(Integer.parseInt(lists[1]));
            mySortKey.setDownFlow(Integer.parseInt(lists[2]));
            context.write(mySortKey, phone);// 調換手機號和流量計數,後者作為排序鍵
        }
    }

    public static class SortReducer extends Reducer<MySortKey, Text, Text, MySortKey> {
        public void reduce(MySortKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                System.out.println(value.toString()+","+key.toString());
                context.write(value, key);// 再次把手機號和流量計數調換
            }
        }
    }

4.自定義分割槽

需求分析

還是以上個例子的手機使用者流量日誌為例,在上個例子的統計需要基礎上新增一個新需求:按省份統計,不同省份的手機號放到不同的檔案裡。

例如137表示屬於河北,138屬於河南,那麼在結果輸出時,他們分別在不同的檔案中。

解答思路

挺簡單的,看過我之前結合原始碼解讀MapReduce過程的話,就知道這其實就是一個分割槽的問題。定義自己的分割槽規則,一個分割槽會對應一個reduce,會輸出到一個檔案。

而你需要做的就是基礎partitioner類,並實現getPartition方法,其餘過程同第一個例子

// 自定義分割槽類
public static class PhoneNumberPartitioner extends Partitioner<Text, FlowWritable> {
        private static HashMap<String, Integer> numberDict = new HashMap<>();
        static {
            numberDict.put("133", 0);
            numberDict.put("135", 1);
            numberDict.put("137", 2);
            numberDict.put("138", 3);
        }

        @Override
        public int getPartition(Text key, FlowWritable value, int numPartitions) {
            String num = key.toString().substring(0, 3);
            // 藉助HashMap返回不同手機段對應的分割槽號
            // 也可以直接通過if判斷,如
            // 根據年份對資料進行分割槽,返回不同分割槽號
            // if (key.toString().startsWith("133")) return 0 % numPartitions;
            return numberDict.getOrDefault(num, 4);
        }
    }

注意: main函式中要指定自定義分割槽類,以及Reducer task數量(一個分割槽對應一個reduce任務,一個Reduce任務對應一個輸出檔案)

// 設定分割槽類,及Reducer數目
job.setPartitionerClass(PhoneNumberPartitioner.class);
job.setNumReduceTasks(4);

[圖片上傳失敗...(image-591982-1544427279895)]

增加ReduceTask數量可看到生成的檔案數也增加了,不過檔案內容為空

5.計算出每組訂單中金額最大的記錄

需求分析

有如下訂單資料:

img

需要求出每一個訂單中成交金額最大的一筆交易。

思路解答

實際上是求最大值、最小值的問題,一拿到題,大概會冒出兩種思路吧

  1. 先排序(升序),Reduce端取第一條就是最小值,最後一條是最大值
  2. 不排序,在Reduce端不斷迴圈作比較,也可以求得最值

但問題還涉及到每一個訂單中的最大值,這就是分組的問題。比如說這裡,同一訂單號視為一組,在一組中找最大

先定義一個可序列化且可比較的物件Pair,用來存order_id,amount(只涉及這兩個變數)。Mapper端輸出類似

Key2Value2
{order_0000001,222.8} null
{order_0000001,25.8} null
{order_0000002,522.8} null
{order_0000002,122.4} null
{order_0000003,222.8} null

通過Pair中的order_id分組,因為Pair又是可比較,設定同一組按照amount降序排序。然後在Reduce端取第一個key-value對即可
Reduce端輸入k-v類似下表:

Key3Value3
{order_0000001,[222.8,25.8]} null
{order_0000002,[522.8,122.4]} null
{order_0000003,[222.8]} null

以上是排序思路,因為這裡比較簡單,直接在reduce端進行比較求最值更方便 【你可以自己試一下】


// 定義Pair物件
    public static class Pair implements WritableComparable<Pair> {
        private String order_id;
        private DoubleWritable amount;

        public Pair() {
            // TODO Auto-generated constructor stub
        }

        public Pair(String id, DoubleWritable amount) {
            this.order_id = id;
            this.amount = amount;
        }

        // 省略一些內容,你可以直接去檔案中看

        @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeUTF(order_id);
            out.writeDouble(amount.get());
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            order_id = in.readUTF();
            amount = new DoubleWritable(in.readDouble());
        }

        @Override
        public int compareTo(Pair o) {
            if (order_id.equals(o.order_id)) {// 同一order_id,按照amount降序排序
                return o.amount.compareTo(amount);
            } else {
                return order_id.compareTo(o.order_id);
            }
        }

    }
// 是分組不是分割槽,分組是組內定義一些規則由reduce去處理,分割槽是由多個Reduce處理,寫到不同檔案中
// 自定義分組類
    public static class GroupComparator extends WritableComparator {
        public GroupComparator() {
            // TODO Auto-generated constructor stub
            super(Pair.class, true);
        }
        // Mapper端會對Pair排序,之後分組的規則是對Pair中的order_id比較
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            Pair oa = (Pair) a;
            Pair ob = (Pair) b;
            return oa.getOrder_id().compareTo(ob.getOrder_id());
        }
    }
// Mapper類
    public static class MyMapper extends Mapper<Object, Text, Pair, NullWritable> {
        Pair pair = new Pair();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            pair.setOrder_id(strs[0]);
            pair.setAmount(new DoubleWritable(Double.parseDouble(strs[2])));
            context.write(pair, NullWritable.get());// 道理同上,以Pair作為key
            System.out.println(pair.getOrder_id()+","+pair.getAmount());
        }
    }
    
// Reducer類
    public static class MyReducer extends Reducer<Pair, NullWritable, Text, DoubleWritable> {
        public void reduce(Pair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(key.getOrder_id()), key.getAmount());// 已經排好序的,取第一個即可
            System.out.println(key.order_id+": "+key.amount.get());
        }
    }

注意: main函式中要另外設定自定義的分組類 job.setGroupingComparatorClass(GroupComparator.class);

多檔案輸入輸出、及不同輸入輸出格式化型別

6.合併多個小檔案(多檔案輸入輸出、及不同輸入輸出格式化型別)

需求分析

要計算的目標檔案中有大量的小檔案,會造成分配任務和資源的開銷比實際的計算開銷還大,這就產生了效率損耗。

需要先把一些小檔案合併成一個大檔案。

解答思路

簡單模型.jpg

如圖,MapReduce有一種簡單模型,僅僅只有Mapper。我想初學者都可能遇到過吧,當Mapper輸出k-v型別同Reducer輸入k-v不同型別時,Reducer不會執行。

其次,是輸入和輸出資料如何格式化?

輸出很簡單,因為最後是合併成一個檔案,直接以SequenceFileOutputFormat格式化類寫入即可

SequenceFileOutputFormat 的輸出是一個二進位制順
序檔案

輸入要自定義格式化類,具體過程可以參考我之前寫過的一篇文章:【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程 ,本來是需要實現InputFormat介面的getSplitscreateRecordReader方法,前者是邏輯上獲取切片,後者是將分片轉化為鍵值對形式。

但是這裡我們是合併小檔案,沒必要切片,直接將檔案物件視為一個分片,鍵值對以檔名為key,檔案物件為value。這裡自定義MyInputFormat類繼承自InputFormat的實現類FileInputFormat類

public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // TODO 因為是合併小檔案,設定檔案不可分割,k-v的v就是檔案物件
        // 設定不可分,會跳過getSplits方法中切分邏輯
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        MyRecordReader myRecordReader = new MyRecordReader();
//      myRecordReader
        return myRecordReader;
    }

}

然後,你在檢視原始碼時能夠發現,createRecordReader方法返回值型別是RecordReader<KEYIN, VALUEIN>,該型別定義瞭如何獲取當前Key-value,如何生成Key-Value的三個核心方法getCurrentKey(),getCurrentValue(),nextKeyValue()

所以你又要定義自己的MyRecordReader類,其繼承的RecordReader類有initialize(初始化RecordReader)、getCurrentKey(),getCurrentValue(),nextKeyValue()close(關閉RecordReader)

具體的程式碼你可以看我原始碼檔案

7.分組輸出到多個檔案【多檔案輸入輸出、及不同輸入輸出格式化型別】

需求分析

img

需要把相同訂單id的記錄放在一個檔案中,並以訂單id命名。

8.join操作

需求分析

有2個數據檔案:訂單資料、商品資訊。【資料檔案:product.txt,order.txt】

訂單資料表order

img

商品資訊表product

img

需要用MapReduce程式來實現下面這個SQL查詢運算:

select o.id order_id, o.date, o.amount, p.id p_id, p.pname, p.category_id, p.price
from t_order o join t_product p on o.pid = p.id

9.計算出使用者間的共同好友

需求分析

下面是使用者的好友關係列表,每一行代表一個使用者和他的好友列表 【資料檔案:friendsdata.txt】

image

需要求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰

例如從前2天記錄中可以看出,C、E是A、B的共同好友,最終的形式如下:

img

MapReduce理論基礎

Hadoop、Spark學習路線及資源收納

MapReduce書籍推薦

  • 《MapReduce Design Patterns》

    image
  • 《MapReduce2.0原始碼分析與程式設計實戰》

    [圖片上傳失敗...(image-a2c861-1544427279895)]

  • 《Hadoop MapReduce v2 Cookbook, 2nd Edition》

image

MapReduce實戰系統學習流程

詞頻統計

資料去重

資料排序

求平均值、中位數、標準差、最大/小值、計數

分組、分割槽

資料輸入輸出格式化

多檔案輸入、輸出

單表關聯

多表關聯

倒排索引

索引(index)作為一種具備各種優勢的資料結構,被大量應用在資料檢索領域

索引的優點

  • 通過對關鍵欄位排序,加快資料的檢索速度
  • 保證每一行資料的唯一性
index.png reverseindex.jpg

需求

對於給定的文件,確定每個單詞存在於某個文件,同時在文件中出現的次數

思路解答

  • Map端對檔案統計每個單詞出現的次數,輸出類似<{hadoop,file1},2>
  • Map端輸出前要先進行Combine過程,最終輸出類似< hadoop, file1:2>
  • Reduce端繼續對相同單詞進行合併,最終輸出類似<hadoop, file1:2 file2:5>

資料檔案

隨便找幾篇英文文件就可以了

TopN

資料檔案類似如下:

t001 2067
t002 2055
t003 109
t004 1200
t005 3368
t006 251
t001 3067
t002 255
t003 19
t004 2000
t005 368
t006 2512

隨便寫的,每一行以空格隔開,查詢後面數之的TopN

思路解答

就是每一個Map Task任務要求其只輸出TopN資料,這裡藉助TreeMap自動排序的特性【 將數字作為排序鍵 】,保證TopN。然後是Reduce中再次求解TopN即可

注意: 在main函式中要設定ReduceTask數量為1,保證最終的TopN

// Mapper中實現的map方法如下
    private TreeMap<Integer, Text> visittimesMap = new TreeMap<Integer, Text>();    //TreeMap是有序KV集合

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        if (value == null) {
            return;
        }
        String[] strs = value.toString().split(" ");
        String tId = strs[0];
        String tVisittimes = strs[1];
        if (tId == null || tVisittimes == null) {
            return;
        }
        visittimesMap.put(Integer.parseInt(tVisittimes), new Text(value));  //將訪問次數(KEY)和行記錄(VALUE)放入TreeMap中自動排序
        if (visittimesMap.size() > 5) { //如果TreeMap中元素超過N個,將第一個(KEY最小的)元素刪除
            visittimesMap.remove(visittimesMap.firstKey());
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Text t : visittimesMap.values()) {
            context.write(NullWritable.get(), t);   //在clean()中完成Map輸出
        }
    }

PeopleRank演算法實現

推薦系統——協同過濾演算法實現

資料

見resources資料夾下,其中rand.sh指令碼用於生成隨機日期資料



作者:josonLe
連結:https://www.jianshu.com/p/1e4c976f5ecc
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。