MR例子(統計各個手機號在某段時間內產生的總流量)
目的:統計各個手機號在某段時間類產生的總流量
準備檔案 (已經上傳到hdfs上 檔名data.txt)
上圖中對應的欄位如下圖
檔案及程式碼分析
所給的檔案是每一個使用者每一次上網產生的流量,先如今需要將相同使用者進行聚合。
最後輸出的結果欄位:手機號 上行總流量 下行總流量 總流量
map的輸入輸出都是以key value 形式存在。輸入的鍵值對為K1為整數 value為字串 , 輸出的鍵值對K2為字串(手機號),輸出相當於上行總流量 ,下行總流量 ,總流量的list。所以我們用一個物件(DataBean)來儲存它們。
reduce的輸入就是map的輸出(經過shuffle處理,這裡不做詳細說明),reduce輸出的形式為key為手機號(字串),value為物件(DataBean)的結果就是我們最後想要的結果。
map進行的業務處理就是取出目標檔案中的四個欄位,然後進行拆分
reduce進行的業務處理,主要是對map的輸出中的DataBean裡面的流量進行求和,最後輸出,下面直接上程式碼。
DataBean
package cn.master1.hadoop.mr.dc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DataBean implements Writable{
private String telNo;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;
public DataBean() {}
public DataBean(String telNo, long upPayLoad, long downPayLoad) {
this.telNo = telNo;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = upPayLoad + downPayLoad;
}
@Override
public String toString() {
return this.upPayLoad + "/t" + this.downPayLoad + "/t" + this.totalPayLoad;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(telNo);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
public void readFields(DataInput in) throws IOException {
this.telNo = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
public String getTelNo() {
return telNo;
}
public void setTelNo(String telNo) {
this.telNo = telNo;
}
public long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
package cn.master1.hadoop.mr.dc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DataCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCount.class);
job.setMapperClass(DCMapper.class);
/*當k2 v2 和 k3 v3 型別一一對應時,此行和下面一行可以省略。*/
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DataBean>.Context context)
throws IOException, InterruptedException {
//接收資料
String line = value.toString();
String[] fileds = line.split("/t");
String telNo = fileds[1];
long up = Long.parseLong(fileds[8]);
long down = Long.parseLong(fileds[9]);
DataBean bean = new DataBean(telNo, up, down);
context.write(new Text(telNo), bean);
}
}
public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{
@Override
protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
throws IOException, InterruptedException {
long up_sum = 0;
long down_sum = 0;
for(DataBean bean : v2s){
up_sum += bean.getUpPayLoad();
down_sum += bean.getDownPayLoad();
}
DataBean bean = new DataBean("", up_sum, down_sum);
context.write(key, bean);
}
}
}
jar包
打成jar包(不指定入口程式)命名為examples.jar,放到虛擬機器跟目錄下,然後執行
hadoop jar /root/examples.jar cn.master1.hadoop.mr.dc.DataCount /data.txt /dataout
cn.master1.hadoop.mr.dc.DataCount指定執行的入口程式 /data.txt 目標檔案(存在hdfs上) /dataout輸出檔案(存放到hdfs上)
最後輸出結果如下
下面簡單說一說MR的執行流程和hadoop的序列化
MR執行流程
(1).客戶端提交一個mr的jar包給RM(resourceManage)(提交方式:hadoop jar ...)
(2).JobClient通過RPC和RM進行通訊,返回一個存放jar包的地址(HDFS)和jobId
(3).client將jar包寫入到HDFS當中(path = hdfs上的地址 + jobId)
(4).開始提交任務(任務的描述資訊,不是jar, 包括jobid,jar存放的位置,配置資訊等等)
(5).RM進行初始化任務
(6).讀取HDFS上的要處理的檔案,開始計算輸入分片,每一個分片對應一個NM(nodeManage)
(7).NM通過心跳機制領取任務(任務的描述資訊)
(8).下載所需的jar,配置檔案等。
(9).NM啟動一個java child子程序,用來執行具體的任務(MapperTask或ReducerTask)
(10).將結果寫入到HDFS當中。
hadoop序列化
- 序列化的概念
序列化(Serialization)是指把結構化物件轉化為位元組流。
反序列化(Deserialization)是序列化的逆過程。即把位元組流轉回結構化物件。
Java序列化(java.io.Serializable)
hadoop序列化並不是用的java自帶的序列化機制,java的序列化機制運用的比較廣泛,所以序列化和反序列化時儲存的東西過多,效率較低,而hadoop在序列化時,只需要儲存資料即可,因為只需要傳輸資料。hadoop具有特定的序列化機制。
- 序列化格式特點:
緊湊:高效使用儲存空間。
快速:讀寫資料的額外開銷小
可擴充套件:可透明地讀取老格式的資料
互操作:支援多語言的互動
hadoop的序列化格式Writable
更多詳細介紹