1. 程式人生 > >大數據實戰:用戶流量分析系統

大數據實戰:用戶流量分析系統

instance ron () 地址 pre 過程 原理 獲得 .net

文章出處:http://blog.csdn.net/sdksdk0/article/details/51628874

作者:朱培

---------------------------------------------------------------------------------------------------------------

本文是結合Hadoop中的mapreduce來對用戶數據進行分析,統計用戶的手機號碼、上行流量、下行流量、總流量的信息,同時可以按照總流量大小對用戶進行分組排序等。是一個非常簡潔易用的hadoop項目,主要用戶進一步加強對MapReduce的理解及實際應用。文末提供源數據采集文件和系統源碼。

本案例非常適合hadoop初級人員學習以及想入門大數據、雲計算、數據分析等領域的朋友進行學習。

一、待分析的數據源

以下是一個待分析的文本文件,裏面有非常多的用戶瀏覽信息,保擴用戶手機號碼,上網時間,機器序列號,訪問的IP,訪問的網站,上行流量,下行流量,總流量等信息。這裏只截取一小段,具體文件在文末提供下載鏈接。

技術分享


二、基本功能實現

想要統計出用戶的上行流量、下行流量、總流量信息,我們需要建立一個bean類來對數據進行封裝。於是新建應該Java工程,導包,或者直接建立一個MapReduce工程。在這裏面建立一個FlowBean.java文件。 [html] view plain copy
print?
  1. private long upFlow;
  2. private long dFlow;
  3. private long sumFlow;
然後就是各種右鍵生成get,set方法,還要toString(),以及生成構造函數,(千萬記得要生成一個空的構造函數,不然後面進行分析的時候會報錯)。 完整代碼如下: [java] view plain copy print?
  1. package cn.tf.flow;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.Writable;
  6. import org.apache.hadoop.io.WritableComparable;
  7. public class FlowBean implements WritableComparable<FlowBean>{
  8. private long upFlow;
  9. private long dFlow;
  10. private long sumFlow;
  11. public long getUpFlow() {
  12. return upFlow;
  13. }
  14. public void setUpFlow(long upFlow) {
  15. this.upFlow = upFlow;
  16. }
  17. public long getdFlow() {
  18. return dFlow;
  19. }
  20. public void setdFlow(long dFlow) {
  21. this.dFlow = dFlow;
  22. }
  23. public long getSumFlow() {
  24. return sumFlow;
  25. }
  26. public void setSumFlow(long sumFlow) {
  27. this.sumFlow = sumFlow;
  28. }
  29. public FlowBean(long upFlow, long dFlow) {
  30. super();
  31. this.upFlow = upFlow;
  32. this.dFlow = dFlow;
  33. this.sumFlow = upFlow+dFlow;
  34. }
  35. @Override
  36. public void readFields(DataInput in) throws IOException {
  37. upFlow=in.readLong();
  38. dFlow=in.readLong();
  39. sumFlow=in.readLong();
  40. }
  41. @Override
  42. public void write(DataOutput out) throws IOException {
  43. out.writeLong(upFlow);
  44. out.writeLong(dFlow);
  45. out.writeLong(sumFlow);
  46. }
  47. public FlowBean() {
  48. super();
  49. }
  50. @Override
  51. public String toString() {
  52. return upFlow + "\t" + dFlow + "\t" + sumFlow;
  53. }
  54. @Override
  55. public int compareTo(FlowBean o) {
  56. return this.sumFlow>o.getSumFlow() ? -1:1;
  57. }
  58. }

然後就是這個統計的代碼了,新建一個FlowCount.java.在這個類裏面,我直接把Mapper和Reduce寫在同一個類裏面了,如果按規範的要求應該是要分開寫的。 在mapper中,獲取後面三段數據的值,所以我的這裏length-2,length-3. [java] view plain copy print?
  1. public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
  2. @Override
  3. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  4. // 拿到這行的內容轉成string
  5. String line = value.toString();
  6. String[] fields = StringUtils.split(line, "\t");
  7. try {
  8. if (fields.length > 3) {
  9. // 獲得手機號及上下行流量字段值
  10. String phone = fields[1];
  11. long upFlow = Long.parseLong(fields[fields.length - 3]);
  12. long dFlow = Long.parseLong(fields[fields.length - 2]);
  13. // 輸出這一行的處理結果,key為手機號,value為流量信息bean
  14. context.write(new Text(phone), new FlowBean(upFlow, dFlow));
  15. } else {
  16. return;
  17. }
  18. } catch (Exception e) {
  19. }
  20. }
  21. }


 
在reduce中隊數據進行整理,統計
 



[java] view plain copy
 
 print?

  1. public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
  2. @Override
  3. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
  4. long upSum = 0;
  5. long dSum = 0;
  6. for (FlowBean bean : values) {
  7. upSum += bean.getUpFlow();
  8. dSum += bean.getdFlow();
  9. }
  10. FlowBean resultBean = new FlowBean(upSum, dSum);
  11. context.write(key, resultBean);
  12. }
  13. }


最後在main方法中調用執行。 [java] view plain copy print?
  1. public static void main(String[] args) throws Exception {
  2. Configuration conf = new Configuration();
  3. Job job = Job.getInstance(conf);
  4. job.setJarByClass(FlowCount.class);
  5. job.setMapperClass(FlowCountMapper.class);
  6. job.setReducerClass(FlowCountReducer.class);
  7. job.setMapOutputKeyClass(Text.class);
  8. job.setMapOutputValueClass(FlowBean.class);
  9. job.setOutputKeyClass(Text.class);
  10. job.setOutputValueClass(FlowBean.class);
  11. FileInputFormat.setInputPaths(job, new Path(args[0]));
  12. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  13. boolean res = job.waitForCompletion(true);
  14. System.exit(res ? 0 : 1);
  15. }
當然啦,還需要先在你的hdfs根目錄中建立/flow/data數據,然後我那個用戶的數據源上傳上去。 [java] view plain copy print?
  1. bin/hadoop fs -mkdir -p /flow/data
  2. bin/hadoop fs -put HTTP_20130313143750.dat /flow/data
  3. bin/hadoop jar ../lx/flow.jar
把上面這個MapReduce工程打包成一個jar文件,然後用hadoop來執行這個jar文件。例如我放在~/hadoop/lx/flow.jar,然後再hadoop安裝目錄中執行 [java] view plain copy print?
  1. bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount /flow/data /flow/output
最後執行結果如下:
技術分享

在這整過過程中,我們是有yarnchild的進程在執行的,如下圖所示:當整個過程執行完畢之後yarnchild也會自動退出。 技術分享

三、按總流量從大到小排序

如果你上面這個基本操作以及完成了的話,按總流量排序就非常簡單了。我們新建一個FlowCountSort.Java.

全部代碼如下:

[java] view plain copy print?
  1. package cn.tf.flow;
  2. import java.io.IOException;
  3. import org.apache.commons.lang.StringUtils;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. public class FlowCountSort {
  14. public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
  15. @Override
  16. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  17. String line=value.toString();
  18. String[] fields=StringUtils.split(line,"\t");
  19. String phone=fields[0];
  20. long upSum=Long.parseLong(fields[1]);
  21. long dSum=Long.parseLong(fields[2]);
  22. FlowBean sumBean=new FlowBean(upSum,dSum);
  23. context.write(sumBean, new Text(phone));
  24. }
  25. }
  26. public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
  27. //進來的“一組”數據就是一個手機的流量bean和手機號
  28. @Override
  29. protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  30. context.write(values.iterator().next(), key);
  31. }
  32. }
  33. public static void main(String[] args) throws Exception {
  34. Configuration conf = new Configuration();
  35. Job job = Job.getInstance(conf);
  36. job.setJarByClass(FlowCountSort.class);
  37. job.setMapperClass(FlowCountSortMapper.class);
  38. job.setReducerClass(FlowCountSortReducer.class);
  39. job.setMapOutputKeyClass(FlowBean.class);
  40. job.setMapOutputValueClass(Text.class);
  41. job.setOutputKeyClass(Text.class);
  42. job.setOutputValueClass(FlowBean.class);
  43. FileInputFormat.setInputPaths(job, new Path(args[0]));
  44. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  45. boolean res = job.waitForCompletion(true);
  46. System.exit(res ? 0 : 1);
  47. }
  48. }


這個主要就是使用了FlowBean.java中的代碼來實現的,主要是繼承了WritableComparable<FlowBean>接口來實現,然後重寫了compareTo()方法。

[html] view plain copy print?
  1. @Override
  2. public int compareTo(FlowBean o) {
  3. return this.sumFlow>o.getSumFlow() ? -1:1;
  4. }

按照同樣的方法對這個文件打成jar包,然後使用hadoop的相關語句進行執行就可以了。

[java] view plain copy print?
  1. bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort /flow/output /flow/sortoutput

結果圖:


技術分享

四、按用戶號碼區域進行分類

流量匯總之後的結果需要按照省份輸出到不同的結果文件中,需要解決兩個問題:

1、如何讓mr的最終結果產生多個文件: 原理:MR中的結果文件數量由reduce
task的數量絕對,是一一對應的 做法:在代碼中指定reduce task的數量


2、如何讓手機號進入正確的文件 原理:讓不同手機號數據發給正確的reduce task,就進入了正確的結果文件
要自定義MR中的分區partition的機制(默認的機制是按照kv中k的hashcode%reducetask數)
做法:自定義一個類來幹預MR的分區策略——Partitioner的自定義實現類

主要代碼與前面的排序是非常類似的,只要在main方法中添加如下兩行代碼就可以了。

[java] view plain copy print?
  1. //指定自定義的partitioner
  2. job.setPartitionerClass(ProvincePartioner.class);
  3. job.setNumReduceTasks(5);


這裏我們需要新建一個ProvincePartioner.java來處理號碼分類的邏輯。

[java] view plain copy print?
  1. public class ProvincePartioner extends Partitioner<Text, FlowBean>{
  2. private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
  3. static {
  4. provinceMap.put("135", 0);
  5. provinceMap.put("136", 1);
  6. provinceMap.put("137", 2);
  7. provinceMap.put("138", 3);
  8. }
  9. @Override
  10. public int getPartition(Text key, FlowBean value, int numPartitions) {
  11. String prefix = key.toString().substring(0, 3);
  12. Integer partNum = provinceMap.get(prefix);
  13. if(partNum == null) partNum=4;
  14. return partNum;
  15. }
  16. }


執行方法和前面也是一樣的。從執行的流程中我們可以看到這裏啟動了5個reduce task,因為我這裏數據量比較小,所以只啟動了一個map task。

技術分享

到這裏,整個用戶流量分析系統就全部結束了。關於大數據的更多內容,歡迎關註。點擊左上角頭像下方“點擊關註".感謝您的支持!

數據源下載地址:http://download.csdn.net/detail/sdksdk0/9545935

源碼項目地址:https://github.com/sdksdk0/HDFS_MapReduce

大數據實戰:用戶流量分析系統