1. 程式人生 > >MapReduce 編程模板編寫【分析網站基本指標UV】程序

MapReduce 編程模板編寫【分析網站基本指標UV】程序

地址 自動 trace spa bool this try reducer CI

1.網站基本指標的幾個概念

PV: page view 瀏覽量

頁面的瀏覽次數,用戶每打開一次頁面就記錄一次。

UV:unique visitor 獨立訪客數

一天內訪問某站點的人數(以cookie為例) 但是如果用戶把瀏覽器cookie給刪了之後再次訪問會影響記錄。

VV: visit view 訪客的訪問次數

記錄所有訪客一天內訪問了多少次網站,訪客完成訪問直到瀏覽器關閉算一次。

IP:獨立ip數

指一天內使用不同ip地址的用戶訪問網站的數量。

2.編寫MapReduce編程模板

Driver

package mapreduce;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
?
public class MRDriver extends Configured implements Tool {
?
    public int run(String[] args) throws Exception {
        //創建job
        Job job = Job.getInstance(this.getConf(),"mr-demo");
        job.setJarByClass(MRDriver.class);
?
        //input 默認從hdfs讀取數據 將每一行轉換成key-value
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job,inPath);
?
        //map 一行調用一次Map方法  對每一行數據進行分割
        job.setMapperClass(null);
        job.setMapOutputKeyClass(null);
        job.setMapOutputValueClass(null);
?
        //shuffle
        job.setPartitionerClass(null);//分組
        job.setGroupingComparatorClass(null);//分區
        job.setSortComparatorClass(null);//排序
?
        //reduce 每有一條key value調用一次reduce方法
        job.setReducerClass(null);
        job.setOutputKeyClass(null);
        job.setOutputValueClass(null);
?
        //output
        Path outPath = new Path(args[1]);
        //this.getConf()來自父類 內容為空可以自己set配置信息
        FileSystem fileSystem = FileSystem.get(this.getConf());
        //如果目錄已經存在則刪除
        if(fileSystem.exists(outPath)){
            //if path is a directory and set to true
            fileSystem.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        //submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0:1;
    }
?
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
?

Mapper

public class MRModelMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /**
         * 實現自己的業務邏輯
         */
    }
}

Reduce

public class MRModelReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
?
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        /**
         * 根據業務需求自己實現
         */
    }
}

3. 統計每個城市的UV數

分析需求:

UV:unique view 唯一訪問數,一個用戶記一次

map:

key: CityId (城市id) 數據類型: Text

value: guid (用戶id) 數據類型:Text

shuffle:

key: CityId

value: {guid guid guid..}

reduce:

key: CityId

value: 訪問數 即shuffle輸出value的集合大小

output:

key : CityId

value : 訪問數

MRDriver.java mapreduce執行過程



package mapreduce;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
?
public class MRDriver extends Configured implements Tool {
?
    public int run(String[] args) throws Exception {
        //創建job
        Job job = Job.getInstance(this.getConf(),"mr-demo");
        job.setJarByClass(MRDriver.class);
?
        //input 默認從hdfs讀取數據 將每一行轉換成key-value
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job,inPath);
?
        //map 一行調用一次Map方法  對每一行數據進行分割
        job.setMapperClass(MRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
?
       /* //shuffle
        job.setPartitionerClass(null);//分組
        job.setGroupingComparatorClass(null);//分區
        job.setSortComparatorClass();//排序
*/
        //reduce
        job.setReducerClass(MRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
?
        //output
        Path outPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(this.getConf());
        if(fileSystem.exists(outPath)){
            //if path is a directory and set to true
            fileSystem.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        
        //submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0:1;
    }
?
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MRMapper.java

package mapreduce;
?
import java.io.IOException;
?
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
?
public class MRMapper extends Mapper<LongWritable,Text,Text,Text> {
    private Text mapOutKey = new Text();
    private Text mapOutKey1 = new Text();
    
    //一行調用一次Map方法  對每一行數據進行分割
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        //獲得每行的值
        String str = value.toString();
        //按空格得到每個item
        String[] items = str.split("\t");
        
        if (items[24]!=null) {
            this.mapOutKey.set(items[24]);
            if (items[5]!=null) {
                this.mapOutKey1.set(items[5]);
            }
        }
        context.write(mapOutKey, mapOutKey1);
    }
    
}

MPReducer.java

package mapreduce;
?
import java.io.IOException;
import java.util.HashSet;
?
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
?
public class MRReducer extends Reducer<Text, Text, Text, IntWritable>{
?
    //每有一個key value數據 就執行一次reduce方法
    @Override
    protected void reduce(Text key, Iterable<Text> texts, Reducer<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        
        HashSet<String> set = new HashSet<String>();
        
        for (Text text : texts) {
            set.add(text.toString());
        }
        
        context.write(key,new IntWritable(set.size()));
    
    }   
}
 

4.MapReduce執行wordcount過程理解

input:默認從HDFS讀取數據

 Path inPath = new Path(args[0]);
 FileInputFormat.setInputPaths(job,inPath);

將每一行數據轉換為key-value(分割),這一步由MapReduce框架自動完成。

輸出行的偏移量和行的內容

技術分享圖片

技術分享圖片

mapper: 分詞輸出

數據過濾,數據補全,字段格式化

輸入:input的輸出

將分割好的<key,value>對交給用戶定義的map方法進行處理,生成新的<key,value>對。

一行調用一次map方法。

統計word中的map:

技術分享圖片

技術分享圖片

shuffle: 分區,分組,排序

輸出:

<Bye,1>

<Hello,1>

<World,1,1>

得到map輸出的<key,value>對,Mapper會將他們按照key進行排序,得到mapper的最終輸出結果。

Reduce :每一條Keyvalue調用一次reduce方法

將相同Key的List<value>,進行相加求和

output:將reduce輸出寫入hdfs

MapReduce 編程模板編寫【分析網站基本指標UV】程序