用一個MapReduce輸出多個key的分區檔案
阿新 • 發佈:2018-12-15
先看一下要處理的資料格式:
19392963501,17816115082,2018-09-18 16:19:44,1431
14081946321,13094566759,2018-05-23 09:34:27,0610
13415701165,18939575060,2018-11-23 21:33:23,1031
15590483587,16303009156,2018-08-02 07:38:00,0487
15539613975,17882324598,2018-10-19 09:08:15,0948
資料欄位依序為:主叫號碼、被叫號碼、通話時間、通話時長。
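我們的需求是:對每個號碼,按通話日期的年、月、日分別統計通話總時長和通話次數,並只用一個MapReduce作業實現。這裡忽略被叫號碼,僅以主叫號碼為例。做法是:Mapper 對每筆記錄輸出「號碼-年」「號碼-年月」「號碼-年月日」三個 key,再用自訂的 Partitioner 將這三類 key 分別送到 3 個 reduce task,因此年、月、日的統計結果會各自落在一個輸出檔案中。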
程式碼部分
package mapReducePhone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class PhoneNum { public static void main(String[] args) throws Exception{ //獲取連線 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //設定要執行的 jar job.setJarByClass(PhoneNum.class); //指定map和reduce job.setMapperClass(PhoneMapper.class); job.setReducerClass(PhoneReducer.class); //設定map輸出型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設定reduce 輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //設定分割槽與reduce task 的數量 job.setPartitionerClass(PhonePartitioner.class); job.setNumReduceTasks(3); //設定輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //答應結果是否成功 boolean result = job.waitForCompletion(true); System.out.println(result); } } //資料格式 17026053728,17816115082,2108-03-28 11:09:19,1792 class PhoneMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // Text k = new Text(); String[] split = value.toString().split(","); String caller =split[0]; // String callee =split[1]; // String date =split[2]; int statetime =Integer.parseInt(split[3]); String yearData = split[2].replaceAll("-", "").substring(0, 4); String monthData = split[2].replaceAll("-", "").substring(0, 6); String dayData = split[2].replaceAll("-", "").substring(0,9); //僅計算主叫號碼的通話記錄 
若需求被叫號碼則將被叫號碼提取出來進行同樣操作 context.write(new Text(caller+"-"+dayData),new IntWritable(statetime)); context.write(new Text(caller+"-"+yearData),new IntWritable(statetime)); context.write(new Text(caller+"-"+monthData),new IntWritable(statetime)); } } class PhoneReducer extends Reducer<Text,IntWritable,Text,Text> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; int i =0; for(IntWritable value :values){ sum += value.get(); i++; } context.write(new Text(key),new Text(sum+" 次數"+ i)); } } class PhonePartitioner<K, V> extends Partitioner<K, V>{ @Override //自定義partition的數量需要和reduce task數量保持一致 public int getPartition(K key, V value, int numPartitions) { // TODO Auto-generated method stub //根據key的長度進行分割槽 int datelongth=key.toString().length(); switch(datelongth) { case 16 : return 0; case 18 : return 1; } return 2; } }