1. 程式人生 > Hadoop序列化案例

Hadoop序列化案例

一、問題描述

       根據所給的資料輸出每一個手機號上網的上載流量、下載流量和總流量。

二、資料格式

    輸入資料(部分)格式

    1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜尋引擎    28    27    3659    3538    200
    1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站點統計    3    3    1938    180    200
    1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
    1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
    1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    綜合門戶    15    12    1938    2910    200

    輸出資料格式

    手機號   上載流量(總)  下載流量(總)  總流量

三、程式碼實現

       DataCount:

package edu.jianwei.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that aggregates, per phone number, the upload traffic,
 * download traffic and their sum from tab-separated traffic records.
 *
 * <p>Usage: {@code hadoop jar <jar> <input path> <output path>}
 */
public class DataCount {

	/** Column of the phone number in a tab-separated input record. */
	private static final int TEL_NUM_INDEX = 1;
	/** Column of the upload traffic in a tab-separated input record. */
	private static final int UPLOAD_INDEX = 8;
	/** Column of the download traffic in a tab-separated input record. */
	private static final int DOWNLOAD_INDEX = 9;

	/** Counter group/name used to report input lines that were skipped. */
	private static final String COUNTER_GROUP = "DataCount";
	private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

	/**
	 * Parses one input line and emits (phone number, traffic bean).
	 * Lines with too few columns or non-numeric traffic fields are counted
	 * and skipped instead of failing the whole task.
	 */
	static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
		// Output objects are reused across map() calls; context.write serializes them.
		private final Text k = new Text();
		private final DataBean v = new DataBean();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] words = value.toString().split("\t");
			if (words.length <= DOWNLOAD_INDEX) {
				context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
				return;
			}

			double upLoad;
			double downLoad;
			try {
				upLoad = Double.parseDouble(words[UPLOAD_INDEX]);
				downLoad = Double.parseDouble(words[DOWNLOAD_INDEX]);
			} catch (NumberFormatException e) {
				context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
				return;
			}

			String telNum = words[TEL_NUM_INDEX];
			k.set(telNum);
			v.Set(telNum, upLoad, downLoad);
			context.write(k, v);
		}
	}

	/**
	 * Sums the upload and download traffic of all beans that share a phone
	 * number and emits a single bean holding the totals.
	 */
	static class DCReduce extends Reducer<Text, DataBean, Text, DataBean> {
		private final DataBean v = new DataBean();

		@Override
		protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
				throws IOException, InterruptedException {
			double upTotal = 0;
			double downTotal = 0;
			for (DataBean d : v2s) {
				upTotal += d.getUpLoad();
				downTotal += d.getDownload();
			}
			// Carry the phone number in the bean as well; DataBean.toString() does
			// not print it, so the text output is unchanged.
			v.Set(key.toString(), upTotal, downTotal);
			context.write(key, v);
		}
	}

	/**
	 * Configures and runs the job, then exits with 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 * @throws Exception if job setup or execution fails
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			System.err.println("Usage: DataCount <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		// Pass the configuration to the job so that it is actually used.
		Job job = Job.getInstance(conf);

		job.setJarByClass(DataCount.class);

		job.setMapperClass(DCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setReducerClass(DCReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Propagate job success/failure to the calling shell.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

     DataBean:

package edu.jianwei.hadoop.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {
	private String telNum;
	private double upLoad;
	private double download;
	private double total;
	
	public void Set(String telnum,double upload,double download){
		this.telNum=telnum;
		this.upLoad=upload;
		this.download=download;
		this.total=upload+download;
	}

	public void write(DataOutput out) throws IOException {
	    out.writeUTF(telNum);
	    out.writeDouble(upLoad);
	    out.writeDouble(download);
	    out.writeDouble(total);
		
	}

	public void readFields(DataInput in) throws IOException {
		
		this.telNum=in.readUTF();
		this.upLoad=in.readDouble();
		this.download=in.readDouble();
		this.total=in.readDouble();
	}

	public String getTelNum() {
		return telNum;
	}

	public void setTelNum(String telNum) {
		this.telNum = telNum;
	}

	public double getUpLoad() {
		return upLoad;
	}

	public void setUpLoad(double upLoad) {
		this.upLoad = upLoad;
	}

	public double getDownload() {
		return download;
	}

	public void setDownload(double download) {
		this.download = download;
	}

	public double getTotal() {
		return total;
	}

	public void setTotal(double total) {
		this.total = total;
	}

	@Override
	public String toString() {
		// TODO Auto-generated method stub
		return upLoad+"\t"+download+"\t"+total;
	}
   
	
}

四、程式碼執行

      1.執行程式碼

         hadoop jar /root/dc.jar  /dc  /dc/res

       2.程式碼執行結果(部分)

        13560436666     1116.0  954.0   2070.0
        13560439658     2034.0  5892.0  7926.0
        13602846565     1938.0  2910.0  4848.0
        13660577991     6960.0  690.0   7650.0
        13719199419     240.0   0.0     240.0
        13726230503     2481.0  24681.0 27162.0