
Implementing a custom value type for MapReduce computations

1. When Hadoop's built-in data types cannot meet the needs of a MapReduce program, or when a data type customized for the use case may perform better, you can define your own Writable type.

    To do so, implement the org.apache.hadoop.io.Writable interface; the resulting type can then serve as the value type of a MapReduce computation.

2. The source of the org.apache.hadoop.io.Writable interface includes a concrete example implementation:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class MyWritable implements Writable {
        // Some data
        private int counter;
        private long timestamp;

        @Override
        public void write(DataOutput out) throws IOException {
          out.writeInt(counter);
          out.writeLong(timestamp);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
          counter = in.readInt();
          timestamp = in.readLong();
        }

        // Static factory: deserialize a new instance from the stream
        public static MyWritable read(DataInput in) throws IOException {
          MyWritable w = new MyWritable();
          w.readFields(in);
          return w;
        }
}
3. When implementing a custom Writable type, keep the following points in mind:
    3.1 If you add a custom constructor to your Writable class, be sure to also keep the default no-argument constructor; the framework instantiates the type by reflection during deserialization.
    3.2 If instances of your custom Writable type will be serialized by TextOutputFormat, make sure the type has a meaningful toString() implementation.
    3.3 When reading input, Hadoop may reuse a single instance of the Writable class, so when populating fields in readFields() you must not rely on the object's existing state.
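Point 3.3 is worth a concrete illustration. Hadoop deserializes successive records into the same Writable instance, so readFields() must fully reset the object's state. The sketch below is a hypothetical variable-length value type, written against plain java.io so it runs without Hadoop on the classpath: the backing list is cleared before reading; without that call, values left over from the previous record would accumulate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type holding a variable-length list of ints.
public class IntListValue {
    private final List<Integer> values = new ArrayList<>();

    public void add(int v) { values.add(v); }
    public List<Integer> getValues() { return values; }

    public void write(DataOutput out) throws IOException {
        out.writeInt(values.size());
        for (int v : values) out.writeInt(v);
    }

    public void readFields(DataInput in) throws IOException {
        values.clear(); // reset state: the same instance is reused per record
        int n = in.readInt();
        for (int i = 0; i < n; i++) values.add(in.readInt());
    }

    // Round-trip helper: serialize 'src', then deserialize into 'dst'
    public static void copy(IntListValue src, IntListValue dst) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        src.write(new DataOutputStream(buf));
        dst.readFields(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
    }
}
```

For fixed-size types such as the DataWritable below, assigning every field in readFields() already resets the state; collection-backed fields are where the reuse pitfall typically bites.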
4. The following walk-through, "processing mobile-phone traffic logs with a custom type", shows a custom Writable type in action.
   4.1 Data file: HTTP_20130313143750.dat (available online).

   4.2 Sample record: 1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200

   4.3 Record layout: [figure not available in the source]

    4.4 The fields we extract are the phone number, the upload packet count, the download packet count, the total upload traffic, and the total download traffic. (Both sending a request and receiving its response generate packets and traffic.)
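The field positions used by the Mapper below (index 1 for the phone number, indices 6-9 for the packet and traffic columns) can be checked with a quick parsing sketch. Note that the sample record in 4.2 appears truncated relative to those indices, so the record here is a hypothetical one laid out to match what the Mapper expects:

```java
public class ParseSample {
    // Hypothetical tab-separated record: field 1 is the phone number,
    // fields 6-9 are up packets, down packets, up bytes, down bytes.
    public static final String SAMPLE = String.join("\t",
            "1363157985066", "13726230503", "00-FD-07-A4-72-B8:CMCC",
            "120.196.100.82", "i02.c.aliimg.com", "...",
            "24", "27", "2481", "24681");

    public static String phone(String line) {
        return line.split("\t")[1];
    }

    // Returns {upPackNum, downPackNum, upPayLoad, downPayLoad}
    public static int[] counts(String line) {
        String[] f = line.split("\t");
        return new int[] { Integer.parseInt(f[6]), Integer.parseInt(f[7]),
                Integer.parseInt(f[8]), Integer.parseInt(f[9]) };
    }
}
```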

5. The concrete MapReduce implementation.

   5.1 The custom value type.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataWritable implements Writable {
	// upload
	private int upPackNum;
	private int upPayLoad;

	// downLoad
	private int downPackNum;
	private int downPayLoad;

	public DataWritable() {
	}

	public void set(int upPackNum, int upPayLoad, int downPackNum,
			int downPayLoad) {
		this.upPackNum = upPackNum;
		this.upPayLoad = upPayLoad;
		this.downPackNum = downPackNum;
		this.downPayLoad = downPayLoad;
	}

	public int getUpPackNum() {
		return upPackNum;
	}

	public int getUpPayLoad() {
		return upPayLoad;
	}

	public int getDownPackNum() {
		return downPackNum;
	}

	public int getDownPayLoad() {
		return downPayLoad;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readInt();
		this.upPayLoad = in.readInt();
		this.downPackNum = in.readInt();
		this.downPayLoad = in.readInt();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(upPackNum);
		out.writeInt(upPayLoad);
		out.writeInt(downPackNum);
		out.writeInt(downPayLoad);
	}

	@Override
	public String toString() {
		return upPackNum + "\t" + upPayLoad + "\t" + downPackNum + "\t" + downPayLoad;
	}
}
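Since write() and readFields() must handle the fields in exactly the same order, the layout is easy to verify with a round trip through in-memory streams. The sketch below mirrors DataWritable's four-int ordering with plain java.io so it runs without Hadoop on the classpath; the class and method names are illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DataWritableRoundTrip {
    // Serializes and immediately deserializes the four fields, using the
    // same order as DataWritable: upPackNum, upPayLoad, downPackNum, downPayLoad.
    public static int[] roundTrip(int upPackNum, int upPayLoad,
                                  int downPackNum, int downPayLoad) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        // same order as DataWritable.write()
        out.writeInt(upPackNum);
        out.writeInt(upPayLoad);
        out.writeInt(downPackNum);
        out.writeInt(downPayLoad);
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        // same order as DataWritable.readFields()
        return new int[] { in.readInt(), in.readInt(), in.readInt(), in.readInt() };
    }
}
```

If the write and read orders ever drift apart, the round trip scrambles the fields, which this kind of check catches immediately.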
    5.2 The Mapper function.
static class DataTotalMapper extends
			Mapper<LongWritable, Text, Text, DataWritable> {
		private Text mapOutputKey = new Text();
		private DataWritable dataWritable = new DataWritable();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String lineValue = value.toString();
			// split the tab-separated record
			String[] strs = lineValue.split("\t");
			// extract the fields we need
			String phoneNum = strs[1];
			int upPackNum = Integer.valueOf(strs[6]);
			int downPackNum = Integer.valueOf(strs[7]);
			int upPayLoad = Integer.valueOf(strs[8]);
			int downPayLoad = Integer.valueOf(strs[9]);
			// emit only records keyed by an 11-digit mobile number;
			// without the braces, records with other keys would still be
			// written under the previous key
			if (phoneNum.length() == 11) {
				mapOutputKey.set(phoneNum);
				dataWritable.set(upPackNum, upPayLoad, downPackNum, downPayLoad);
				context.write(mapOutputKey, dataWritable);
			}
		}
	}
    5.3 The Reducer function.
static class DataTotalReducer extends
			Reducer<Text, DataWritable, Text, DataWritable> {
		private DataWritable dataWritable = new DataWritable();

		@Override
		public void reduce(Text key, Iterable<DataWritable> values,
				Context context) throws IOException, InterruptedException {
			int upPackNum = 0;
			int downPackNum = 0;
			int upPayLoad = 0;
			int downPayLoad = 0;
			for (DataWritable data : values) {
				upPackNum += data.getUpPackNum();
				downPackNum += data.getDownPackNum();
				upPayLoad += data.getUpPayLoad();
				downPayLoad += data.getDownPayLoad();
			}
			dataWritable.set(upPackNum, upPayLoad, downPackNum, downPayLoad);
			context.write(key, dataWritable);
		}
	}
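Because the reduce step is a plain sum, which is commutative and associative, and its input and output types coincide, the same class could also be registered as a combiner to cut shuffle traffic. This is an optional tweak, not something the original code does:

```java
// in the driver, after setReducerClass:
job.setCombinerClass(DataTotalReducer.class);
```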
     5.4 The driver (main function).
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataTotalPhone {
	static final String INPUT_PATH = "hdfs://192.168.56.171:9000/DataPhone/HTTP_20130313143750.dat";
	static final String OUT_PATH = "hdfs://192.168.56.171:9000/DataPhone/out";

	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {
		Configuration conf = new Configuration();
		// delete the output directory if it already exists
		FileSystem fileSystem = FileSystem.get(URI.create(INPUT_PATH), conf);
		Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		// create job
		Job job = new Job(conf, DataTotalPhone.class.getSimpleName());
		job.setJarByClass(DataTotalPhone.class);
		// 1) input
		FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
		// 2) map
		job.setMapperClass(DataTotalMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataWritable.class);
		// 3) reduce
		job.setReducerClass(DataTotalReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataWritable.class);
		// 4) output
		FileOutputFormat.setOutputPath(job, outPath);
		// main() is void, so exit with the job status instead of returning it
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
6. Results after running the job.