MapReduce小文件優化與分區
阿新 • • 發佈:2018-10-26
get public 流量 計數 exception true boolean 大於 文件
一、小文件優化
1.Mapper類
package com.css.combine; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 思路? * wordcount單詞計數 * <單詞,1> * * 數據傳輸 * * KEYIN:數據的起始偏移量0~10 11~20 21~30 * VALUEIN:數據 * * KEYOUT:mapper輸出到reduce階段 k的類型 * VALUEOUT:mapper輸出到reduce階段v的類型 * <China,1><Beijing,1><love,1>*/ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //key 起始偏移量 value 數據 context 上下文 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.讀取數據 String line = value.toString();// 2.切割 love Beijing String[] words = line.split(" "); // 3.循環的寫到下一個階段<love,1><Beijing,1> for (String w : words) { // 4.輸出到reducer階段 context.write(new Text(w), new IntWritable(1)); } } }
2.Reducer類
package com.css.combine; import java.io.IOException;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 統計單詞出現的次數 int sum = 0; // 累加求和 for (IntWritable count : values) { // 拿到值累加 sum += count.get(); } // 結果輸出 context.write(key, new IntWritable(sum)); } }
3.Driver類
package com.css.combine; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 獲取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 獲取jar包 job.setJarByClass(WordCountDriver.class); // 獲取自定義的mapper與reducer類 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 設置map輸出的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置reduce輸出的數據類型(最終的數據類型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定運行的inputformat方式 默認的方式是textinputformat(小文件優化) job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 最大4M CombineTextInputFormat.setMinInputSplitSize(job, 3145728); // 最小3M // 設置輸入存在的路徑與處理後的結果路徑 FileInputFormat.setInputPaths(job, new Path("c:/in1024/")); FileOutputFormat.setOutputPath(job, new Path("c:/out1024/")); // 提交任務 boolean rs = job.waitForCompletion(true); System.out.println(rs ? 0 : 1); } }
二、分區
1.Mapper類
package com.css.flow.partition; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 3631279850362 13726130503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 www.itstaredu.com 教育網站 24 27 299 681 200 * * 13726130503 299 681 980 */ public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.獲取數據 String line = value.toString(); // 2.切割 String[] fields = line.split("\t"); // 3.封裝對象 拿到關鍵字段 數據清洗 String phoneN = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long dfFlow = Long.parseLong(fields[fields.length - 2]); // 4.輸出到reduce端 context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow)); } }
2.Reducer類
package com.css.flow.partition; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { // 1.相同手機號 的流量使用再次匯總 long upFlow_sum = 0; long dfFlow_sum = 0; // 2.累加 for (FlowBean f : values) { upFlow_sum += f.getUpFlow(); dfFlow_sum += f.getDfFlow(); } FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum); // 3.輸出 context.write(key, rs); } }
3.封裝類
package com.css.flow.partition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * 封裝類 數據的傳輸 */ public class FlowBean implements Writable{ // 定義屬性 private long upFlow; private long dfFlow; private long flowSum; public FlowBean() { } // 流量累加 public FlowBean(long upFlow, long dfFlow) { this.upFlow = upFlow; this.dfFlow = dfFlow; this.flowSum = upFlow + dfFlow; } // 反序列化 @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dfFlow = in.readLong(); flowSum = in.readLong(); } // 序列化 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dfFlow); out.writeLong(flowSum); } @Override public String toString() { return upFlow + "\t" + dfFlow + "\t" + flowSum; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDfFlow() { return dfFlow; } public void setDfFlow(long dfFlow) { this.dfFlow = dfFlow; } public long getFlowSum() { return flowSum; } public void setFlowSum(long flowSum) { this.flowSum = flowSum; } }
4.分區類
package com.css.flow.partition; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PhoneNumPartitioner extends Partitioner<Text, FlowBean>{ // 根據手機號前三位進行分區 @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 獲取手機號前三位 String phoneNum = key.toString().substring(0, 3); // 分區 int partitioner = 4; if ("135".equals(phoneNum)) { return 0; }else if ("137".equals(phoneNum)) { return 1; }else if ("138".equals(phoneNum)) { return 2; }else if ("139".equals(phoneNum)) { return 3; } return partitioner; } }
5.Driver類
package com.css.flow.partition; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1.獲取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.獲取jar包 job.setJarByClass(FlowCountDriver.class); // 3.獲取自定義的mapper與reducer類 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 4.設置map輸出的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 5.設置reduce輸出的數據類型(最終的數據類型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 設置自定義的分區類 // 自定義分區個數要大於分區數 job.setPartitionerClass(PhoneNumPartitioner.class); job.setNumReduceTasks(5); // 6.設置輸入存在的路徑與處理後的結果路徑 FileInputFormat.setInputPaths(job, new Path("c:/flow1020/in")); FileOutputFormat.setOutputPath(job, new Path("c:/flow1020/out")); // 7.提交任務 boolean rs = job.waitForCompletion(true); System.out.println(rs ? 0 : 1); } }
6.輸入的文件HTTP_20180313143750.dat
3631279850362	13726130503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	www.itstaredu.com	教育網站	24	27	299	681	200
3631279950322	13822544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	www.taobao.com	淘寶網	4	0	264	0	200
3631279910362	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
3631244000322	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
3631279930342	18212575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	視頻網站	15	12	1527	2106	200
3631279950342	13884138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12	20	16	4116	1432	200
3631279930352	13510439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1116	954	200
3631279950332	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	316	296	200
3631279830392	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82	4	0	240	0	200
3631279840312	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站點統計	24	9	660	690	200
3631279730382	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	369	338	200
3631279860392	15889002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站點統計	3	3	938	380	200
3631279920332	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	15	9	918	4938	200
3631279860312	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4	3	3	120	1320	200
3631279840302	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	綜合門戶	15	12	198	910	200
3631279950332	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn	12	12	3008	3720	200
3631279820302	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	綜合門戶	57	102	735	11349	400
3631279860322	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	212	200
3631279900332	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	4243	200
3631279880322	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82	2	2	120	120	200
3631279850362	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
3631279930352	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1136	94	200
3631279930353	13560436326	C4-17-FE-BA-DE-D9:CMCC	120.196.100.77	lol.qq.com/	英雄聯盟	18	15	1136	94	200
7.輸出的文件
(1)part-r-00000
13502468823	735	11349	12084
13510439658	1116	954	2070
13560436326	1136	94	1230
13560436666	1136	94	1230
13560439658	918	4938	5856
(2)part-r-00001
13719199419	240	0	240
13726130503	299	681	980
13726238888	2481	24681	27162
13760778710	120	120	240
(3)part-r-00002
13822544101	264	0	264
13884138413	4116	1432	5548
(4)part-r-00003
13922314466	3008	3720	6728
13925057413	11058	4243	15301
13926251106	240	0	240
13926435656	132	1512	1644
(5)part-r-00004
13480253104	120	1320	1440
13602846565	198	910	1108
13660577991	660	690	1350
15013685858	369	338	707
15889002119	938	380	1318
15920133257	316	296	612
18212575961	1527	2106	3633
18320173382	9531	212	9743
MapReduce小文件優化與分區