MapReduce流量統計
阿新 • • 發佈:2019-04-04
運行 new override += 構造方法 sta 路徑 -a paths
準備數據 access.log(Mapper 按製表符 \t 切分每一行)。
要用到的只有三個字段:第二個字段為手機號,倒數第三個字段為上行流量,倒數第二個字段為下行流量。
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 
24681 200 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 10000 20000 200
自定義複雜數據類型
import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 自定義復雜數據類型: * (1)需要實現Writable接口 * (2)需要實現接口中的write和readFields方法 * (3)比較隱蔽的一點,需要定義默認的空構造方法 * 屬性必須覆蓋getter/setter方法! */ public class Access implements Writable { private String phone; //手機號 private long up; //上行流量 private long down; //下行流量 private long sum; //總流量(上行+下行) public Access(){} public Access(String phone,long up,long down){ this.phone = phone; this.up = up; this.down = down; this.sum = up + down; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(phone); //String out.writeLong(up); //long out.writeLong(down); out.writeLong(sum); } @Override public void readFields(DataInput in) throws IOException { //按照write輸入的順序獲取 this.phone = in.readUTF(); this.up = in.readLong(); this.down = in.readLong(); this.sum = in.readLong(); } // 省略setter/getter... @Override public String toString() { return "Access{" + "phone='" + phone + '\'' + ", up=" + up + ", down=" + down + ", sum=" + sum + '}'; } }
自定義Mapper處理
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 自定義Mapper處理類 * LongWritable:偏移量key * Text: 一行數據value * Text: 手機號作為key * Access: 復雜類型對象作為value */ public class AccessMapper extends Mapper<LongWritable, Text,Text,Access> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); String phone = words[1]; //取出手機號 long up = Long.parseLong(words[words.length-3]); //上行流量 long down = Long.parseLong(words[words.length-2]); //下行流量 long sum = up + down; //總流量 //寫入緩存 context.write(new Text(phone),new Access(phone,up,down)); } }
自定義Reducer處理
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that totals the traffic of each phone number.
 *
 * <p>For one phone key it adds up the upstream and downstream values of every
 * {@link Access} record emitted for that phone, then writes
 * (phone, Access(phone, totalUp, totalDown)).
 */
public class AccessReducer extends Reducer<Text,Access,Text,Access> {
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;

        // Every element of values belongs to the same phone number (the key).
        for (Access record : values) {
            totalUp += record.getUp();
            totalDown += record.getDown();
        }

        Access total = new Access(key.toString(), totalUp, totalDown);
        context.write(key, total);
    }
}
編寫Driver類
/**
 * Driver that configures and submits the traffic-statistics job.
 *
 * <p>Usage: {@code AccessLocalApp [inputPath [outputPath]]}. When no arguments are given, the
 * relative paths {@code input} and {@code output} are used, as before. The process exits with
 * status 0 when the job succeeds and 1 when it fails.
 *
 * <p>NOTE: Hadoop's FileOutputFormat normally refuses an output directory that already
 * exists. Delete it before re-running.
 */
public class AccessLocalApp {
    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : "input";
        String output = args.length > 1 ? args[1] : "output";

        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf);
        job.setJarByClass(AccessLocalApp.class);

        // Mapper and Reducer classes
        job.setMapperClass(AccessMapper.class);
        job.setReducerClass(AccessReducer.class);

        // Key/value types emitted by the Mapper and by the Reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Access.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Submit and wait. Previously the boolean result was discarded, so a failed job
        // still exited 0; propagate it as the exit status instead.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
輸出結果part-r-00000
13480253104 Access{phone='13480253104', up=180, down=180, sum=360}
13502468823 Access{phone='13502468823', up=7335, down=110349, sum=117684}
13560436666 Access{phone='13560436666', up=1116, down=954, sum=2070}
13560439658 Access{phone='13560439658', up=2034, down=5892, sum=7926}
13602846565 Access{phone='13602846565', up=1938, down=2910, sum=4848}
13660577991 Access{phone='13660577991', up=6960, down=690, sum=7650}
13719199419 Access{phone='13719199419', up=240, down=0, sum=240}
13726230503 Access{phone='13726230503', up=2481, down=24681, sum=27162}
13726238888 Access{phone='13726238888', up=12481, down=44681, sum=57162}
13760778710 Access{phone='13760778710', up=120, down=120, sum=240}
13826544101 Access{phone='13826544101', up=264, down=0, sum=264}
13922314466 Access{phone='13922314466', up=3008, down=3720, sum=6728}
13925057413 Access{phone='13925057413', up=11058, down=48243, sum=59301}
13926251106 Access{phone='13926251106', up=240, down=0, sum=240}
13926435656 Access{phone='13926435656', up=132, down=1512, sum=1644}
15013685858 Access{phone='15013685858', up=3659, down=3538, sum=7197}
15920133257 Access{phone='15920133257', up=3156, down=2936, sum=6092}
15989002119 Access{phone='15989002119', up=1938, down=180, sum=2118}
18211575961 Access{phone='18211575961', up=1527, down=2106, sum=3633}
18320173382 Access{phone='18320173382', up=9531, down=2412, sum=11943}
84138413 Access{phone='84138413', up=4116, down=1432, sum=5548}
重構思路
- 可以看到,輸出結果是 Access{...} 的形式,這其實只是 toString 的格式問題,可以修改如下:
public String toString() {
    // One CSV line: phone,up,down,sum
    StringBuilder line = new StringBuilder();
    line.append(phone).append(',');
    line.append(up).append(',');
    line.append(down).append(',');
    line.append(sum);
    return line.toString();
}
- 使用 NullWritable 作為 Reducer 輸出的 key,這樣每行輸出只包含 Access 的內容,不會在行首重復輸出手機號:
// Reducer type: the output key type changes from Text to NullWritable
public class AccessReducer extends Reducer<Text,Access, NullWritable,Access> { ... }
// Reducer output key: NullWritable.get() replaces the phone-number key
context.write(NullWritable.get(),new Access(key.toString(),ups,downs));
- 升級Reducer的完整代碼
/**
 * Reducer that totals each phone number's traffic and emits only the value.
 *
 * <p>The output key is {@link NullWritable}, so each output line holds just the
 * {@link Access} record (phone, up, down, sum).
 */
public class AccessReducer extends Reducer<Text,Access, NullWritable,Access> {
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;

        // All records in values share the phone number given by key.
        for (Access record : values) {
            totalUp += record.getUp();
            totalDown += record.getDown();
        }

        Access total = new Access(key.toString(), totalUp, totalDown);
        context.write(NullWritable.get(), total);
    }
}
新的輸出結果
13480253104,180,180,360
13502468823,7335,110349,117684
13560436666,1116,954,2070
13560439658,2034,5892,7926
13602846565,1938,2910,4848
13660577991,6960,690,7650
13719199419,240,0,240
13726230503,2481,24681,27162
13726238888,12481,44681,57162
13760778710,120,120,240
13826544101,264,0,264
13922314466,3008,3720,6728
13925057413,11058,48243,59301
13926251106,240,0,240
13926435656,132,1512,1644
15013685858,3659,3538,7197
15920133257,3156,2936,6092
15989002119,1938,180,2118
18211575961,1527,2106,3633
18320173382,9531,2412,11943
84138413,4116,1432,5548
這樣就拿到我們想要的數據結果了!
自定義Partitioner
需求:將統計結果按照手機號的前綴進行區分,寫到不同的文件中。
- 自定義 Partitioner 的實現:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Custom MapReduce partitioner that routes records by phone-number prefix.
 *
 * <ul>
 *   <li>numbers starting with "13" go to partition 0;</li>
 *   <li>numbers starting with "15" go to partition 1;</li>
 *   <li>all other numbers go to partition 2.</li>
 * </ul>
 *
 * <p>The job is meant to run with 3 reduce tasks. With fewer, the higher partitions are folded
 * into the last available one instead of returning an out-of-range index, which Hadoop
 * rejects with an "Illegal partition" error.
 */
public class AccessPartitioner extends Partitioner<Text,Access> {
    /**
     * @param phone         map output key: the phone number
     * @param access        map output value (not used for routing)
     * @param numPartitions number of reduce tasks; assumed to be at least 1
     * @return partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Text phone, Access access, int numPartitions) {
        String number = phone.toString();
        int partition;
        if (number.startsWith("13")) {
            partition = 0;
        } else if (number.startsWith("15")) {
            partition = 1;
        } else {
            partition = 2;
        }
        // Keep the result in range when fewer than 3 reducers are configured.
        return Math.min(partition, numPartitions - 1);
    }
}
- Driver 參數:
// 設置自定義分區規則
job.setPartitionerClass(AccessPartitioner.class);
// 設置Reduce個數
job.setNumReduceTasks(3);
- 再次運行,就得到了想要的統計結果,共三個文件:part-r-00000(13 開頭的手機號)、part-r-00001(15 開頭的手機號)、part-r-00002(其他號碼)。下面是三個文件的內容依次拼接後的結果:
13480253104,180,180,360
13502468823,7335,110349,117684
13560436666,1116,954,2070
13560439658,2034,5892,7926
13602846565,1938,2910,4848
13660577991,6960,690,7650
13719199419,240,0,240
13726230503,2481,24681,27162
13726238888,12481,44681,57162
13760778710,120,120,240
13826544101,264,0,264
13922314466,3008,3720,6728
13925057413,11058,48243,59301
13926251106,240,0,240
13926435656,132,1512,1644
15013685858,3659,3538,7197
15920133257,3156,2936,6092
15989002119,1938,180,2118
18211575961,1527,2106,3633
18320173382,9531,2412,11943
84138413,4116,1432,5548
MapReduce流量統計