1. 程式人生 > >MapReduce小文件優化與分區

MapReduce小文件優化與分區

get public 流量 計數 exception true boolean 大於 文件

一、小文件優化

1.Mapper類

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 思路?
 * wordcount單詞計數
 * <單詞,1>
 * 
 * 數據傳輸
 * 
 * KEYIN:數據的起始偏移量0~10 11~20 21~30 
 * VALUEIN:數據
 * 
 * KEYOUT:mapper輸出到reduce階段 k的類型
 * VALUEOUT:mapper輸出到reduce階段v的類型
 * <China,1><Beijing,1><love,1>
 
*/ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //key 起始偏移量 value 數據 context 上下文 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.讀取數據 String line = value.toString();
// 2.切割 love Beijing String[] words = line.split(" "); // 3.循環的寫到下一個階段<love,1><Beijing,1> for (String w : words) { // 4.輸出到reducer階段 context.write(new Text(w), new IntWritable(1)); } } }

2.Reducer類

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 統計單詞出現的次數 int sum = 0; // 累加求和 for (IntWritable count : values) { // 拿到值累加 sum += count.get(); } // 結果輸出 context.write(key, new IntWritable(sum)); } }

3.Driver類

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the word-count job that uses {@link CombineTextInputFormat}
 * (the small-file optimisation) instead of the default text input format.
 *
 * <p>Usage: {@code WordCountDriver [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding built-in default path is used.
 */
public class WordCountDriver {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "c:/in1024/";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "c:/out1024/";

    /** Maximum combined split size: 4 MiB. */
    private static final long MAX_SPLIT_BYTES = 4194304L;

    /** Requested minimum split size: 3 MiB. */
    private static final long MIN_SPLIT_BYTES = 3145728L;

    /**
     * Configures and runs the job, then terminates the JVM with status 0 on
     * success and 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}
     * @throws IOException            if the job cannot be configured or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // Create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar through the driver class.
        job.setJarByClass(WordCountDriver.class);

        // Register the custom mapper and reducer.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Use the combining input format (the default would be TextInputFormat).
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, MAX_SPLIT_BYTES);
        // NOTE(review): confirm that CombineTextInputFormat honours the minimum
        // split size set here; kept exactly as in the original configuration.
        CombineTextInputFormat.setMinInputSplitSize(job, MIN_SPLIT_BYTES);

        // Input location and result location.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit and wait; propagate the outcome as the process exit status so
        // that shells and schedulers can detect a failed job.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

二、分區

1.Mapper類

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Extracts the per-record traffic of a phone number from a tab-separated log line.
 *
 * <p>The phone number is the second field; the upstream and downstream byte
 * counts are the third-from-last and second-from-last fields. For example a
 * record whose second field is {@code 13726130503} and whose last three fields
 * are {@code 299}, {@code 681}, {@code 200} is emitted as
 * {@code 13726130503 -> (299, 681, 980)}.
 *
 * <p>Records that are too short or whose traffic fields are not numeric are
 * skipped and counted under the {@code FlowCount / MALFORMED_RECORDS} counter
 * instead of failing the task.
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * Fewest fields a usable record can have: a leading field, the phone
     * number, upstream, downstream and the trailing field. With fewer fields
     * the phone-number index would overlap the traffic indexes.
     */
    private static final int MIN_FIELDS = 5;

    /** Counter group for records dropped during cleaning. */
    private static final String COUNTER_GROUP = "FlowCount";

    /** Counter name for records dropped during cleaning. */
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Parses one log line and emits {@code <phoneNumber, FlowBean>}.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   the tab-separated log line
     * @param context job context used to write the output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line.
        String line = value.toString();

        // 2. Split into fields.
        String[] fields = line.split("\t");

        // 3. Data cleaning: drop records that cannot contain all key fields.
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        String phoneN = fields[1];
        long upFlow;
        long dfFlow;
        try {
            // Traffic fields are addressed from the end because the middle
            // columns (host, category) are optional in the input.
            upFlow = Long.parseLong(fields[fields.length - 3]);
            dfFlow = Long.parseLong(fields[fields.length - 2]);
        } catch (NumberFormatException malformed) {
            // Non-numeric traffic values: skip the record but keep it visible
            // in the job counters.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // 4. Emit to the reduce phase.
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}

2.Reducer類

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Aggregates the traffic of one phone number: all upstream values and all
 * downstream values received for the key are added up and written as a
 * single {@code FlowBean}.
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Sums the upstream and downstream traffic of every bean for {@code key}.
     *
     * @param key     the phone number
     * @param values  the per-record traffic beans for this phone number
     * @param context job context used to write the aggregated bean
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Running totals for this phone number.
        long totalUp = 0L;
        long totalDown = 0L;

        for (FlowBean bean : values) {
            totalUp = totalUp + bean.getUpFlow();
            totalDown = totalDown + bean.getDfFlow();
        }

        // The two-argument constructor also derives the combined total.
        context.write(key, new FlowBean(totalUp, totalDown));
    }
}

3.封裝類

package com.css.flow.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value object carrying the traffic of a phone number between the map and
 * reduce phases: upstream bytes, downstream bytes and their sum.
 *
 * <p>Serialized form (see {@link #write} / {@link #readFields}): three longs in
 * the order upFlow, dfFlow, flowSum.
 */
public class FlowBean implements Writable {
    /** Upstream traffic. */
    private long upFlow;
    /** Downstream traffic. */
    private long dfFlow;
    /** Total traffic; kept equal to {@code upFlow + dfFlow} unless overridden via {@link #setFlowSum}. */
    private long flowSum;

    /** No-argument constructor; all fields start at zero until {@link #readFields} or a setter fills them. */
    public FlowBean() {
    }

    /**
     * Creates a bean and derives the total traffic.
     *
     * @param upFlow upstream traffic
     * @param dfFlow downstream traffic
     */
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    /**
     * Deserializes the bean; fields are read in the same order {@link #write} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();
    }

    /**
     * Serializes the bean as upFlow, dfFlow, flowSum.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }

    /** @return the three values separated by tabs: upFlow, dfFlow, flowSum */
    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowSum;
    }

    /** @return upstream traffic */
    public long getUpFlow() {
        return upFlow;
    }

    /**
     * Sets the upstream traffic and refreshes the total so it does not go stale.
     *
     * @param upFlow upstream traffic
     */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.flowSum = this.upFlow + this.dfFlow;
    }

    /** @return downstream traffic */
    public long getDfFlow() {
        return dfFlow;
    }

    /**
     * Sets the downstream traffic and refreshes the total so it does not go stale.
     *
     * @param dfFlow downstream traffic
     */
    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
        this.flowSum = this.upFlow + this.dfFlow;
    }

    /** @return total traffic */
    public long getFlowSum() {
        return flowSum;
    }

    /**
     * Overrides the total directly. A later call to {@link #setUpFlow} or
     * {@link #setDfFlow} recomputes it from the two components.
     *
     * @param flowSum total traffic
     */
    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }
}

4.分區類

package com.css.flow.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes records to reducers by the first three digits of the phone number.
 *
 * <p>Prefix mapping: 135 -> 0, 137 -> 1, 138 -> 2, 139 -> 3, anything else
 * (including keys shorter than three characters) -> 4. The result is folded
 * into {@code [0, numPartitions)} so the partitioner stays valid when the job
 * runs with fewer than five reduce tasks; with five or more the mapping above
 * is used unchanged.
 */
public class PhoneNumPartitioner extends Partitioner<Text, FlowBean> {

    /** Number of leading characters that decide the partition. */
    private static final int PREFIX_LENGTH = 3;

    /** Partition for every phone number whose prefix has no dedicated partition. */
    private static final int DEFAULT_PARTITION = 4;

    /**
     * Chooses the partition for a phone number.
     *
     * @param key           the phone number
     * @param value         the traffic bean (unused)
     * @param numPartitions total number of partitions of the job
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        int partition = partitionForPhoneNumber(key.toString());
        // A returned index >= numPartitions is illegal; wrap it so that a job
        // configured with fewer reducers still receives a legal index.
        return partition % numPartitions;
    }

    /** Maps a phone number to its nominal partition (0-4) from its three-character prefix. */
    private static int partitionForPhoneNumber(String phoneNumber) {
        // Keys too short to carry a prefix go to the catch-all partition
        // instead of failing in substring().
        if (phoneNumber.length() < PREFIX_LENGTH) {
            return DEFAULT_PARTITION;
        }
        String prefix = phoneNumber.substring(0, PREFIX_LENGTH);

        if ("135".equals(prefix)) {
            return 0;
        } else if ("137".equals(prefix)) {
            return 1;
        } else if ("138".equals(prefix)) {
            return 2;
        } else if ("139".equals(prefix)) {
            return 3;
        }
        return DEFAULT_PARTITION;
    }
}

5.Driver類

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the traffic-summary job with custom partitioning by phone-number prefix.
 *
 * <p>Usage: {@code FlowCountDriver [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding built-in default path is used.
 */
public class FlowCountDriver {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "c:/flow1020/in";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "c:/flow1020/out";

    /**
     * Number of reduce tasks. It must be at least the number of partitions
     * the partitioner can return; PhoneNumPartitioner returns indexes 0 to 4.
     */
    private static final int NUM_REDUCE_TASKS = 5;

    /**
     * Configures and runs the job, then terminates the JVM with status 0 on
     * success and 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}
     * @throws IOException            if the job cannot be configured or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // 1. Create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(FlowCountDriver.class);

        // 3. Register the custom mapper and reducer.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4. Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Reduce (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Register the custom partitioner together with a matching reducer count.
        job.setPartitionerClass(PhoneNumPartitioner.class);
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        // 6. Input location and result location.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7. Submit and wait; propagate the outcome as the process exit status
        // so that shells and schedulers can detect a failed job.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

6.輸入的文件HTTP_20180313143750.dat

3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育網站    24    27    299    681    200
3631279950322    13822544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4    www.taobao.com    淘寶網    4    0    264    0    200
3631279910362    13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
3631244000322    13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
3631279930342    18212575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    視頻網站    15    12    1527    2106    200
3631279950342    13884138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
3631279930352    13510439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
3631279950332    15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    316    296    200
3631279830392    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
3631279840312    13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站點統計    24    9    660    690    200
3631279730382    15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    369    338    200
3631279860392    15889002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站點統計    3    3    938    380    200
3631279920332    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
3631279860312    13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    120    1320    200
3631279840302    13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    綜合門戶    15    12    198    910    200
3631279950332    13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
3631279820302    13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    綜合門戶    57    102    735    11349    400
3631279860322    18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    212    200
3631279900332    13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    4243    200
3631279880322    13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
3631279850362    13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
3631279930352    13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1136    94    200
3631279930353    13560436326    C4-17-FE-BA-DE-D9:CMCC    120.196.100.77    lol.qq.com/    英雄聯盟    18    15    1136    94    200

7.輸出的文件

(1)part-r-00000
13502468823    735    11349    12084
13510439658    1116    954    2070
13560436326    1136    94    1230
13560436666    1136    94    1230
13560439658    918    4938    5856

(2)part-r-00001
13719199419    240    0    240
13726130503    299    681    980
13726238888    2481    24681    27162
13760778710    120    120    240

(3)part-r-00002
13822544101    264    0    264
13884138413    4116    1432    5548

(4)part-r-00003
13922314466    3008    3720    6728
13925057413    11058    4243    15301
13926251106    240    0    240
13926435656    132    1512    1644

(5)part-r-00004
13480253104    120    1320    1440
13602846565    198    910    1108
13660577991    660    690    1350
15013685858    369    338    707
15889002119    938    380    1318
15920133257    316    296    612
18212575961    1527    2106    3633
18320173382    9531    212    9743

MapReduce小文件優化與分區