利用mapreduce對以下文字進行處理,對每個使用者的上行流量、下行流量還有總流量進行分析,輸出手機號、上行流量、下行流量、總流量
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY iface.qiyi.com 視訊網站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC sug.so.360.cn 資訊保安 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY s19.cnzz.com 站點統計 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC rank.ie.sogou.com 搜尋引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY www.umeng.com 站點統計 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY y0.ifengimg.com 綜合門戶 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY input.shouji.sogou.com 搜尋引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC t3.baidu.com 搜尋引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 2 2 120 120 200 1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC i02.c.aliimg.com 24 27 2481 24681 200 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 18 15 1116 954 200
1.思路分析:map會對文字逐行讀取,每讀取一行就取出手機號作為key、上行與下行流量作為value輸出;讀取完之後,框架會根據key值進行分組,然後彙總到reduce中,對同一手機號的流量進行集中累加處理
package com.oracle.j2ee.mapreduce; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MyFlow { //寫一個BEAN public static class FlowBean implements Writable{ public long up; public long down; public long count; public FlowBean() { } public FlowBean(long up,long down) { this.up=up; this.down=down; this.count=up+down; } public long getUp() { return up; } public void setUp(long up) { this.up = up; } public long getDown() { return down; } public void setDown(long down) { this.down = down; } public long getCount() { return count; } public void setCount(long count) { this.count = count; } @Override public void write(DataOutput out) throws IOException { out.writeLong(up); out.writeLong(down); out.writeLong(count); } @Override public void readFields(DataInput in) throws IOException { up=in.readLong(); down=in.readLong(); count=in.readLong(); } @Override public String toString() { return up+" "+down+" "+count; } } //處理讀取的資料 public static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String str=value.toString(); String[] news=str.split("\t"); String phone=news[1]; long up=Long.parseLong(news[news.length-3]); long down=Long.parseLong(news[news.length-2]); FlowBean flow=new FlowBean(up, down); context.write(new Text(phone), flow); } } public static class MyReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, 
Iterable<FlowBean> iter, Context context) throws IOException, InterruptedException { long upSum=0; long downSum=0; for(FlowBean arr:iter) { upSum+=arr.getUp(); downSum+=arr.getDown(); } FlowBean flow=new FlowBean(upSum, downSum); context.write(key, flow); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //獲取配置 Configuration conf=new Configuration(); //獲取Job例項 Job job=Job.getInstance(conf); //載入jar包,以方便可以在叢集上執行 job.setJarByClass(MyFlow.class); //載入Mymapper與MyReducer job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //載入map輸出key與value的型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //載入reduce的輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //配置輸入輸出路徑 FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交任務 job.waitForCompletion(true); } }
上傳命令為: hadoop fs -put flow.log /input/
執行jar包, hadoop jar flow.jar com.oracle.j2ee.mapreduce.MyFlow /input/flow.log /output
[[email protected] apps]# hadoop jar flow.jar com.oracle.j2ee.mapreduce.MyFlow /flow.log /myoutput4 18/09/28 03:42:59 INFO client.RMProxy: Connecting to ResourceManager at hadoop-1/ 18/09/28 03:42:59 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 18/09/28 03:43:00 INFO input.FileInputFormat: Total input paths to process : 1 18/09/28 03:43:00 INFO mapreduce.JobSubmitter: number of splits:1 18/09/28 03:43:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1538049918445_0009 18/09/28 03:43:01 INFO impl.YarnClientImpl: Submitted application application_1538049918445_0009 18/09/28 03:43:01 INFO mapreduce.Job: The url to track the job: http://hadoop-1:8088/proxy/application_1538049918445_0009/ 18/09/28 03:43:01 INFO mapreduce.Job: Running job: job_1538049918445_0009 18/09/28 03:43:10 INFO mapreduce.Job: Job job_1538049918445_0009 running in uber mode : false 18/09/28 03:43:10 INFO mapreduce.Job: map 0% reduce 0% 18/09/28 03:43:19 INFO mapreduce.Job: map 100% reduce 0% 18/09/28 03:43:27 INFO mapreduce.Job: map 100% reduce 100% 18/09/28 03:43:28 INFO mapreduce.Job: Job job_1538049918445_0009 completed successfully 18/09/28 03:43:28 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=839 FILE: Number of bytes written=214929 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=2284 HDFS: Number of bytes written=551 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=6508 Total time spent by all reduces in occupied slots (ms)=6420 Total time spent by all map tasks (ms)=6508 Total time spent by all reduce tasks (ms)=6420 Total 
vcore-milliseconds taken by all map tasks=6508 Total vcore-milliseconds taken by all reduce tasks=6420 Total megabyte-milliseconds taken by all map tasks=6664192 Total megabyte-milliseconds taken by all reduce tasks=6574080 Map-Reduce Framework Map input records=22 Map output records=22 Map output bytes=789 Map output materialized bytes=839 Input split bytes=94 Combine input records=0 Combine output records=0 Reduce input groups=21 Reduce shuffle bytes=839 Reduce input records=22 Reduce output records=21 Spilled Records=44 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=167 CPU time spent (ms)=1570 Physical memory (bytes) snapshot=330604544 Virtual memory (bytes) snapshot=1684975616 Total committed heap usage (bytes)=136450048 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=2190 File Output Format Counters Bytes Written=551
[[email protected] apps]# hadoop fs -cat /myoutput4/part-r-00000
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548
2.在叢集中執行時,需要呼叫 job.setJarByClass(****.class); 來指定作業所在的jar包,否則叢集上的任務會找不到Mapper與Reducer類別