1. 程式人生 > 其它 >MapReduce高階 分割槽、排序,Combine

MapReduce高階 分割槽、排序,Combine

一、分割槽

1.1先分析一下具體的業務邏輯,確定大概有多少個分割槽
1.2首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
1.3重寫public int getPartition這個方法,根據具體邏輯,讀資料庫或者配置返回相同的數字
1.4在main方法中設定Partioner的類,job.setPartitionerClass(DataPartitioner.class);
1.5設定Reducer的數量,job.setNumReduceTasks(6);

 

public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(DataCount.class);
		
		job.setMapperClass(DCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataInfo.class);
		
		job.setReducerClass(DCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataInfo.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setPartitionerClass(DCPartitioner.class);
		
		job.setNumReduceTasks(Integer.parseInt(args[2]));
		
		
		job.waitForCompletion(true);

	}
	//Map
	public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo>{
		
		private Text k = new Text();
		
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, DataInfo>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\t");
			String tel = fields[1];
			long up = Long.parseLong(fields[8]);
			long down = Long.parseLong(fields[9]);
			DataInfo dataInfo = new DataInfo(tel,up,down);
			k.set(tel);
			context.write(k, dataInfo);

		}
		
	}
	public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo>{
		
		@Override
		protected void reduce(Text key, Iterable<DataInfo> values,
				Reducer<Text, DataInfo, Text, DataInfo>.Context context)
				throws IOException, InterruptedException {
			long up_sum = 0;
			long down_sum = 0;
			for(DataInfo d : values){
				up_sum += d.getUpPayLoad();
				down_sum += d.getDownPayLoad();
			}
			DataInfo dataInfo = new DataInfo("",up_sum,down_sum);
			
			context.write(key, dataInfo);
		}
		
	}
	public static class DCPartitioner extends  Partitioner<Text, DataInfo>{
		
		private static Map<String,Integer> provider = new HashMap<String,Integer>();
		
		static{
			provider.put("138", 1);
			provider.put("139", 1);
			provider.put("152", 2);
			provider.put("153", 2);
			provider.put("182", 3);
			provider.put("183", 3);
		}
		@Override
		public int getPartition(Text key, DataInfo value, int numPartitions) {
			//向資料庫或配置資訊 讀寫
			String tel_sub = key.toString().substring(0,3);
			Integer count = provider.get(tel_sub);
			if(count == null){
				count = 0;
			}
			return count;
		}
		
	}

 

二、排序

排序MR預設是按key2進行排序的,如果想自定義排序規則,被排序的物件要實WritableComparable介面,在compareTo方法中實現排序規則,然後將這個物件當做k2,即可完成排序

 

public class InfoBean implements WritableComparable<InfoBean>{

	private String account;
	private double income;
	private double expenses;
	private double surplus;
	
	public void set(String account,double income,double expenses){
		this.account = account;
		this.income = income;
		this.expenses = expenses;
		this.surplus = income - expenses;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(account);
		out.writeDouble(income);
		out.writeDouble(expenses);
		out.writeDouble(surplus);
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.account = in.readUTF();
		this.income = in.readDouble();
		this.expenses = in.readDouble();
		this.surplus = in.readDouble();
	}

	@Override
	public int compareTo(InfoBean o) {
		if(this.income == o.getIncome()){
			return this.expenses > o.getExpenses() ? 1 : -1;
		}
		return this.income > o.getIncome() ? 1 : -1;
	}

	@Override
	public String toString() {
		return  income + "\t" +	expenses + "\t" + surplus;
	}
	public String getAccount() {
		return account;
	}

	public void setAccount(String account) {
		this.account = account;
	}

	public double getIncome() {
		return income;
	}

	public void setIncome(double income) {
		this.income = income;
	}

	public double getExpenses() {
		return expenses;
	}

	public void setExpenses(double expenses) {
		this.expenses = expenses;
	}

	public double getSurplus() {
		return surplus;
	}

	public void setSurplus(double surplus) {
		this.surplus = surplus;
	}

}

public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{

		private InfoBean k = new InfoBean();
		@Override
		protected void map(
				LongWritable key,
				Text value,
				Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\t");
			k.set(fields[0], Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
			
			context.write(k, NullWritable.get());
			
		}
		
	}
	public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{

		private Text k = new Text();
		@Override
		protected void reduce(InfoBean key, Iterable<NullWritable> values,
				Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
				throws IOException, InterruptedException {
			k.set(key.getAccount());
			
			context.write(k, key);
		}
		
	}

 

三、Combine

combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的資料量。

  job.setCombinerClass(WCReducer.class);
      
  //提交任務
  job.waitForCompletion(true);