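MapReduce Programming Examples (Repost)
阿新 • Published: 2020-07-28
Note (reposted from): https://www.jianshu.com/p/1e4c976f5ecc
Learning MapReduce Hands-On: Programming Examples
Preface
Here is a project of MapReduce programming examples from my own study. I had originally planned to split these into several articles so that I could describe in detail what I learned along the way, but I ran out of time, so I created this project on GitHub instead. The code has fairly detailed comments, which I think should be enough.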
josonle/MapReduce-Demo
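Some of the problems in this project are taken from a few blog posts found online, but the code is my own implementation. The learning sequence follows the slides from my instructor's lectures [big-data course slides from a "985" university]; I assume the instructor had good reasons for teaching in this order. The project also includes several reference demo files provided by the instructor.
Table of contents (not available, sorry; this document is also included in the project)
MapReduce Programming Examples
1. Custom object serialization
Requirements analysis
We need to aggregate traffic logs for mobile phone users. Sample log content:
flowdata.log
For each user, add up the upstream traffic and the downstream traffic, then compute the total of the two. For example, 13897230503 in the sample above has two records, so those two records are summed and the total is computed, giving: 13897230503,500,1600,2100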
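The aggregation rule above can be checked with a small stand-alone sketch in plain Java, without Hadoop. The two input records are hypothetical, chosen only so that they add up to the totals quoted above, and the names FlowTotals and aggregate are illustrative. It assumes each line holds phone number, upstream, and downstream values separated by tabs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FlowTotals {
    /** Sums traffic per phone number; each value is {up, down, up + down}. */
    static Map<String, int[]> aggregate(List<String> lines) {
        Map<String, int[]> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            int[] t = totals.computeIfAbsent(fields[0], k -> new int[3]);
            t[0] += Integer.parseInt(fields[1]); // upstream
            t[1] += Integer.parseInt(fields[2]); // downstream
            t[2] = t[0] + t[1];                  // running total
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical records, not taken from the real flowdata.log.
        List<String> lines = Arrays.asList(
            "13897230503\t400\t1300",
            "13897230503\t100\t300");
        aggregate(lines).forEach((phone, t) ->
            System.out.println(phone + "," + t[0] + "," + t[1] + "," + t[2]));
    }
}
```

In the MapReduce versions that follow, the grouping by phone number is done by the shuffle phase rather than by a map held in memory.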
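Error: Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.17.10:9000/workspace/flowStatistics/output, expected: file:///
Fix: 1. Copy core-site.xml and hdfs-site.xml into the project. This works because access to a remote HDFS requires obtaining the FileSystem through its URI, and those files supply it.
2. Set fs.defaultFS on the Configuration object in the project [recommended; **watch the capitalization, I misspelled it and spent ages tracking it down**]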
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://"+namenode_ip+":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
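As background for the error above: when fs.defaultFS is not set, a Configuration falls back to the local file system (file:///), so an hdfs:// path does not belong to the file system in use. The stand-alone sketch below imitates that comparison using only java.net.URI. It is a simplified illustration rather than Hadoop's actual code, and WrongFsCheck and belongsTo are hypothetical names.

```java
import java.net.URI;
import java.util.Objects;

final class WrongFsCheck {
    /** Returns true if the path could be served by the given default file system. */
    static boolean belongsTo(URI defaultFs, URI path) {
        if (path.getScheme() == null) {
            return true; // a scheme-less path is resolved against the default FS
        }
        return path.getScheme().equalsIgnoreCase(defaultFs.getScheme())
            && Objects.equals(path.getAuthority(), defaultFs.getAuthority());
    }

    public static void main(String[] args) {
        URI output = URI.create("hdfs://192.168.17.10:9000/workspace/flowStatistics/output");
        // Default FS left at file:///, as in the failing run.
        System.out.println(belongsTo(URI.create("file:///"), output));
        // Default FS set to the namenode address, as in fix 2.
        System.out.println(belongsTo(URI.create("hdfs://192.168.17.10:9000"), output));
    }
}
```

Under these assumptions the first check fails and the second passes, which matches the behaviour before and after setting fs.defaultFS.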
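Solution
Approach 1: process the records in the usual way. A value such as 500 1400 only needs slightly flexible handling: the mapper passes the two numbers on as a single tab-separated Text value, and the reducer splits them again.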
public static class FlowMapper extends Mapper<Object, Text, Text, Text>{
    public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
        // Input line: phone \t upFlow \t downFlow
        String[] strs = value.toString().split("\t");
        Text phone = new Text(strs[0]);
        // Carry both flow values as one tab-separated Text
        Text flow = new Text(strs[1]+"\t"+strs[2]);
        context.write(phone, flow);
    }
}
public static class FlowReducer extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
        int upFlow = 0;
        int downFlow = 0;
        // Accumulate every record that belongs to this phone number
        for (Text value : values) {
            String[] strs = value.toString().split("\t");
            upFlow += Integer.parseInt(strs[0]);
            downFlow += Integer.parseInt(strs[1]);
        }
        int sumFlow = upFlow+downFlow;
        context.write(key,new Text(upFlow+"\t"+downFlow+"\t"+sumFlow));
    }
}
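Approach 2: define a custom serializable Flow object that implements the Writable interface (FlowWritable in the code below) and holds the fields upFlow, downFlow, and sumFlow.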
public static class FlowWritableMapper extends Mapper<Object, Text, Text, FlowWritable> {
    public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
        String[] strs = value.toString().split("\t");
        Text phone = new Text(strs[0]);
        FlowWritable flow = new FlowWritable(Integer.parseInt(strs[1]),Integer.parseInt(strs[2]));
        context.write(phone, flow);
    }
}
public static class FlowWritableReducer extends Reducer<Text, FlowWritable, Text, FlowWritable>{
    public void reduce(Text key,Iterable<FlowWritable> values,Context context) throws IOException, InterruptedException {
        int upFlow = 0;
        int downFlow = 0;
        for (FlowWritable value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
        }
        context.write(key,new FlowWritable(upFlow,downFlow));
    }
}
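The Writable contract that FlowWritable depends on can be tried out without a cluster: write(DataOutput) emits the fields in a fixed order, and readFields(DataInput) reads them back in the same order. The class below, FlowRecord, is a hypothetical stand-in that uses only java.io so that it runs on its own. It deliberately does not import Hadoop; in a real job the class would declare implements org.apache.hadoop.io.Writable. It also has a no-argument constructor, which Hadoop needs in order to create instances by reflection when deserializing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for FlowWritable; mirrors the Writable method shapes.
final class FlowRecord {
    private int upFlow, downFlow, sumFlow;

    FlowRecord() {} // needed so an empty instance can be filled by readFields

    FlowRecord(int upFlow, int downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(sumFlow);
    }

    void readFields(DataInput in) throws IOException {
        // Must read in exactly the order used by write()
        upFlow = in.readInt();
        downFlow = in.readInt();
        sumFlow = in.readInt();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /** Serializes a record to bytes and reads it back into a fresh instance. */
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            FlowRecord copy = new FlowRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new FlowRecord(500, 1600)));
    }
}
```

The toString() override matters in a real job as well, because the default text output format writes each value by calling toString() on it.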
public static class FlowWritable implements Writable{
    private int upFlow,downFlow,sumFlow;
    public FlowWritable(int upFlow,int downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
    public int getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(int downFlow) {
        this.downFlow = downFlow;
        // Keep the derived total consistent with the new value
        this.sumFlow = this.upFlow+downFlow;
    }
    public int getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
        // Keep the derived total consistent with the new value
        this.sumFlow = upFlow+this.downFlow;
    }
public int getSumFlow()