package com.empire.hadoop.mr.provinceflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Traffic record for one phone number: upstream flow, downstream flow and their total.
 *
 * <p>Implements Hadoop's {@link Writable} so it can travel as a map/reduce value. The wire
 * format is three {@code long}s in the fixed order upFlow, dFlow, sumFlow.
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    /**
     * Empty constructor. Deserialization instantiates this class reflectively through the
     * no-arg constructor, so it must be declared explicitly next to the two-arg one.
     */
    public FlowBean() {
    }

    /**
     * Builds a record from its two flow components; the total is their sum.
     *
     * @param upFlow upstream traffic
     * @param dFlow  downstream traffic
     */
    public FlowBean(long upFlow, long dFlow) {
        this.sumFlow = upFlow + dFlow;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
    }

    /** @return upstream traffic */
    public long getUpFlow() {
        return this.upFlow;
    }

    /** @param upFlow new upstream traffic (the stored total is not recomputed) */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return downstream traffic */
    public long getdFlow() {
        return this.dFlow;
    }

    /** @param dFlow new downstream traffic (the stored total is not recomputed) */
    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    /** @return total traffic as stored in this bean */
    public long getSumFlow() {
        return this.sumFlow;
    }

    /** @param sumFlow new total traffic */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serializes the three counters. The order here must match {@link #readFields(DataInput)}.
     */
    public void write(DataOutput out) throws IOException {
        final long[] wireOrder = {upFlow, dFlow, sumFlow};
        for (long field : wireOrder) {
            out.writeLong(field);
        }
    }

    /**
     * Deserializes the counters in exactly the order {@link #write(DataOutput)} emits them.
     */
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    /** Renders the record as tab-separated {@code upFlow dFlow sumFlow}. */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(upFlow).append('\t')
                .append(dFlow).append('\t')
                .append(sumFlow)
                .toString();
    }
}
ProvincePartitioner (自定義分區類 / custom partitioner class)
package com.empire.hadoop.mr.provinceflow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Assigns each map output record to a reduce partition based on the first three characters
 * of its key (the phone number).
 *
 * <p>The type parameters are the map output key/value types. Prefixes 136-139 map to
 * partitions 0-3; every other key (including keys shorter than the prefix) goes to
 * partition 4. NOTE(review): the job therefore presumably needs at least 5 reduce tasks,
 * since Hadoop rejects partition numbers that are not below the reduce-task count.
 *
 * @author
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    /** Catch-all partition for keys whose prefix is not in {@link #proviceDict}. */
    private static final int OTHER_PARTITION = 4;

    /** Number of leading key characters used as the lookup prefix. */
    private static final int PREFIX_LENGTH = 3;

    /** Phone-number prefix to partition number. */
    public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();

    static {
        proviceDict.put("136", 0);
        proviceDict.put("137", 1);
        proviceDict.put("138", 2);
        proviceDict.put("139", 3);
    }

    /**
     * Returns the partition for a record.
     *
     * @param key           phone number
     * @param value         traffic record (not consulted)
     * @param numPartitions number of reduce tasks (not consulted)
     * @return 0-3 for a known prefix, otherwise {@value #OTHER_PARTITION}
     */
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String phone = key.toString();
        // Keys shorter than the prefix used to throw StringIndexOutOfBoundsException from
        // substring(); they cannot match any known prefix, so send them to the catch-all.
        if (phone.length() < PREFIX_LENGTH) {
            return OTHER_PARTITION;
        }
        Integer provinceId = proviceDict.get(phone.substring(0, PREFIX_LENGTH));
        return provinceId == null ? OTHER_PARTITION : provinceId;
    }
}
package com.empire.hadoop.mr.provinceflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that sums upstream/downstream traffic per phone number and splits the
 * totals into one output file per phone-number prefix via {@link ProvincePartitioner}.
 *
 * <p>Usage: {@code hadoop jar <jar> com.empire.hadoop.mr.provinceflow.FlowCount <input> <output>}
 */
public class FlowCount {

    /** Counter group used to report input lines the mapper skipped. */
    private static final String COUNTER_GROUP = "FlowCount";

    /** Counter name for skipped (malformed) input lines. */
    private static final String MALFORMED_LINES = "MALFORMED_LINES";

    /** Partitions 0-3 for the known prefixes plus partition 4 for everything else. */
    private static final int NUM_REDUCE_TASKS = 5;

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        /**
         * Parses one tab-separated line and emits (phone number, traffic).
         *
         * <p>The phone number is field 1; upstream and downstream traffic are the third- and
         * second-to-last fields. Lines with too few fields or non-numeric traffic are counted
         * under {@code FlowCount/MALFORMED_LINES} and skipped instead of failing the task.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            // fields[1] and fields[length - 3] must both exist.
            if (fields.length < 3) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            long upFlow;
            long dFlow;
            try {
                upFlow = Long.parseLong(fields[fields.length - 3]);
                dFlow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            String phoneNbr = fields[1];
            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        /** Sums all traffic records of one phone number and emits a single total record. */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDFlow = 0;
            for (FlowBean bean : values) {
                sumUpFlow += bean.getUpFlow();
                sumDFlow += bean.getdFlow();
            }
            context.write(key, new FlowBean(sumUpFlow, sumDFlow));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path (must not exist)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowCount <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // To submit to YARN directly (e.g. from an IDE), uncomment:
        // conf.set("mapreduce.framework.name", "yarn");
        // conf.set("yarn.resourcemanager.hostname", "mini1");
        Job job = Job.getInstance(conf);

        // Locate the job jar from this class rather than a hard-coded local path.
        job.setJarByClass(FlowCount.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // One reducer (and one output file) per partition produced by ProvincePartitioner.
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit and block until finished, printing progress.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
hadoop jar flowcount_patitioner_aaron.jar com.empire.hadoop.mr.provinceflow.FlowCount /user/hadoop/flowcount /flowcountpatitioner
hdfs dfs -ls /flowcountpatitioner
hdfs dfs -cat /flowcountpatitioner/part-r-00000
18/11/29 07:26:20 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/
18/11/29 07:26:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/11/29 07:26:22 INFO input.FileInputFormat: Total input files to process : 5
18/11/29 07:26:22 INFO mapreduce.JobSubmitter: number of splits:5
18/11/29 07:26:22 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/11/29 07:26:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1543447570289_0001
18/11/29 07:26:24 INFO impl.YarnClientImpl: Submitted application application_1543447570289_0001
18/11/29 07:26:24 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1543447570289_0001/
18/11/29 07:26:24 INFO mapreduce.Job: Running job: job_1543447570289_0001
18/11/29 07:26:36 INFO mapreduce.Job: Job job_1543447570289_0001 running in uber mode : false
18/11/29 07:26:36 INFO mapreduce.Job: map 0% reduce 0%
18/11/29 07:26:45 INFO mapreduce.Job: map 20% reduce 0%
18/11/29 07:27:02 INFO mapreduce.Job: map 40% reduce 1%
18/11/29 07:27:04 INFO mapreduce.Job: map 100% reduce 1%
18/11/29 07:27:05 INFO mapreduce.Job: map 100% reduce 8%
18/11/29 07:27:06 INFO mapreduce.Job: map 100% reduce 60%
18/11/29 07:27:07 INFO mapreduce.Job: map 100% reduce 100%
18/11/29 07:27:07 INFO mapreduce.Job: Job job_1543447570289_0001 completed successfully
18/11/29 07:27:08 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=4195
FILE: Number of bytes written=1986755
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=11574
HDFS: Number of bytes written=594
HDFS: Number of read operations=30
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
Job Counters
Killed map tasks=1
Launched map tasks=6
Launched reduce tasks=5
Data-local map tasks=6
Total time spent by all maps in occupied slots (ms)=111307
Total time spent by all reduces in occupied slots (ms)=93581
Total time spent by all map tasks (ms)=111307
Total time spent by all reduce tasks (ms)=93581
Total vcore-milliseconds taken by all map tasks=111307
Total vcore-milliseconds taken by all reduce tasks=93581
Total megabyte-milliseconds taken by all map tasks=113978368
Total megabyte-milliseconds taken by all reduce tasks=95826944
Map-Reduce Framework
Map input records=110
Map output records=110
Map output bytes=3945
Map output materialized bytes=4315
Input split bytes=624
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=4315
Reduce input records=110
Reduce output records=21
Spilled Records=220
Shuffled Maps =25
Failed Shuffles=0
Merged Map outputs=25
GC time elapsed (ms)=3300
CPU time spent (ms)=5980
Physical memory (bytes) snapshot=1349332992
Virtual memory (bytes) snapshot=8470929408
Total committed heap usage (bytes)=689782784
Shuffle Errors
File Input Format Counters
Bytes Read=10950
File Output Format Counters
Bytes Written=594
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /flowcountpatitioner
Found 6 items
-rw-r--r-- 2 hadoop supergroup 0 2018-11-29 07:27 /flowcountpatitioner/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 58 2018-11-29 07:27 /flowcountpatitioner/part-r-00000
-rw-r--r-- 2 hadoop supergroup 113 2018-11-29 07:27 /flowcountpatitioner/part-r-00001
-rw-r--r-- 2 hadoop supergroup 24 2018-11-29 07:27 /flowcountpatitioner/part-r-00002
-rw-r--r-- 2 hadoop supergroup 112 2018-11-29 07:27 /flowcountpatitioner/part-r-00003
-rw-r--r-- 2 hadoop supergroup 287 2018-11-29 07:27 /flowcountpatitioner/part-r-00004
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00000
13602846565 9690 14550 24240
13660577991 34800 3450 38250
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00001
13719199419 1200 0 1200
13726230503 12405 123405 135810
13726238888 12405 123405 135810
13760778710 600 600 1200
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00002
13826544101 1320 0 1320
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00003
13922314466 15040 18600 33640
13925057413 55290 241215 296505
13926251106 1200 0 1200
13926435656 660 7560 8220
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00004
13480253104 900 900 1800
13502468823 36675 551745 588420
13560436666 5580 4770 10350
13560439658 10170 29460 39630
15013685858 18295 17690 35985
15920133257 15780 14680 30460
15989002119 9690 900 10590
18211575961 7635 10530 18165
18320173382 47655 12060 59715
84138413 20580 7160 27740
* Partition keys by their {@link Object#hashCode()}.
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;