Implementing a custom value type for MapReduce computations
阿新 • Published: 2019-02-19
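1. When writing MapReduce programs, Hadoop's built-in data types sometimes cannot meet the requirements, and a custom data type optimized for the use case may perform better.
In such cases you can define a custom Writable type by implementing the org.apache.hadoop.io.Writable interface and use it as the value type of a MapReduce computation.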
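2. The source code of the org.apache.hadoop.io.Writable interface shows a concrete example implementation:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class MyWritable implements Writable {
        // Some data
        private int counter;
        private long timestamp;

        // Serialize the fields in a fixed order
        public void write(DataOutput out) throws IOException {
            out.writeInt(counter);
            out.writeLong(timestamp);
        }

        // Deserialize the fields in the same order they were written
        public void readFields(DataInput in) throws IOException {
            counter = in.readInt();
            timestamp = in.readLong();
        }

        // Convenience factory: create an instance and fill it from the stream
        public static MyWritable read(DataInput in) throws IOException {
            MyWritable w = new MyWritable();
            w.readFields(in);
            return w;
        }
    }

3. When implementing your own Writable type, also keep the following points in mind: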
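3.1 If you add a custom constructor to your Writable class, be sure to keep the default no-argument constructor as well. Hadoop creates instances reflectively through it during deserialization.
3.2 If instances of the custom Writable type are written out with TextOutputFormat, make sure the type has a meaningful toString() implementation.
3.3 When reading input data, Hadoop may reuse a single instance of the Writable class. When populating the fields in readFields(), do not rely on the object's existing state.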
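The three points above can be tried out without a cluster. The following is only a sketch under stated assumptions: MiniWritable is a hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods, so that the example needs nothing but the JDK), and TagsWritable is an invented type with a variable-length field. It creates an instance reflectively through the no-arg constructor, reuses that one instance for two records, and relies on readFields() clearing the old state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class WritableRulesDemo {

    // Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods).
    public interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Invented example type: a variable-length list of tags.
    public static class TagsWritable implements MiniWritable {
        private final List<String> tags = new ArrayList<>();

        // 3.1: the no-arg constructor stays, next to the custom one.
        public TagsWritable() { }

        public TagsWritable(String... initial) {
            for (String t : initial) {
                tags.add(t);
            }
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(tags.size());
            for (String t : tags) {
                out.writeUTF(t);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            tags.clear(); // 3.3: discard whatever the previous record left behind
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                tags.add(in.readUTF());
            }
        }

        // 3.2: a text-based output format would print this string.
        @Override
        public String toString() {
            return String.join(",", tags);
        }
    }

    static byte[] serialize(MiniWritable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    // Creates one instance reflectively, then reuses it for two records.
    public static String[] readBothWithOneInstance() throws Exception {
        byte[] first = serialize(new TagsWritable("a", "b", "c"));
        byte[] second = serialize(new TagsWritable("x"));

        TagsWritable reused = TagsWritable.class.getDeclaredConstructor().newInstance();
        reused.readFields(new DataInputStream(new ByteArrayInputStream(first)));
        String afterFirst = reused.toString();
        reused.readFields(new DataInputStream(new ByteArrayInputStream(second)));
        String afterSecond = reused.toString();
        return new String[] { afterFirst, afterSecond };
    }

    public static void main(String[] args) throws Exception {
        for (String s : readBothWithOneInstance()) {
            System.out.println(s);
        }
    }
}
```

Without the tags.clear() call, the second record would come back as a,b,c,x, which is the kind of stale state that point 3.3 warns about.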
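4. The following concrete example, "processing mobile internet access logs with a custom type", shows a custom Writable type in practice.
4.1 The data file is named HTTP_20130313143750.dat (it can be downloaded online).
4.2 Data sample: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
4.3 Data structure:
4.4 The fields we mainly extract are the phone number, the upstream packet count, the downstream packet count, the total upstream traffic, and the total downstream traffic. (Both sending a request and returning a response generate packets and traffic.) In the Mapper in section 5.2 these are fields 1, 6, 7, 8 and 9 of each tab-separated line.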
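As a rough illustration of the extraction in 4.4, the sketch below parses one record outside of Hadoop. It rests on an assumption the post does not state outright: that the real file is tab-separated and that the sample line contains one empty field after the host name, which is why the Mapper in section 5.2 reads indexes 6 to 9 rather than 5 to 8. The class LogLineParser, its Traffic holder and the SAMPLE constant are illustrative names, not part of the original program.

```java
public class LogLineParser {

    // Illustrative holder for the five extracted values.
    public static final class Traffic {
        public final String phone;
        public final int upPackNum;
        public final int downPackNum;
        public final int upPayLoad;
        public final int downPayLoad;

        Traffic(String phone, int upPackNum, int downPackNum,
                int upPayLoad, int downPayLoad) {
            this.phone = phone;
            this.upPackNum = upPackNum;
            this.downPackNum = downPackNum;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
        }
    }

    // The sample from 4.2, rebuilt with tabs and an (assumed) empty sixth field.
    public static final String SAMPLE = String.join("\t",
            "1363157985066", "13726230503", "00-FD-07-A4-72-B8:CMCC",
            "120.196.100.82", "i02.c.aliimg.com", "",
            "24", "27", "2481", "24681", "200");

    // Returns null for lines that are too short, have a key that is not
    // 11 characters long, or carry non-numeric counters.
    public static Traffic parse(String line) {
        String[] f = line.split("\t");
        if (f.length < 10 || f[1].length() != 11) {
            return null;
        }
        try {
            return new Traffic(f[1],
                    Integer.parseInt(f[6]),  // upstream packets
                    Integer.parseInt(f[7]),  // downstream packets
                    Integer.parseInt(f[8]),  // upstream bytes
                    Integer.parseInt(f[9])); // downstream bytes
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Traffic t = parse(SAMPLE);
        System.out.println(t.phone + "\t" + t.upPackNum + "\t" + t.upPayLoad
                + "\t" + t.downPackNum + "\t" + t.downPayLoad);
    }
}
```

Returning null for unusable lines is just one possible choice for this sketch; a real job might count such lines with a counter instead.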
5. Implementing the MapReduce program.
5.1 The custom data type.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class DataWritable implements Writable {
        // upload
        private int upPackNum;
        private int upPayLoad;
        // download
        private int downPackNum;
        private int downPayLoad;

        // No-arg constructor required by Hadoop
        public DataWritable() {
        }

        public void set(int upPackNum, int upPayLoad, int downPackNum,
                int downPayLoad) {
            this.upPackNum = upPackNum;
            this.upPayLoad = upPayLoad;
            this.downPackNum = downPackNum;
            this.downPayLoad = downPayLoad;
        }

        public int getUpPackNum() {
            return upPackNum;
        }

        public int getUpPayLoad() {
            return upPayLoad;
        }

        public int getDownPackNum() {
            return downPackNum;
        }

        public int getDownPayLoad() {
            return downPayLoad;
        }

        // Read the fields in exactly the order write() emits them
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upPackNum = in.readInt();
            this.upPayLoad = in.readInt();
            this.downPackNum = in.readInt();
            this.downPayLoad = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(upPackNum);
            out.writeInt(upPayLoad);
            out.writeInt(downPackNum);
            out.writeInt(downPayLoad);
        }

        // Tab-separated form used by TextOutputFormat
        @Override
        public String toString() {
            return upPackNum + "\t" + upPayLoad + "\t" + downPackNum + "\t"
                    + downPayLoad;
        }
    }

5.2 The Mapper function.
    static class DataTotalMapper extends
            Mapper<LongWritable, Text, Text, DataWritable> {
        // Reused for every input record
        private Text mapOutputKey = new Text();
        private DataWritable dataWritable = new DataWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            // split the tab-separated record
            String[] strs = lineValue.split("\t");
            // skip malformed lines that lack the traffic fields
            if (strs.length < 10) {
                return;
            }
            // get data
            String phoneNum = strs[1];
            int upPackNum = Integer.parseInt(strs[6]);
            int downPackNum = Integer.parseInt(strs[7]);
            int upPayLoad = Integer.parseInt(strs[8]);
            int downPayLoad = Integer.parseInt(strs[9]);
            // set map output key / value;
            // emit only records whose key is an 11-character mobile number
            if (phoneNum.length() == 11) {
                mapOutputKey.set(phoneNum);
                dataWritable.set(upPackNum, upPayLoad, downPackNum, downPayLoad);
                context.write(mapOutputKey, dataWritable);
            }
        }
    }
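The Mapper keeps one Text and one DataWritable and overwrites them for every record. This works because the key and value are serialized at the moment context.write() is called, so later changes to the objects do not affect records already emitted. The sketch below imitates that with a plain byte buffer; Counts is a hypothetical two-field cut-down of DataWritable, and the buffer only stands in for Hadoop's output collector. For contrast it also shows what happens when references to the reused object are stored instead of its bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReuseOnWriteDemo {

    // Hypothetical two-field cut-down of DataWritable.
    static class Counts {
        int upPackNum;
        int downPackNum;

        void set(int upPackNum, int downPackNum) {
            this.upPackNum = upPackNum;
            this.downPackNum = downPackNum;
        }

        void write(DataOutput out) throws IOException {
            out.writeInt(upPackNum);
            out.writeInt(downPackNum);
        }

        void readFields(DataInput in) throws IOException {
            upPackNum = in.readInt();
            downPackNum = in.readInt();
        }

        @Override
        public String toString() {
            return upPackNum + "\t" + downPackNum;
        }
    }

    // Safe: each write() copies the current field values into the buffer.
    public static List<String> emitTwiceThenReadBack() throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        Counts reused = new Counts();
        reused.set(24, 27);
        reused.write(out);
        reused.set(3, 5); // overwriting after the write changes nothing already emitted
        reused.write(out);

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray()));
        Counts reader = new Counts();
        List<String> decoded = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            reader.readFields(in);
            decoded.add(reader.toString());
        }
        return decoded;
    }

    // Unsafe: both list entries point at the same object, so the first record is lost.
    public static List<String> keepReferences() {
        List<Counts> kept = new ArrayList<>();
        Counts reused = new Counts();
        reused.set(24, 27);
        kept.add(reused);
        reused.set(3, 5);
        kept.add(reused);

        List<String> seen = new ArrayList<>();
        for (Counts c : kept) {
            seen.add(c.toString());
        }
        return seen;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(emitTwiceThenReadBack());
        System.out.println(keepReferences());
    }
}
```

The second method matters on the reduce side too: Hadoop hands back the same value object on each iteration over the values, so values that need to be kept beyond one loop iteration have to be copied.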
5.3 The Reduce function.

    static class DataTotalReducer extends
            Reducer<Text, DataWritable, Text, DataWritable> {
        // Reused output value
        private DataWritable dataWritable = new DataWritable();

        @Override
        public void reduce(Text key, Iterable<DataWritable> values,
                Context context) throws IOException, InterruptedException {
            int upPackNum = 0;
            int downPackNum = 0;
            int upPayLoad = 0;
            int downPayLoad = 0;
            // sum all records that share the same phone number
            for (DataWritable data : values) {
                upPackNum += data.getUpPackNum();
                downPackNum += data.getDownPackNum();
                upPayLoad += data.getUpPayLoad();
                downPayLoad += data.getDownPayLoad();
            }
            dataWritable.set(upPackNum, upPayLoad, downPackNum, downPayLoad);
            context.write(key, dataWritable);
        }
    }
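What the Reducer computes can be reproduced in memory, which is handy for checking expected totals on a few records before running the job. The sketch below is not Hadoop code: a TreeMap plays the role of the shuffle (grouping by phone number, sorted by key), the int[4] arrays follow the field order of DataWritable.set(), and every record in main() apart from the first one is invented for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalAggregationDemo {

    // Value layout, as in DataWritable.set():
    // {upPackNum, upPayLoad, downPackNum, downPayLoad}
    public static Map.Entry<String, int[]> record(String phone, int upPackNum,
            int upPayLoad, int downPackNum, int downPayLoad) {
        return new SimpleEntry<>(phone,
                new int[] { upPackNum, upPayLoad, downPackNum, downPayLoad });
    }

    // Groups the map output by key and sums each of the four counters.
    public static Map<String, int[]> reduceAll(List<Map.Entry<String, int[]>> mapOutput) {
        Map<String, int[]> totals = new TreeMap<>();
        for (Map.Entry<String, int[]> kv : mapOutput) {
            int[] sum = totals.computeIfAbsent(kv.getKey(), k -> new int[4]);
            for (int i = 0; i < 4; i++) {
                sum[i] += kv.getValue()[i];
            }
        }
        return totals;
    }

    // First record comes from the sample in 4.2; the other two are made up.
    public static List<Map.Entry<String, int[]>> exampleInput() {
        List<Map.Entry<String, int[]>> mapOutput = new ArrayList<>();
        mapOutput.add(record("13726230503", 24, 2481, 27, 24681));
        mapOutput.add(record("13726230503", 1, 10, 2, 20));
        mapOutput.add(record("13800000000", 3, 100, 4, 200));
        return mapOutput;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, int[]> e : reduceAll(exampleInput()).entrySet()) {
            int[] v = e.getValue();
            // same tab-separated layout as key + DataWritable.toString()
            System.out.println(e.getKey() + "\t" + v[0] + "\t" + v[1]
                    + "\t" + v[2] + "\t" + v[3]);
        }
    }
}
```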
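5.4 The main function.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DataTotalPhone {
        // Defaults, used when no paths are passed on the command line
        static final String INPUT_PATH = "hdfs://192.168.56.171:9000/DataPhone/HTTP_20130313143750.dat";
        static final String OUT_PATH = "hdfs://192.168.56.171:9000/DataPhone/out";

        // DataTotalMapper (5.2) and DataTotalReducer (5.3) are static nested
        // classes of this class.

        public static void main(String[] args) throws ClassNotFoundException,
                IOException, InterruptedException {
            Configuration conf = new Configuration();
            Path inputDir = new Path(args.length > 0 ? args[0] : INPUT_PATH);
            Path outputDir = new Path(args.length > 1 ? args[1] : OUT_PATH);
            // remove a stale output directory; the job fails if it already exists
            final FileSystem fileSystem = outputDir.getFileSystem(conf);
            if (fileSystem.exists(outputDir)) {
                fileSystem.delete(outputDir, true);
            }
            // create job (this constructor is deprecated in Hadoop 2.x,
            // where Job.getInstance(conf, name) replaces it)
            Job job = new Job(conf, DataTotalPhone.class.getSimpleName());
            // set job
            job.setJarByClass(DataTotalMapper.class);
            // 1) input
            FileInputFormat.addInputPath(job, inputDir);
            // 2) map
            job.setMapperClass(DataTotalMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DataWritable.class);
            // 3) reduce
            job.setReducerClass(DataTotalReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataWritable.class);
            // 4) output
            FileOutputFormat.setOutputPath(job, outputDir);
            // submit and wait; main() is void, so report the status as the exit code
            boolean isSuccess = job.waitForCompletion(true);
            System.exit(isSuccess ? 0 : 1);
        }
    }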
6. Result after running the program.