自定義類 bean 進行 MapReduce 引數傳輸

自定義類bean進行MapReduce引數傳輸

資料型別:

1363157984041 	13660577991(手機號)	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站點統計	24	9	6960(上行流量)	690(下行流量)	200

需求: 求每個手機號的上行、下行流量總和,以及總流量,並以 bean 方式傳輸,並按照手機號開頭三位進行分割槽。

統計類:

package mrpro927;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/*
 *需求1:同一個手機號上行,下行,總流量之和
 * 
 */
/*
 * Requirement: for each phone number, sum the upstream, downstream and
 * total traffic, transferring the aggregates as a custom bean
 * (phoneBean), and partition the output by the first three digits of
 * the phone number.
 */
public class phoneDataBeans {
	/**
	 * Mapper: parses one tab-separated log line and emits
	 * (phone number, phoneBean{up, down, up+down}).
	 * Reuses the same Text/phoneBean instances across calls — a standard
	 * Hadoop optimization, safe because the framework serializes the
	 * value on each context.write().
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, phoneBean>{
		Text t = new Text();
		phoneBean p = new phoneBean();
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, phoneBean>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			// Discard malformed records: a valid line has exactly 11 fields
			// (see the sample data format above).
			if(split.length == 11){
				t.set(split[1]);                           // field 1: phone number
				p.setUpflow(Integer.parseInt(split[7]));   // field 7: upstream bytes
				p.setDownflow(Integer.parseInt(split[8])); // field 8: downstream bytes
				p.setSum(p.getUpflow() + p.getDownflow());
				context.write(t, p);
			}
		}
	}

	/**
	 * Reducer: called once per phone number; accumulates the per-record
	 * up/down/total values into a single output bean.
	 */
	public static class MyReducer extends Reducer<Text, phoneBean, Text, phoneBean>{
		phoneBean p = new phoneBean();
		@Override
		protected void reduce(Text key, Iterable<phoneBean> values,
				Reducer<Text, phoneBean, Text, phoneBean>.Context context)
						throws IOException, InterruptedException {
			int upsum = 0;
			int downsum = 0;
			int sum = 0;
			for(phoneBean t:values){
				upsum += t.getUpflow();
				downsum += t.getDownflow();
				sum += t.getSum();
			}

			p.setUpflow(upsum);
			p.setDownflow(downsum);
			p.setSum(sum);
			context.write(key, p);
		}
	}

	/**
	 * Partitioner: routes records by the first three digits of the phone
	 * number — 136/137/138/139 each get a dedicated partition, everything
	 * else (including malformed short keys) goes to partition 4.
	 * Requires numReduceTasks == 5 (set in main).
	 */
	public static class MyPartitioner extends Partitioner<Text, phoneBean>{

		@Override
		public int getPartition(Text key, phoneBean value, int numPartitions) {
			String k = key.toString();
			// Guard: keys shorter than 3 chars would otherwise throw
			// StringIndexOutOfBoundsException; send them to the catch-all.
			if(k.length() < 3){
				return 4;
			}
			String substring = k.substring(0, 3);
			if(substring.equals("136")){
				return 0;
			}else if(substring.equals("137")){
				return 1;
			}else if(substring.equals("138")){
				return 2;
			}else if(substring.equals("139")){
				return 3;
			}else {
				return 4;
			}
		}

	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// Load configuration.
		Configuration conf = new Configuration();
		// When launched from Eclipse, impersonate this Linux user on the cluster.
		System.setProperty("HADOOP_USER_NAME", "mading");
		// Create the job.
		Job job = Job.getInstance(conf);
		// Main class used to locate the job jar.
		job.setJarByClass(phoneDataBeans.class);
		// Mapper and reducer classes.
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		// Map output key/value types (may be omitted only when identical
		// to the reduce output types).
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(phoneBean.class);
		// Reduce output key/value types.
		job.setOutputKeyClass(Text.class);
		// BUG FIX: the reducer writes phoneBean values, not Text; declaring
		// Text here makes Hadoop reject every reduce output with
		// "wrong value class" at runtime.
		job.setOutputValueClass(phoneBean.class);
		// Partitioning strategy.
		job.setPartitionerClass(MyPartitioner.class);
		// Reduce-task parallelism; must match MyPartitioner's 5 partitions.
		job.setNumReduceTasks(5);
		// Input path (HA cluster address).
		FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/phonedatain"));
		// Output path.
		FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/pout01"));
		// Submit the job and propagate success/failure via the exit code.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

bean類,注意實現Writable介面

package mrpro927;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * 自定義的類作為MapReduce傳輸物件的時候,必須序列化,實現Writable 介面
 */
/*
 * Custom value bean exchanged between the map and reduce stages.
 * Any user type transferred by MapReduce must be serializable, so it
 * implements Hadoop's Writable interface.
 */
public class phoneBean implements Writable{
	private int upflow;   // upstream traffic
	private int downflow; // downstream traffic
	private int sum;      // upflow + downflow, maintained by the caller

	public int getUpflow() { return upflow; }

	public void setUpflow(int upflow) { this.upflow = upflow; }

	public int getDownflow() { return downflow; }

	public void setDownflow(int downflow) { this.downflow = downflow; }

	public int getSum() { return sum; }

	public void setSum(int sum) { this.sum = sum; }

	/** Tab-separated "up&#9;down&#9;sum" — this is the exact on-file output format. */
	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(upflow).append('\t').append(downflow).append('\t').append(sum);
		return sb.toString();
	}

	// Serialization (object -> bytes): invoked when the framework ships
	// this bean from the map side to the reduce side.
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(upflow);
		out.writeInt(downflow);
		out.writeInt(sum);
	}

	// Deserialization (bytes -> object): field order MUST mirror write()
	// exactly, or the values come back scrambled.
	@Override
	public void readFields(DataInput in) throws IOException {
		upflow = in.readInt();
		downflow = in.readInt();
		sum = in.readInt();
	}

}