1. 程式人生 > >mr 程式自定義分組的實現

mr 程式自定義分組的實現

 

 

AreaPartitioner

package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

	private static HashMap<String, Integer> areaMap = new HashMap<>();

	static {
		areaMap.put("135", 0);
		areaMap.put("136", 1);
		areaMap.put("137", 2);
		areaMap.put("138", 3);
		areaMap.put("139", 4);
	}

	@Override
	public int getPartition(KEY key, VALUE value, int numPartitions) {
		// 從 key 中拿到手機號,查詢手機歸屬字典,不同省份返回不同的組號

		int areaCoder = areaMap.get(key.toString().substring(0, 3)) == null ? 5
				: areaMap.get(key.toString().substring(0, 3));

		return areaCoder;
	}

}

FlowSumArea

package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.hadoop.mr.flowsum.FlowBean;


/**
 * 對流量原始日誌進行流量統計,將不同省份的使用者統計結果輸出到不同檔案
 * 需要自定義改造兩個機制:
 * 1.改造分割槽的邏輯,自定義一個parttioner
 * 2.自定義 reduce task 的併發任務數量
 * 
 * @author [email protected]
 *
 */
public class FlowSumArea {

	public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {

			//拿一行資料
			String line = value.toString();
			//切分各個欄位
			String[] fields = StringUtils.split(line, "\t");
			
			//拿到我們需要的欄位
			String phoneNB = fields[1];
			long u_flow = Long.parseLong(fields[7]);
			long d_flow = Long.parseLong(fields[8]);
			
			//封裝成 kv 並輸出
			context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));

		}
		
		
	}
	
	
	public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
		
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values,Context context)
				throws IOException, InterruptedException {

			long up_flow_counter = 0;
			long d_flow_counter = 0;
			
			for(FlowBean bean: values){
				
				up_flow_counter += bean.getUp_flow();
				d_flow_counter += bean.getD_flow();
				
				
			}
			
			context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
			
			
			
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowSumArea.class);
		
		job.setMapperClass(FlowSumAreaMapper.class);
		job.setReducerClass(FlowSumAreaReducer.class);
		
		//設定我們自定義的邏輯定義
		job.setPartitionerClass(AreaPartitioner.class);
		
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		//設定reduce的任務併發數,應該跟分組的數量保持一致
		job.setNumReduceTasks(6);
		
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		
		System.exit(job.waitForCompletion(true)?0:1);
		
		
	}
	
	
}

打包 jar 包,上傳:

上面最後一個加個 2 。。

 

 

 

reduce 併發數量如果 < 分組數,會報錯;但是改成 1 不會報錯。。。

reduce 併發數量如果 < 分組數,多的分組沒有資料

map 不會涉及到業務邏輯,,如果有 10 個map ,每個就處理 1/10 的資料,map 的併發量是可以任意去設定的。