MapReduce 編程模板編寫【分析網站基本指標UV】程序
阿新 • • 發佈:2018-04-30
地址 自動 trace spa bool this try reducer CI
1.網站基本指標的幾個概念
PV: page view 瀏覽量
頁面的瀏覽次數,用戶每打開一次頁面就記錄一次。
UV:unique visitor 獨立訪客數
一天內訪問某站點的人數(以cookie為例) 但是如果用戶把瀏覽器cookie給刪了之後再次訪問會影響記錄。
VV: visit view 訪客的訪問次數
記錄所有訪客一天內訪問了多少次網站,訪客完成訪問直到瀏覽器關閉算一次。
IP:獨立ip數
指一天內使用不同ip地址的用戶訪問網站的數量。
2.編寫MapReduce編程模板
Driver
package mapreduce; ? import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; ? public class MRDriver extends Configured implements Tool { ? public int run(String[] args) throws Exception { //創建job Job job = Job.getInstance(this.getConf(),"mr-demo"); job.setJarByClass(MRDriver.class); ? //input 默認從hdfs讀取數據 將每一行轉換成key-value Path inPath = new Path(args[0]); FileInputFormat.setInputPaths(job,inPath); ? //map 一行調用一次Map方法 對每一行數據進行分割 job.setMapperClass(null); job.setMapOutputKeyClass(null); job.setMapOutputValueClass(null); ? //shuffle job.setPartitionerClass(null);//分組 job.setGroupingComparatorClass(null);//分區 job.setSortComparatorClass(null);//排序 ? //reduce 每有一條key value調用一次reduce方法 job.setReducerClass(null); job.setOutputKeyClass(null); job.setOutputValueClass(null); ? //output Path outPath = new Path(args[1]); //this.getConf()來自父類 內容為空可以自己set配置信息 FileSystem fileSystem = FileSystem.get(this.getConf()); //如果目錄已經存在則刪除 if(fileSystem.exists(outPath)){ //if path is a directory and set to true fileSystem.delete(outPath,true); } FileOutputFormat.setOutputPath(job, outPath); //submit boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0:1; } ? public static void main(String[] args) { Configuration configuration = new Configuration(); try { int status = ToolRunner.run(configuration, new MRDriver(), args); System.exit(status); } catch (Exception e) { e.printStackTrace(); } } } ?
Mapper
public class MRModelMapper extends Mapper<LongWritable,Text,Text,LongWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 實現自己的業務邏輯 */ } }
Reduce
public class MRModelReducer extends Reducer<Text,LongWritable,Text,LongWritable> { ? @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { /** * 根據業務需求自己實現 */ } }
3. 統計每個城市的UV數
分析需求:
UV:unique view 唯一訪問數,一個用戶記一次
map:
key: CityId (城市id) 數據類型: Text
value: guid (用戶id) 數據類型:Text
shuffle:
key: CityId
value: {guid guid guid..}
reduce:
key: CityId
value: 訪問數 即shuffle輸出value的集合大小
output:
key : CityId
value : 訪問數
MRDriver.java mapreduce執行過程
package mapreduce; ? import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; ? public class MRDriver extends Configured implements Tool { ? public int run(String[] args) throws Exception { //創建job Job job = Job.getInstance(this.getConf(),"mr-demo"); job.setJarByClass(MRDriver.class); ? //input 默認從hdfs讀取數據 將每一行轉換成key-value Path inPath = new Path(args[0]); FileInputFormat.setInputPaths(job,inPath); ? //map 一行調用一次Map方法 對每一行數據進行分割 job.setMapperClass(MRMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); ? /* //shuffle job.setPartitionerClass(null);//分組 job.setGroupingComparatorClass(null);//分區 job.setSortComparatorClass();//排序 */ //reduce job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); ? //output Path outPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(this.getConf()); if(fileSystem.exists(outPath)){ //if path is a directory and set to true fileSystem.delete(outPath,true); } FileOutputFormat.setOutputPath(job, outPath); //submit boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0:1; } ? public static void main(String[] args) { Configuration configuration = new Configuration(); try { int status = ToolRunner.run(configuration, new MRDriver(), args); System.exit(status); } catch (Exception e) { e.printStackTrace(); } } }
MRMapper.java
package mapreduce; ? import java.io.IOException; ? import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; ? public class MRMapper extends Mapper<LongWritable,Text,Text,Text> { private Text mapOutKey = new Text(); private Text mapOutKey1 = new Text(); //一行調用一次Map方法 對每一行數據進行分割 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //獲得每行的值 String str = value.toString(); //按空格得到每個item String[] items = str.split("\t"); if (items[24]!=null) { this.mapOutKey.set(items[24]); if (items[5]!=null) { this.mapOutKey1.set(items[5]); } } context.write(mapOutKey, mapOutKey1); } }
MPReducer.java
package mapreduce; ? import java.io.IOException; import java.util.HashSet; ? import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; ? public class MRReducer extends Reducer<Text, Text, Text, IntWritable>{ ? //每有一個key value數據 就執行一次reduce方法 @Override protected void reduce(Text key, Iterable<Text> texts, Reducer<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { HashSet<String> set = new HashSet<String>(); for (Text text : texts) { set.add(text.toString()); } context.write(key,new IntWritable(set.size())); } }
4.MapReduce執行wordcount過程理解
input:默認從HDFS讀取數據
Path inPath = new Path(args[0]); FileInputFormat.setInputPaths(job,inPath);
將每一行數據轉換為key-value(分割),這一步由MapReduce框架自動完成。
輸出行的偏移量和行的內容
mapper: 分詞輸出
數據過濾,數據補全,字段格式化
輸入:input的輸出
將分割好的<key,value>對交給用戶定義的map方法進行處理,生成新的<key,value>對。
一行調用一次map方法。
統計word中的map:
shuffle: 分區,分組,排序
輸出:
<Bye,1>
<Hello,1>
<World,1,1>
得到map輸出的<key,value>對,Mapper會將他們按照key進行排序,得到mapper的最終輸出結果。
Reduce :每一條Keyvalue調用一次reduce方法
將相同Key的List<value>,進行相加求和
output:將reduce輸出寫入hdfs
MapReduce 編程模板編寫【分析網站基本指標UV】程序