1. 程式人生 > >MapReduce案例

MapReduce案例

合成 集合 ont jar 並行化 edi delet oss top

1. 倒排索引
倒排索引是文檔檢索系統中最常用的數據結構,被廣泛地應用於全文搜索引擎。 它主要是用來存儲某個單詞(或詞組) 在一個文檔或一組文檔中的存儲位置的映射,即提供了一種根據內容來查找文檔的方式。由於不是根據文檔來確定文檔所包含的內容,而是進行相反的操作,因而稱為倒排索引( Inverted Index)。
1.1. 實例描述
通常情況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的ID號,或者是指文檔所在位置的 URL。如下圖所示:
技術分享圖片
從上圖可以看出,單詞1出現在{文檔 1,文檔 4,文檔 13, ……}中,單詞2出現在{文檔 3,文檔 5,文檔 15, ……}中,而單詞3出現在{文檔 1,文檔 8,文檔 20, ……}中。在實際應用中,還需要給每個文檔添加一個權值,用來指出每個文檔與搜索內容的相關度,如下圖所示:
技術分享圖片
最常用的是使用詞頻作為權重,即記錄單詞在文檔中出現的次數。以英文為例,如下圖所示,索引文件中的“ MapReduce”一行表示:“ MapReduce”這個單詞在文本 T0 中 出現過 1 次,××× 中出現過 1 次,T2 中出現過 2 次。當搜索條件為“ MapReduce”、“ is”、“ Simple” 時,對應的集合為: {T0, ×××, T2}∩{T0, ×××}∩{T0, ×××}={T0, ×××},即文檔 T0 和 ××× 包 含了所要索引的單詞,而且只有 T0 是連續的。
技術分享圖片
1.2. 設計思路
1)Map過程
首先使用默認的 TextInputFormat 類對輸入文件進行處理,得到文本中每行的偏移量及其內容。顯然, Map過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個信息:單詞、文檔 URL 和詞頻,如下圖所示。
技術分享圖片
這裏存在兩個問題:第一, <key,value>對只能有兩個值,在不使用 Hadoop 自定義數據類型的情況下,需要根據情況將其中兩個值合並成一個值,作為 key 或 value 值;
第二,通過一個 Reduce 過程無法同時完成詞頻統計和生成文檔列表,所以必須增加一個 Combine 過程完成詞頻統計。
這裏將單詞和 URL 組成 key 值(如“ MapReduce: file1.txt”),將詞頻作為 value,這樣做的好處是可以利用 MapReduce 框架自帶的 Map 端排序,將同一文檔的相同單詞的詞頻組成列表,傳遞給 Combine 過程,實現類似於 WordCount 的功能。
2)Combine過程
經過map方法處理後, Combine過程將key值相同value值累加,得到一個單詞在文檔中的詞頻。 如果直接將圖所示的輸出作為 Reduce 過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、 URL 和詞頻組成)應該交由同一個 Reducer 處理,但當前的 key 值無法保證這一點,所以必須修改 key值和value值。這次將單詞作為 key 值, URL 和詞頻組成 value值(如“ file1.txt: 1”)。這樣做的好處是可以利用 MapReduce框架默認的HashPartitioner 類完成 Shuffle過程,將相同單詞的所有記錄發送給同一個 Reducer 進行處理。
技術分享圖片
3)Reduce過程
經過上述兩個過程後, Reduce 過程只需將相同 key 值的 value 值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給 MapReduce 框架進行處理了。
技術分享圖片
1.3. 程序代碼
InvertedIndexMapper:

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

private static Text keyInfo = new Text();// 存儲單詞和 URL 組合  

private static final Text valueInfo = new Text("1");// 存儲詞頻,初始化為1 

@Override  

protected void map(LongWritable key, Text value, Context context) 

        throws IOException, InterruptedException { 

    String line = value.toString();  

    String[] fields = StringUtils.split(line, " ");// 得到字段數組  

    FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行數據所在的文件切片  

    String fileName = fileSplit.getPath().getName();// 根據文件切片得到文件名  

    for (String field : fields) { 

        // key值由單詞和URL組成,如“MapReduce:file1”  

        keyInfo.set(field + ":" + fileName);  

        context.write(keyInfo, valueInfo);  

    } 

} 

}
InvertedIndexCombiner:

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

private static Text info = new Text();  

// 輸入: <MapReduce:file3 {1,1,...}> 

// 輸出:<MapReduce file3:2> 

@Override  

protected void reduce(Text key, Iterable<Text> values, Context context) 

        throws IOException, InterruptedException { 

    int sum = 0;// 統計詞頻  

    for (Text value : values) { 

        sum += Integer.parseInt(value.toString());  

    } 

    int splitIndex = key.toString().indexOf(":");  

    // 重新設置 value 值由 URL 和詞頻組成  

    info.set(key.toString().substring(splitIndex + 1) + ":" + sum);  

    // 重新設置 key 值為單詞  

    key.set(key.toString().substring(0, splitIndex));  

    context.write(key, info);  

} 

}
InvertedIndexReducer:

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

private static Text result = new Text();  

// 輸入:<MapReduce file3:2> 

// 輸出:<MapReduce file1:1;file2:1;file3:2;> 

@Override  

protected void reduce(Text key, Iterable<Text> values, Context context) 

        throws IOException, InterruptedException { 

    // 生成文檔列表  

    String fileList = new String();  

    for (Text value : values) { 

        fileList += value.toString() + ";";  

    } 

    result.set(fileList);  

    context.write(key, result);  

} 

}
InvertedIndexRunner:

public class InvertedIndexRunner {

public static void main(String[] args) throws IOException, 

        ClassNotFoundException, InterruptedException { 

    Configuration conf = new Configuration();  

    Job job = Job.getInstance(conf);  

    job.setJarByClass(InvertedIndexRunner.class);  

    job.setMapperClass(InvertedIndexMapper.class);  

    job.setCombinerClass(InvertedIndexCombiner.class);  

    job.setReducerClass(InvertedIndexReducer.class);  

    job.setOutputKeyClass(Text.class);  

    job.setOutputValueClass(Text.class);  

    FileInputFormat.setInputPaths(job, new Path(args[0]));  

    // 檢查參數所指定的輸出路徑是否存在,若存在,先刪除  

    Path output = new Path(args[1]);  

    FileSystem fs = FileSystem.get(conf);  

    if (fs.exists(output)) { 

        fs.delete(output, true);  

    } 

    FileOutputFormat.setOutputPath(job, output);  

    System.exit(job.waitForCompletion(true) ? 0 : 1);  

} 

}

2. 數據去重
數據去重主要是為了掌握和利用並行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及數據去重。
2.1. 實例描述
對數據文件中的數據進行去重。數據文件中的每行都是一個數據。比如原始輸入數據為:
File1:
2017-3-1 a
2017-3-2 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-7 c
2017-3-3 c
File2:
2017-3-1 b
2017-3-2 a
2017-3-3 b
2017-3-4 d
2017-3-5 a
2017-3-6 c
2017-3-7 d
2017-3-3 c
輸出結果為:
2017-3-1 a
2017-3-1 b
2017-3-2 a
2017-3-2 b
2017-3-3 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-6 c
2017-3-7 c
2017-3-7 d
2.2. 設計思路
數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。我們自然而然會想到將同一個數據的所有記錄都交給一臺 reduce機器,無論這個數據出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce 的輸入應該以數據作為 key,而對 value-list 則沒有要求。當 reduce 接收到一個<key, value-list>時就直接將 key復制到輸出的 key 中,並將 value 設置成空值。
在 MapReduce 流程中, map的輸出<key,value>經過 shuffle 過程聚集成<key, value-list>後會交給 reduce。所以從設計好的 reduce 輸入可以反推出 map 的輸出 key 應為數據, value任意。繼續反推, map 輸出數據的 key 為數據,而在這個實例中每個數據代表輸入文件中的一行內容,所以 map 階段要完成的任務就是在采用 Hadoop 默認的作業輸入方式之後,將value 設置為 key,並直接輸出(輸出中的 value 任意)。 map 中的結果經過 shuffle 過程之後交給 reduce。 reduce 階段不會管每個 key 有多少個 value,它直接將輸入的 key 復制為輸出的 key,並輸出就可以了(輸出中的 value 被設置成空了)。
2.3. 程序代碼
Mapper:

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

private static Text field = new Text();  

@Override  

protected void map(LongWritable key, Text value, Context context) 

        throws IOException, InterruptedException { 

    field = value;  

    context.write(field, NullWritable.get());  

} 

[align=left]}
Reducer:

public class DedupReducer extends

    Reducer<Text, NullWritable, Text, NullWritable> { 

@Override  

protected void reduce(Text key, Iterable<NullWritable> values, 

        Context context) throws IOException, InterruptedException { 

    context.write(key, NullWritable.get());  

} 

}
Runner:

public class DedupRunner {

public static void main(String[] args) throws IOException, 

        ClassNotFoundException, InterruptedException { 

    Configuration conf = new Configuration();  

    Job job = Job.getInstance(conf);  

    job.setJarByClass(DedupRunner.class);  

    job.setMapperClass(DedupMapper.class);  

    job.setReducerClass(DedupReducer.class);  

    job.setOutputKeyClass(Text.class);  

    job.setOutputValueClass(NullWritable.class);  

    FileInputFormat.setInputPaths(job, new Path(args[0]));  

    FileOutputFormat.setOutputPath(job, new Path(args[1]));  

    job.waitForCompletion(true);  

} 

[align=left]}
3. Top N
Top-N分析法是指從研究對象中得到所需的N個數據,並對這N個數據進行重點分析的方法。那麽該如何利用MapReduce來解決在海量數據中求Top N個數。
3.1. 實例描述
對數據文件中的數據取最大top-n。數據文件中的每個都是一個數據。
原始輸入數據為:
10 3 8 7 6 5 1 2 9 4
11 12 17 14 15 20
19 18 13 16
輸出結果為(最大的前5個):
20
19
18
17
16
3.2. 設計思路
要找出top N, 核心是能夠想到reduce Task個數一定只有一個。
因為一個map task就是一個進程,有幾個map task就有幾個中間文件,有幾個reduce task就有幾個最終輸出文件。我們要找的top N 是指的全局的前N條數據,那麽不管中間有幾個map, reduce最終只能有一個reduce來匯總數據,輸出top N。
Mapper過程
使用默認的mapper數據,一個input split(輸入分片)由一個mapper來處理。
在每一個map task中,我們找到這個input split的前n個記錄。這裏我們用TreeMap這個數據結構來保存top n的數據,TreeMap默認按鍵的自然順序升序進行排序。下一步,我們來加入新記錄到TreeMap中去。在map中,我們對每一條記錄都嘗試去更新TreeMap,最後我們得到的就是這個分片中的local top n的n個值。
以往的mapper中,我們都是處理一條數據之後就context.write一次。而在這裏是把所有這個input split的數據處理完之後再進行寫入。所以,我們可以把這個context.write放在cleanup裏執行。cleanup就是整個mapper task執行完之後會執行的一個函數。
TreeMap 是一個有序的key-value集合,默認會根據其鍵的自然順序進行排序,也可根據創建映射時提供的 Comparator 進行排序。其firstKey()方法用於返回當前這個集合第一個(最低)鍵。
Reducer過程
只有一個reducer,就是對mapper輸出的數據進行再一次匯總,選出其中的top n,即可達到我們的目的。註意的是,Treemap默認是正序排列數據,要想滿足求取top n倒序最大的n個,需要實現自己的Comparator()方法。
3.3. 程序代碼
TopNMapper:

private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

@Override

public void map(LongWritable key, Text value, Context context) {

    String line = value.toString();

    String[] nums = line.split(" ");

    for (String num : nums) {

        repToRecordMap.put(Integer.parseInt(num), " ");

        if (repToRecordMap.size() > 5) {

            repToRecordMap.remove(repToRecordMap.firstKey());

        }

    }

@Override

protected void cleanup(Context context) {

    for (Integer i : repToRecordMap.keySet()) {

        try {

            context.write(NullWritable.get(), new IntWritable(i));

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

TopNReducer:

private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() {

    /* 

    * int compare(Object o1, Object o2) 返回一個基本類型的整型,  

    * 返回負數表示:o1 小於o2,  

    * 返回0 表示:o1和o2相等,  

    * 返回正數表示:o1大於o2。  

    * 誰大誰排後面

    */

    public int compare(Integer a, Integer b) {

        return b - a;

    }

});

public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)

        throws IOException, InterruptedException {

    for (IntWritable value : values) {

        repToRecordMap.put(value.get(), " ");

        if (repToRecordMap.size() > 5) {

            repToRecordMap.remove(repToRecordMap.firstKey());

        }

    }

    for (Integer i : repToRecordMap.keySet()) {

        context.write(NullWritable.get(), new IntWritable(i));

    }

}

TopNRunner:

Configuration conf = new Configuration();

    Job job = Job.getInstance(conf);

    job.setJarByClass(TopNRunner.class);

    job.setMapperClass(TopNMapper.class);

    job.setReducerClass(TopNReducer.class);

    job.setNumReduceTasks(1);

    job.setMapOutputKeyClass(NullWritable.class);// map階段的輸出的key

    job.setMapOutputValueClass(IntWritable.class);// map階段的輸出的value

    job.setOutputKeyClass(NullWritable.class);// reduce階段的輸出的key

    job.setOutputValueClass(IntWritable.class);// reduce階段的輸出的value

    FileInputFormat.setInputPaths(job, new Path("D:\\topN\\input"));

    FileOutputFormat.setOutputPath(job, new Path("D:\\topN\\output"));

    boolean res = job.waitForCompletion(true);

    System.exit(res ? 0 : 1);

MapReduce案例