Web日誌流處理的MapReduce程式 -- 兩個(一個使用Collections排序 一個使用MapReduce本身的排序)
阿新 • • 發佈:2018-12-11
我的這兩個專案程式碼地址:
Collections排序:
https://gitee.com/tanghongping/web_click_mr_hve
MapReduce排序:
https://gitee.com/tanghongping/MapReduceTest
這兩個專案裡面會有一些測試的程式碼,可以忽略。
使用Collections.sort排序
WeblogBean
package com.thp.bigdata.webClick.mrBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Ingestion-layer bean holding one parsed web-access-log record.
 * The field layout mirrors the external data source, so keep them in sync.
 */
public class WeblogBean implements Writable {

    private boolean valid = true;   // whether this record passed validation
    private String remote_addr;     // client IP address
    private String remote_user;     // client user name; "-" means absent
    private String time_local;      // access time (normalized format)
    private String request;         // requested URL
    private String status;          // HTTP status code; 200 means success
    private String body_bytes_sent; // size of the response body sent to the client
    private String http_referer;    // page the user came from
    private String http_user_agent; // client browser information

    /** Populates every field in a single call. */
    public void set(boolean valid, String remote_addr, String remote_user, String time_local,
            String request, String status, String body_bytes_sent, String http_referer,
            String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }
    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

    /** Null-safe helper: DataOutput.writeUTF cannot accept null, so map null to "". */
    private static String nz(String s) {
        return s == null ? "" : s;
    }

    /** Serialization; field order must match {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(this.valid);
        out.writeUTF(nz(remote_addr));
        out.writeUTF(nz(remote_user));
        out.writeUTF(nz(time_local));
        out.writeUTF(nz(request));
        out.writeUTF(nz(status));
        out.writeUTF(nz(body_bytes_sent));
        out.writeUTF(nz(http_referer));
        out.writeUTF(nz(http_user_agent));
    }

    /** Deserialization; field order must match {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.valid = in.readBoolean();
        this.remote_addr = in.readUTF();
        this.remote_user = in.readUTF();
        this.time_local = in.readUTF();
        this.request = in.readUTF();
        this.status = in.readUTF();
        this.body_bytes_sent = in.readUTF();
        this.http_referer = in.readUTF();
        this.http_user_agent = in.readUTF();
    }

    /** Hive-friendly line format: fields joined by the '\001' separator. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.getRemote_addr());
        sb.append("\001").append(this.getRemote_user());
        sb.append("\001").append(this.getTime_local());
        sb.append("\001").append(this.getRequest());
        sb.append("\001").append(this.getStatus());
        sb.append("\001").append(this.getBody_bytes_sent());
        sb.append("\001").append(this.getHttp_referer());
        sb.append("\001").append(this.getHttp_user_agent());
        return sb.toString();
    }
}
PageViewsBean
package com.thp.bigdata.webClick.mrBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One page view inside a session: where in the visit it sits (step),
 * how long the user stayed, and the request's metadata.
 */
public class PageViewsBean implements Writable {

    private String session;     // session id grouping one visit
    private String remote_addr; // client IP address
    private String timeStr;     // access time
    private String request;     // requested URL
    private int step;           // position of this page within the visit
    private String staylong;    // stay time on this page
    private String referal;     // page the user came from
    private String useragent;   // client browser information
    private String bytes_send;  // bytes sent to the client
    private String status;      // HTTP status of this request

    /**
     * Populates every field in a single call.
     * NOTE: useragent is the THIRD parameter, before timeStr — callers must
     * respect this order.
     */
    public void set(String session, String remote_addr, String useragent, String timeStr,
            String request, int step, String staylong, String referal, String bytes_send,
            String status) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.useragent = useragent;
        this.timeStr = timeStr;
        this.request = request;
        this.step = step;
        this.staylong = staylong;
        this.referal = referal;
        this.bytes_send = bytes_send;
        this.status = status;
    }

    public String getSession() { return session; }
    public void setSession(String session) { this.session = session; }
    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getTimeStr() { return timeStr; }
    public void setTimeStr(String timeStr) { this.timeStr = timeStr; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public int getStep() { return step; }
    public void setStep(int step) { this.step = step; }
    public String getStaylong() { return staylong; }
    public void setStaylong(String staylong) { this.staylong = staylong; }
    public String getReferal() { return referal; }
    public void setReferal(String referal) { this.referal = referal; }
    public String getUseragent() { return useragent; }
    public void setUseragent(String useragent) { this.useragent = useragent; }
    public String getBytes_send() { return bytes_send; }
    public void setBytes_send(String bytes_send) { this.bytes_send = bytes_send; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    /**
     * Serialization; field order must match {@link #readFields(DataInput)}.
     * NOTE(review): writeUTF throws NPE on null fields — this assumes
     * set(...) was called with non-null values; confirm with callers.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(timeStr);
        out.writeUTF(request);
        out.writeInt(step);
        out.writeUTF(staylong);
        out.writeUTF(referal);
        out.writeUTF(useragent);
        out.writeUTF(bytes_send);
        out.writeUTF(status);
    }

    /** Deserialization; field order must match {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.timeStr = in.readUTF();
        this.request = in.readUTF();
        this.step = in.readInt();
        this.staylong = in.readUTF();
        this.referal = in.readUTF();
        this.useragent = in.readUTF();
        this.bytes_send = in.readUTF();
        this.status = in.readUTF();
    }
}
VisitBean
package com.thp.bigdata.webClick.mrBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One complete visit (session): entry/exit page and time, referal,
 * client IP and number of pages viewed.
 */
public class VisitBean implements Writable {

    private String session;     // session id
    private String remote_addr; // client IP address
    private String inTime;      // time of the first page view
    private String outTime;     // time of the last page view
    private String inPage;      // first page of the visit
    private String outPage;     // last page of the visit
    private String referal;     // where the visit came from
    private int pageVisits;     // number of pages viewed in the visit

    /** Populates every field in a single call. */
    public void set(String session, String remote_addr, String inTime, String outTime,
            String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    public String getSession() { return session; }
    public void setSession(String session) { this.session = session; }
    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getInTime() { return inTime; }
    public void setInTime(String inTime) { this.inTime = inTime; }
    public String getOutTime() { return outTime; }
    public void setOutTime(String outTime) { this.outTime = outTime; }
    public String getInPage() { return inPage; }
    public void setInPage(String inPage) { this.inPage = inPage; }
    public String getOutPage() { return outPage; }
    public void setOutPage(String outPage) { this.outPage = outPage; }
    public String getReferal() { return referal; }
    public void setReferal(String referal) { this.referal = referal; }
    public int getPageVisits() { return pageVisits; }
    public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; }

    /** Serialization; field order must match {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(inTime);
        out.writeUTF(outTime);
        out.writeUTF(inPage);
        out.writeUTF(outPage);
        out.writeUTF(referal);
        out.writeInt(pageVisits);
    }

    /** Deserialization; field order must match {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();
    }

    /** Hive-friendly line format: fields joined by the '\001' separator. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(session).append("\001").append(remote_addr)
          .append("\001").append(inTime).append("\001").append(outTime)
          .append("\001").append(inPage).append("\001").append(outPage)
          .append("\001").append(referal).append("\001").append(pageVisits);
        return sb.toString();
    }
}
預處理解析類
package com.thp.bigdata.webClick.mrBean;
import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
/**
 * Parses one raw access-log line into a {@link WeblogBean} and marks
 * invalid records (too few fields, HTTP status >= 400, unparseable time).
 */
public class WeblogParser {

/** Placeholder stored in time_local when the timestamp cannot be parsed. */
private static final String INVALID_TIME = "-invalid_time-";

/**
 * Parses a space-separated log line. Expected token layout:
 *  0) ip  1) user  3) "[dd/MMM/yyyy:HH:mm:ss"  6) url  8) status
 *  9) bytes  10) referer  11+) user agent (may contain spaces)
 * @param line one raw log line
 * @return a bean; valid=false when the record is unusable
 */
public static WeblogBean parser(String line) {
WeblogBean weblogBean = new WeblogBean();
String[] arr = line.split(" ");
if(arr.length > 11) {
weblogBean.setRemote_addr(arr[0]);
weblogBean.setRemote_user(arr[1]);
// arr[3] starts with '[' -- strip it before parsing the timestamp
String time_local = formatDate(arr[3].substring(1));
if(null == time_local) time_local = INVALID_TIME;
weblogBean.setTime_local(time_local);
weblogBean.setRequest(arr[6]);
weblogBean.setStatus(arr[8]);
weblogBean.setBody_bytes_sent(arr[9]);
weblogBean.setHttp_referer(arr[10]);
// the user agent may have been split on spaces: rejoin the tail tokens
if(arr.length > 12) {
StringBuilder sb = new StringBuilder();
for(int i = 11; i < arr.length; i++) {
sb.append(arr[i]);
}
weblogBean.setHttp_user_agent(sb.toString());
} else {
weblogBean.setHttp_user_agent(arr[11]);
}
// status >= 400 means the request failed; a non-numeric status is
// treated as invalid instead of crashing the whole task (bug fix:
// the original let NumberFormatException propagate)
try {
if(Integer.parseInt(weblogBean.getStatus()) >= 400) {
weblogBean.setValid(false);
}
} catch (NumberFormatException e) {
weblogBean.setValid(false);
}
if(INVALID_TIME.equals(weblogBean.getTime_local())) {
weblogBean.setValid(false);
}
} else {
weblogBean.setValid(false);
}
return weblogBean;
}

/**
 * Filters static resources: any request outside the known page set is
 * treated as a static asset (js/img/css) and marked invalid.
 */
public static void filterStaticResource(WeblogBean bean, Set<String> pages) {
if(!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}

// Kept only for backward compatibility with external users of these fields.
// WARNING: SimpleDateFormat is NOT thread-safe -- do not share these
// instances; formatDate() deliberately builds its own per call.
public static SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);
public static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);

/**
 * Converts "18/Sep/2013:06:49:18" into "2013-09-18 06:49:18".
 * Uses per-call formatters because SimpleDateFormat is not thread-safe
 * and mapper tasks may run multithreaded (bug fix: the original shared
 * the static instances above across threads).
 * @param time_local raw log timestamp without the leading '['
 * @return normalized time string, or null when the input cannot be parsed
 */
public static String formatDate(String time_local) {
SimpleDateFormat src = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
SimpleDateFormat dst = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
try {
return dst.format(src.parse(time_local));
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}

/** Manual check of how a sample log line splits on spaces. */
@Test
public void testSpilt() {
String str = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
String[] arr = str.split(" ");
int i = 1;
for(String s : arr) {
System.out.println(i + " ) " + s);
i++;
}
}

@Test
public void testProp() throws IOException {
}

/** Manual check that the url property resource loads from the classpath. */
public static void main(String[] args) throws IOException {
// try-with-resources + null check (bug fix: the original leaked the
// stream and NPE'd inside Properties.load when the resource was missing)
try (InputStream is = WeblogParser.class.getClassLoader()
.getResourceAsStream("com/thp/bigdata/webClick/mrBean/url_1.propeties")) {
if (is == null) {
System.err.println("resource url_1.propeties not found on classpath");
return;
}
Properties pop = new Properties();
pop.load(is);
String str = (String) pop.get("url");
System.out.println(str);
}
}
}
MapReduce 程式
1 . 日誌的預處理:
package com.thp.bigdata.webClick.mr.pre;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.thp.bigdata.webClick.mrBean.WeblogBean;
import com.thp.bigdata.webClick.mrBean.WeblogParser;
/**
 * Cleans raw logs into valid PV records (map-only job, 0 reducers):
 *  1) normalizes the time format,
 *  2) fills defaults for missing fields,
 *  3) marks each record valid/invalid and drops the invalid ones.
 */
public class WeblogPreProcess {
static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Set<String> pages = new HashSet<String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
 * Loads the whitelist of real page urls from url.propeties on the
 * classpath (comma-separated "url" property).
 */
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Properties pop = new Properties();
// try-with-resources + explicit errors (bug fix: the original leaked
// the stream and NPE'd when the resource or property was missing)
try (InputStream in = WeblogPreProcessMapper.class.getClassLoader().getResourceAsStream("url.propeties")) {
if (in == null) {
throw new IOException("url.propeties not found on classpath");
}
pop.load(in);
}
String urlStr = pop.getProperty("url");
if (urlStr == null) {
throw new IOException("property 'url' missing in url.propeties");
}
for(String url : urlStr.split(",")) {
pages.add(url);
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
WeblogBean weblogBean = WeblogParser.parser(line);
// pluggable filter: anything outside the page whitelist is a static
// resource (js/img/css) and gets marked invalid
WeblogParser.filterStaticResource(weblogBean, pages);
if(weblogBean.isValid()) { // invalid records are dropped here
k.set(weblogBean.toString());
context.write(k, v);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// prefer command-line paths; fall back to the original hard-coded ones
String input = args.length >= 2 ? args[0] : "f:/weblog/input";
String output = args.length >= 2 ? args[1] : "f:/weblog/output";
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.setNumReduceTasks(0); // map-only: no shuffle needed
// report failure through the exit code (bug fix: result was ignored)
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.分析出點選流:
package com.thp.bigdata.webClick.mr;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.thp.bigdata.webClick.mrBean.WeblogBean;
/**
 * Builds the pageviews (click-stream) model from the cleaned logs.
 *
 * For each client IP the reducer:
 *  - sorts that client's records chronologically,
 *  - splits them into sessions (visits): a gap of 30+ minutes between two
 *    consecutive requests starts a new session (random UUID per session),
 *  - numbers the pages inside a session (step) and sets each page's stay
 *    time to the gap until the next request (60 s default for the last
 *    page of a session).
 */
public class ClickStream {
static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WeblogBean> {
Text k = new Text();
WeblogBean v = new WeblogBean();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
// (bug fix: removed per-record System.out.println debug spam)
String[] fields = line.split("\001");
if(fields.length < 9) return; // malformed line: skip
v.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5],
fields[6], fields[7], fields[8]);
// only valid records continue, keyed by client IP
if(v.isValid()) {
k.set(v.getRemote_addr());
context.write(k, v);
}
}
}
static class ClickStreamReducer extends Reducer<Text, WeblogBean, NullWritable, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WeblogBean> values, Context context) throws IOException, InterruptedException {
ArrayList<WeblogBean> beans = new ArrayList<WeblogBean>();
// Hadoop reuses the value object while iterating, so deep-copy each
// bean before buffering it
for(WeblogBean bean : values) {
WeblogBean weblogBean = new WeblogBean();
try {
BeanUtils.copyProperties(weblogBean, bean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
beans.add(weblogBean);
}
// sort this client's records chronologically
Collections.sort(beans, new Comparator<WeblogBean>() {
@Override
public int compare(WeblogBean o1, WeblogBean o2) {
try {
Date d1 = toDate(o1.getTime_local());
Date d2 = toDate(o2.getTime_local());
if(d1 == null || d2 == null) return 0;
return d1.compareTo(d2);
} catch (ParseException e) {
e.printStackTrace();
}
return 0; // unparseable times compare as equal
}
});
int step = 1;
String session = UUID.randomUUID().toString();
for(int i = 0; i < beans.size(); i++) {
WeblogBean bean = beans.get(i);
// single record: emit it with the default 60 s stay time
if(1 == beans.size()) {
v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" +
bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +
"\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +
bean.getBody_bytes_sent() + "\001"
+ bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// with several records, record i-1 is emitted while looking at
// record i, so the first iteration only establishes "previous"
if(i == 0) {
continue;
}
try {
long timeDiff = timeDiff(bean.getTime_local(), beans.get(i - 1).getTime_local());
if(timeDiff < 30*60*1000) {
// gap < 30 min: same session; the previous page's stay time is the gap
v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() +
"\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() +
"\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001"
+ beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// gap >= 30 min: close the previous session (60 s default stay),
// then reset the step counter and start a new session id
v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" +
beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" +
(step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" +
beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step = 1;
session = UUID.randomUUID().toString();
}
} catch (ParseException e) {
e.printStackTrace();
}
// last record: emit it with the default 60 s stay time (session id
// was already updated above if a new session just started)
if(i == beans.size() - 1) {
v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
}
/** Parses "yyyy-MM-dd HH:mm:ss"; per-call formatter (not thread-safe). */
private Date toDate(String timeStr) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.UK);
return sdf.parse(timeStr);
}
/** Millisecond difference time1 - time2. */
private long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
}
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ClickStream.class);
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WeblogBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// prefer command-line paths; fall back to the original hard-coded ones
String input = args.length >= 2 ? args[0] : "f:/weblog_1/output";
String output = args.length >= 2 ? args[1] : "f:/weblog_1/pageviews";
FileInputFormat.setInputPaths(job, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(job, outPath);
// remove a stale output directory so re-runs do not fail
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// report failure through the exit code (bug fix: result was ignored)
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.進一步梳理出visit模型:
package com.thp.bigdata.webClick.mr;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.thp.bigdata.webClick.mrBean.PageViewsBean;
import com.thp.bigdata.webClick.mrBean.VisitBean;
/**
 * Builds the visit model from the PageViews output: one line per session
 * with entry/exit page and time, page count, client IP and referal.
 *
 * Output layout:
 *  sessionid  start-time  out-time  start-page  out-page  pagecounts ...
 */
public class ClickStreamVisit {
static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
PageViewsBean pvBean = new PageViewsBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\001");
// skip malformed lines instead of crashing the whole task
// (bug fix: the original threw AIOOBE / NumberFormatException)
if (fields.length < 10) {
return;
}
int step;
try {
step = Integer.parseInt(fields[5]); // position within the visit
} catch (NumberFormatException e) {
return;
}
// NOTE(review): fields[2] is bound to the useragent parameter of
// PageViewsBean.set -- verify this matches the upstream column order.
pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
k.set(pvBean.getSession());
context.write(k, pvBean);
}
}
static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
@Override
protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context)
throws IOException, InterruptedException {
// deep-copy each bean (Hadoop reuses the value object), then sort by step
ArrayList<PageViewsBean> pvBeanList = new ArrayList<PageViewsBean>();
for(PageViewsBean pvBean : pvBeans) {
PageViewsBean bean = new PageViewsBean();
try {
BeanUtils.copyProperties(bean, pvBean);
pvBeanList.add(bean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
Collections.sort(pvBeanList, new Comparator<PageViewsBean>() {
@Override
public int compare(PageViewsBean o1, PageViewsBean o2) {
// Integer.compare keeps the Comparator contract: equal steps
// compare as 0 (bug fix: "a > b ? 1 : -1" never returned 0,
// violating antisymmetry and risking sort failures)
return Integer.compare(o1.getStep(), o2.getStep());
}
});
// the first and last pageview of the sorted list define the visit's
// entry and exit
VisitBean visitBean = new VisitBean();
visitBean.setInPage(pvBeanList.get(0).getRequest());
visitBean.setInTime(pvBeanList.get(0).getTimeStr());
visitBean.setOutPage(pvBeanList.get(pvBeanList.size() - 1).getRequest());
visitBean.setOutTime(pvBeanList.get(pvBeanList.size() - 1).getTimeStr());
visitBean.setPageVisits(pvBeanList.size());
visitBean.setRemote_addr(pvBeanList.get(0).getRemote_addr());
visitBean.setReferal(pvBeanList.get(0).getReferal());
visitBean.setSession(session.toString());
context.write(NullWritable.get(), visitBean);
}
}
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ClickStreamVisit.class);
job.setMapperClass(ClickStreamVisitMapper.class);
job.setReducerClass(ClickStreamVisitReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PageViewsBean.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(VisitBean.class);
// prefer command-line paths; fall back to the original hard-coded ones
String input = args.length >= 2 ? args[0] : "f:/weblog_1/pageviews";
String output = args.length >= 2 ? args[1] : "f:/weblog_1/visitout";
FileInputFormat.setInputPaths(job, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(job, outPath);
// remove a stale output directory so re-runs do not fail
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
/**
 * Dead code: allocates a StringBuffer and discards it. Kept only because
 * removing a public method could break external callers; delete once
 * confirmed unused.
 */
public void a1(int a) {
new StringBuffer();
}
}
使用MapReduce自身的排序
WeblogBean
package mr.flow.weblog.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * Ingestion-layer bean holding one parsed web-access-log record; the field
 * layout mirrors the external data source. This variant is comparable so
 * MapReduce itself can sort records by IP and then by access time.
 */
public class WeblogBean implements WritableComparable<WeblogBean> {
private boolean valid = true; // whether this record passed validation
private String remote_addr; // client IP address
private String remote_user; // client user name; "-" means absent
private String time_local; // access time, normalized to "yyyy-MM-dd HH:mm:ss"
private String request; // requested URL
private String status; // HTTP status code; 200 means success
private String body_bytes_sent; // size of the response body sent to the client
private String http_referer; // page the user came from
private String http_user_agent; // client browser information
/** Populates every field in a single call. */
public void set(boolean valid, String remote_addr, String remote_user, String time_local, String request,
String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public boolean isValid() { return valid; }
public void setValid(boolean valid) { this.valid = valid; }
public String getRemote_addr() { return remote_addr; }
public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
public String getRemote_user() { return remote_user; }
public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
public String getTime_local() { return time_local; }
public void setTime_local(String time_local) { this.time_local = time_local; }
public String getRequest() { return request; }
public void setRequest(String request) { this.request = request; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
public String getBody_bytes_sent() { return body_bytes_sent; }
public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
public String getHttp_referer() { return http_referer; }
public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
public String getHttp_user_agent() { return http_user_agent; }
public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }
/** Serialization; field order must match {@link #readFields(DataInput)}. */
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
// writeUTF cannot take null, so map null fields to ""
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}
/** Deserialization; field order must match {@link #write(DataOutput)}. */
@Override
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
/** Hive-friendly line format: fields joined by the '\001' separator. */
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.getRemote_addr());
sb.append("\001").append(this.getRemote_user());
sb.append("\001").append(this.getTime_local());
sb.append("\001").append(this.getRequest());
sb.append("\001").append(this.getStatus());
sb.append("\001").append(this.getBody_bytes_sent());
sb.append("\001").append(this.getHttp_referer());
sb.append("\001").append(this.getHttp_user_agent());
return sb.toString();
}
/**
 * Orders records by client IP first, then chronologically within one IP.
 * The IP must be compared first: comparing only dates would interleave
 * different clients' records.
 *
 * time_local is the zero-padded "yyyy-MM-dd HH:mm:ss" format, so plain
 * string comparison is equivalent to date comparison. (Bug fix: the
 * original parsed with a freshly allocated SimpleDateFormat on every
 * comparison and swallowed ParseException by returning 0, which produced
 * an inconsistent ordering on bad input.)
 */
@Override
public int compareTo(WeblogBean o) {
int ipCompareResult = this.getRemote_addr().compareTo(o.getRemote_addr());
if(ipCompareResult != 0) {
return ipCompareResult;
}
return this.getTime_local().compareTo(o.getTime_local());
}
}
PageViewsBean
package mr.flow.weblog.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.hadoop.io.WritableComparable;
import org.junit.Test;
/**
* sessionId
* @author 湯小萌
*
*/
public class PageViewsBean implements WritableComparable<PageViewsBean> {
private String session; // session id grouping one visit
private String remote_addr; // client IP address
private String timeStr; // access time
private String request; // requested URL
private int step; // position of this page within the visit
private String staylong; // stay time on this page
private String referal; // page the user came from
private String useragent; // client browser information
private String bytes_send; // bytes sent to the client
private String status; // HTTP status of this request
/**
 * Populates every field in a single call.
 * NOTE: useragent is the THIRD parameter, before timeStr — callers must
 * pass arguments in exactly this order.
 */
public void set(String session, String remote_addr, String useragent, String timeStr, String request, int step, String staylong, String referal, String bytes_send, String status) {
this.session = session;
this.remote_addr = remote_addr;
this.useragent = useragent;
this.timeStr = timeStr;
this.request = request;
this.step = step;
this.staylong = staylong;
this.referal = referal;
this.bytes_send = bytes_send;
this.status = status;
}
/**
 * Serialization; field order must match {@link #readFields(DataInput)}.
 * NOTE(review): writeUTF throws NPE on null fields — this assumes set(...)
 * was called with non-null values; confirm with callers.
 */
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(timeStr);
out.writeUTF(request);
out.writeInt(step);
out.writeUTF(staylong);
out.writeUTF(referal);
out.writeUTF(useragent);
out.writeUTF(bytes_send);
out.writeUTF(status);
}
/** Deserialization; field order must match {@link #write(DataOutput)}. */
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.timeStr = in.readUTF();
this.request = in.readUTF();
this.step = in.readInt();
this.staylong = in.readUTF();
this.referal = in.readUTF();
this.useragent = in.readUTF();
this.bytes_send = in.readUTF();
this.status = in.readUTF();
}
@Override
public int compareTo(PageViewsBean o) {
// 【注意:】這個session也要先進行比較,只有先進行了session的比較後面的step的比較菜有意義
int sessionCompareResult = this.session.compareTo(o.getSession());
if(sessionCompareResult == 0) { // 相同的session的話就繼續比較 step
return this.step - o.getStep() > 0 ? 1 : -1; // 這種方式 是正序 從小島大排序
}
return sessionCompareResult;
// return 0;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getTimeStr() {
return timeStr;
}
public void setTimeStr(String timeStr) {
this.timeStr = timeStr;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public int getStep() {
return step;
}
public void setStep(int step) {
this.step = step;
}
public String getStaylong() {
return staylong;
}
public void setStaylong(String staylong) {
this.staylong = staylong;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public String getUseragent() {
return useragent;
}
public void setUseragent(String useragent) {
this.useragent = useragent;
}
public String getBytes_send() {
return bytes_send;
}
public void setBytes_send(String bytes_send) {
this.bytes_send = bytes_send;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public String toString() {
return this.session + " " + this.step + "";
}
@Test
public void testCompareTo() {
PageViewsBean pvb1 = new PageViewsBean();
pvb1.set(null, null, null, null, null, 2, null, null, null, null);
PageViewsBean pvb2 = new PageViewsBean();
pvb2.set(null, null, null, null, null, 1, null, null, null, null);
PageViewsBean pvb3 = new PageViewsBean();
pvb3.set(null, null, null, null, null, 4, null, null, null, null);
PageViewsBean pvb4 = new PageViewsBean();
pvb4.set(null, null, null, null, null, 3, null, null, null, null);
ArrayList<PageViewsBean> list = new ArrayList<PageViewsBean>();
list.add(pvb1);
list.add(pvb2);
list.add(pvb3);
list.add(pvb4);
System.out.println(list);
Collections.sort(list);
System.out.println(list);
}
}
VisitBean
package mr.flow.weblog.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * One visit (session) summary: client ip, entry/exit time, entry/exit page,
 * referring page, and how many pages were viewed in total.
 *
 * @author 湯小萌
 * @date 2018年11月28日 下午9:01:17
 */
public class VisitBean implements Writable {
    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;
    private int pageVisits;

    /** Fills all fields of the visit in one call. */
    public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    /** Field order here defines the wire format; {@link #readFields(DataInput)} mirrors it. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(inTime);
        out.writeUTF(outTime);
        out.writeUTF(inPage);
        out.writeUTF(outPage);
        out.writeUTF(referal);
        out.writeInt(pageVisits);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();
    }

    /** Hive-friendly line: fields joined by the \001 separator. */
    @Override
    public String toString() {
        return String.join("\001", session, remote_addr, inTime, outTime,
                inPage, outPage, referal, String.valueOf(pageVisits));
    }

    public String getSession() { return session; }
    public void setSession(String session) { this.session = session; }
    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getInTime() { return inTime; }
    public void setInTime(String inTime) { this.inTime = inTime; }
    public String getOutTime() { return outTime; }
    public void setOutTime(String outTime) { this.outTime = outTime; }
    public String getInPage() { return inPage; }
    public void setInPage(String inPage) { this.inPage = inPage; }
    public String getOutPage() { return outPage; }
    public void setOutPage(String outPage) { this.outPage = outPage; }
    public String getReferal() { return referal; }
    public void setReferal(String referal) { this.referal = referal; }
    public int getPageVisits() { return pageVisits; }
    public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; }
}
WeblogParser
package mr.flow.weblog.bean;
import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
/**
 * Parses raw access-log lines into {@link WeblogBean} records and flags
 * invalid ones (bad status, unparseable time, too few fields).
 *
 * @author 湯小萌
 */
public class WeblogParser {
    /**
     * Token layout of a space-split log line:
     * 0 ) 194.237.142.21
     * 1 ) -
     * 2 ) -
     * 3 ) [18/Sep/2013:06:49:18
     * 4 ) +0000]
     * 5 ) "GET
     * 6 ) /wp-content/uploads/2013/07/rstudio-git3.png
     * 7 ) HTTP/1.1"
     * 8 ) 304
     * 9 ) 0
     * 10 ) "-"
     * 11 ) "Mozilla/4.0
     * 12 ) (compatible;)"
     * @param line one raw log line
     * @return a WeblogBean; {@code isValid()} is false for unusable records
     */
    public static WeblogBean parser(String line) {
        WeblogBean weblogBean = new WeblogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            weblogBean.setRemote_addr(arr[0]);
            weblogBean.setRemote_user(arr[1]);
            // arr[3] looks like "[18/Sep/2013:06:49:18" -> strip the leading '['
            String time_local = formatDate(arr[3].substring(1));
            if (null == time_local) time_local = "-invalid_time-";
            weblogBean.setTime_local(time_local);
            weblogBean.setRequest(arr[6]);
            weblogBean.setStatus(arr[8]);
            weblogBean.setBody_bytes_sent(arr[9]);
            weblogBean.setHttp_referer(arr[10]);
            // the user agent may contain spaces, so it can span several tokens
            if (arr.length > 12) {
                StringBuilder sb = new StringBuilder();
                for (int i = 11; i < arr.length; i++) {
                    if (i > 11) {
                        sb.append(' '); // fix: restore the spaces that split(" ") removed
                    }
                    sb.append(arr[i]);
                }
                weblogBean.setHttp_user_agent(sb.toString());
            } else {
                weblogBean.setHttp_user_agent(arr[11]);
            }
            try {
                if (Integer.parseInt(weblogBean.getStatus()) >= 400) { // status >= 400 means a failed request
                    weblogBean.setValid(false);
                }
            } catch (NumberFormatException e) {
                // fix: a malformed status code used to crash the whole task; mark invalid instead
                weblogBean.setValid(false);
            }
            if ("-invalid_time-".equals(weblogBean.getTime_local())) {
                weblogBean.setValid(false);
            }
        } else {
            weblogBean.setValid(false);
        }
        return weblogBean;
    }

    /**
     * Filters static resources: any request outside the configured url set
     * is treated as a static resource and marked invalid.
     */
    public static void filterStaticResource(WeblogBean bean, Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        }
    }

    // NOTE(review): kept public for backward compatibility, but SimpleDateFormat is
    // NOT thread-safe — prefer calling formatDate(), which is synchronized.
    public static SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    public static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    /**
     * Converts "dd/MMM/yyyy:HH:mm:ss" (US locale) to "yyyy-MM-dd HH:mm:ss".
     * Synchronized because the shared SimpleDateFormat instances are not
     * thread-safe.
     * @param time_local raw time token from the log
     * @return reformatted time, or null when the input cannot be parsed
     */
    public static synchronized String formatDate(String time_local) {
        try {
            return sdf2.format(sdf1.parse(time_local));
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    /** Manual check: prints each token of a sample log line with its index. */
    @Test
    public void testSpilt() {
        String str = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
        String[] arr = str.split(" ");
        int i = 1;
        for (String s : arr) {
            System.out.println(i + " ) " + s);
            i++;
        }
    }

    @Test
    public void testProp() throws IOException {
    }

    /** Manual check: loads the url whitelist from the classpath and prints it. */
    public static void main(String[] args) throws IOException {
        Properties pop = new Properties();
        // fix: close the classpath stream (try-with-resources) instead of leaking it
        try (InputStream is = WeblogParser.class.getClassLoader().getResourceAsStream("com/thp/bigdata/webClick/mrBean/url_1.propeties")) {
            pop.load(is);
        }
        String str = (String) pop.get("url");
        System.out.println(str);
    }
}
比較器
IpGroupingComparator
package mr.flow.weblog.bean;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Custom grouping rule: keys that share the same client ip address are
 * delivered to a single reduce() call together, regardless of the rest
 * of the key (which still sorts by time within the group).
 *
 * @author 湯小萌
 */
public class IpGroupingComparator extends WritableComparator {

    public IpGroupingComparator() {
        // 'true' asks the parent to instantiate WeblogBean keys for comparison
        super(WeblogBean.class, true);
    }

    /** Compares only the ip address of the two keys. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftIp = ((WeblogBean) a).getRemote_addr();
        String rightIp = ((WeblogBean) b).getRemote_addr();
        return leftIp.compareTo(rightIp);
    }
}
SessionIdGroupingComparator
package mr.flow.weblog.bean;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Custom grouping rule: keys with the same sessionId are delivered to a
 * single reduce() call, so one invocation sees one whole session.
 *
 * @author 湯小萌
 * @date 2018年11月28日 下午8:55:13
 */
public class SessionIdGroupingComparator extends WritableComparator {

    public SessionIdGroupingComparator() {
        // 'true' asks the parent to instantiate PageViewsBean keys for comparison
        super(PageViewsBean.class, true);
    }

    /** Compares only the sessionId of the two keys. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftSession = ((PageViewsBean) a).getSession();
        String rightSession = ((PageViewsBean) b).getSession();
        return leftSession.compareTo(rightSession);
    }
}
1. WeblogPreProcess 日誌預處理
package mr.flow.weblog.pre;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import mr.flow.weblog.bean.WeblogBean;
import mr.flow.weblog.bean.WeblogParser;
/**
 * Pre-processes the raw web log, keeping only real page views (PV):
 * 1) normalizes the time format
 * 2) fills default values for missing fields
 * 3) marks every record valid / invalid
 *
 * @author 湯小萌
 */
public class WeblogPreProcess {
    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Set<String> pages = new HashSet<String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        /**
         * Loads the site's url whitelist from the classpath once per task.
         * @throws IOException if the properties resource is missing or unreadable
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            Properties pop = new Properties();
            // fix: the stream was never closed (leak) and a missing resource caused
            // a bare NPE; try-with-resources + explicit check make both failures clean
            try (InputStream in = WeblogPreProcessMapper.class.getClassLoader().getResourceAsStream("url.propeties")) {
                if (in == null) {
                    throw new IOException("url.propeties not found on the classpath");
                }
                pop.load(in);
            }
            String urlStr = pop.getProperty("url");
            String[] urls = urlStr.split(",");
            for (String url : urls) {
                pages.add(url);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            WeblogBean weblogBean = WeblogParser.parser(line);
            // pluggable step: filter js / images / css and other static resources
            WeblogParser.filterStaticResource(weblogBean, pages);
            if (weblogBean.isValid()) { // invalid records are dropped here
                k.set(weblogBean.toString());
                context.write(k, v);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // generalization: take paths from the command line when given,
        // falling back to the original hard-coded local defaults
        String inputPath = args.length > 0 ? args[0] : "f:/weblog_2/input/access.log.fensi";
        String outputPath = args.length > 1 ? args[1] : "f:/weblog_2/output";
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(outputPath))) {
            fs.delete(new Path(outputPath), true); // overwrite previous output
        }
        job.setNumReduceTasks(0); // map-only job: no shuffle needed for filtering
        boolean done = job.waitForCompletion(true);
        System.exit(done ? 0 : 1); // fix: report job failure via the exit code
    }
}
2. ClickStream
package mr.flow.weblog.mr;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import mr.flow.weblog.bean.IpGroupingComparator;
import mr.flow.weblog.bean.WeblogBean;
/**
 * Builds the click-stream PageViews model from the cleaned (pre-processed) log:
 *
 * - distinguishes visit sessions and assigns each one a sessionId
 * - lists every page of a session (visit time, url, stay time, step number)
 * - carries http_referer, body_bytes_sent and http_user_agent through
 *
 * Keys arrive at the reducer grouped by ip ({@link IpGroupingComparator}) and
 * sorted by visit time (WeblogBean.compareTo); a gap of 30 minutes or more
 * between two hits of the same ip starts a new session.
 *
 * @author 湯小萌
 */
public class ClickStream {

    /** Re-parses a cleaned \001-separated line into a WeblogBean key; the value is unused. */
    static class ClickStreamMapper extends Mapper<LongWritable, Text, WeblogBean, Text> {
        WeblogBean k = new WeblogBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\001");
            if (fields.length < 9) return; // malformed line, skip
            k.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5],
                    fields[6], fields[7], fields[8]);
            if (k.isValid()) { // only valid records go downstream
                context.write(k, v);
            }
        }
    }

    /**
     * Emits one line per page view:
     * sessionId ip user time request step staylong http_referer http_user_agent body_bytes_sent status
     *
     * The reducer always writes the PREVIOUS record once it has seen the next
     * one (so it can compute the stay time); the last record of an ip gets a
     * default stay time of 60 seconds.
     */
    static class ClickStreamReducer extends Reducer<WeblogBean, Text, Text, NullWritable> {
        Text k = new Text();
        NullWritable v = NullWritable.get();
        // fix: hoisted out of toDate() so it is not re-created for every record;
        // a reducer instance is single-threaded, so sharing it here is safe
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

        @Override
        protected void reduce(WeblogBean beanKey, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int step = 1; // position of the page inside the current session
            String sessionId = UUID.randomUUID().toString(); // id of the current session
            String lastTimeStr = null;   // previous record's visit time
            String lastSaveStr = null;   // previous record's trailing fields (referer/agent/bytes/status)
            String lastIpAndUser = null; // previous record's ip + user
            String lastUrl = null;       // previous record's url
            long stayTime = 0L;          // millisecond gap between previous and current record
            for (Text value : values) {
                // NOTE: hadoop reuses the key object, so beanKey now holds the CURRENT
                // record while the last* variables still hold the previous one.
                if (lastTimeStr != null) {
                    try {
                        stayTime = timeDiff(beanKey.getTime_local(), lastTimeStr);
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                    if (stayTime < 30 * 60 * 1000) { // gap < 30 min: same session
                        k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001" + lastTimeStr + "\001" +
                                step + "\001" + (stayTime / 1000) + "\001" + lastSaveStr);
                        context.write(k, v);
                        step++;
                    } else { // gap >= 30 min: close the session; its last page gets a default 60s stay
                        k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001" + lastTimeStr + "\001" +
                                step + "\001" + (60) + "\001" + lastSaveStr);
                        context.write(k, v);
                        step = 1; // restart the step counter for the new session
                        sessionId = UUID.randomUUID().toString(); // and generate a fresh sessionId
                    }
                }
                // remember the current record for the next iteration
                lastTimeStr = beanKey.getTime_local();
                lastSaveStr = beanKey.getHttp_referer() + "\001" + beanKey.getHttp_user_agent() + "\001" +
                        beanKey.getBody_bytes_sent() + "\001" + beanKey.getStatus();
                lastUrl = beanKey.getRequest();
                lastIpAndUser = beanKey.getRemote_addr() + "\001" + beanKey.getRemote_user();
            }
            // flush the very last record of this ip; its stay time is unknowable, default to 60s
            k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001" + lastTimeStr + "\001" +
                    step + "\001" + (60) + "\001" + lastSaveStr);
            context.write(k, v);
        }

        // **********************helper methods************************

        /** Parses "yyyy-MM-dd HH:mm:ss" into a Date. */
        private Date toDate(String timeStr) throws ParseException {
            return sdf.parse(timeStr);
        }

        /** Millisecond difference time1 - time2. */
        private long timeDiff(String time1, String time2) throws ParseException {
            return toDate(time1).getTime() - toDate(time2).getTime();
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClickStream.class);
        job.setMapperClass(ClickStreamMapper.class);
        job.setReducerClass(ClickStreamReducer.class);
        job.setMapOutputKeyClass(WeblogBean.class);
        job.setMapOutputValueClass(Text.class);
        // final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // group by ip so one reduce() call sees one ip's full, time-sorted history
        job.setGroupingComparatorClass(IpGroupingComparator.class);
        FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/output"));
        FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/pageviews"));
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path("f:/weblog_2/pageviews"))) {
            fs.delete(new Path("f:/weblog_2/pageviews"), true); // overwrite previous output
        }
        job.waitForCompletion(true);
    }
}
3. ClickStreamVisit
package mr.flow.weblog.mr;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import mr.flow.weblog.bean.PageViewsBean;
import mr.flow.weblog.bean.SessionIdGroupingComparator;
import mr.flow.weblog.bean.VisitBean;
/**
 * Derives the Visit model from the PageViews model: grouped by sessionId,
 * each reduce() call summarizes one visit.
 *
 * Output per visit:
 * sessionId, entry time, exit time, entry page, exit page, total pages viewed
 *
 * @author 湯小萌
 * @date 2018年11月28日 下午8:42:26
 */
public class ClickStreamVisit {
    static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, PageViewsBean, Text> {
        // NOTE: the map output value could be NullWritable; Text is kept only
        // because it was used during testing and callers already expect it
        PageViewsBean beanKey = new PageViewsBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\001");
            // step must be an int so PageViewsBean can sort within a session
            int step = Integer.parseInt(fields[5]);
            beanKey.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]);
            v.set(beanKey.getSession() + " " + step);
            context.write(beanKey, v);
        }
    }

    static class ClickStreamVisitReducer extends Reducer<PageViewsBean, Text, NullWritable, VisitBean> {
        NullWritable k = NullWritable.get();
        // reused across calls; filled from the first/last page view of each visit
        VisitBean visitBean = new VisitBean();

        @Override
        protected void reduce(PageViewsBean beanKey, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            ArrayList<PageViewsBean> pvBeanList = new ArrayList<PageViewsBean>();
            for (Text str : values) {
                // beanKey is reused by hadoop, so it must be copied — adding the
                // reference directly would fill the list with one identical object
                PageViewsBean pvBean = new PageViewsBean();
                try {
                    BeanUtils.copyProperties(pvBean, beanKey);
                    pvBeanList.add(pvBean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
            if (pvBeanList.isEmpty()) {
                return; // fix: every copy failed above — avoid IndexOutOfBoundsException below
            }
            PageViewsBean first = pvBeanList.get(0);
            PageViewsBean last = pvBeanList.get(pvBeanList.size() - 1);
            // entry of the visit
            visitBean.setInPage(first.getRequest()); // fix: was commented out, leaving inPage null in every output line
            visitBean.setInTime(first.getTimeStr());
            // exit of the visit
            visitBean.setOutPage(last.getRequest());
            visitBean.setOutTime(last.getTimeStr());
            // total pages viewed in this visit
            visitBean.setPageVisits(pvBeanList.size());
            // visitor's ip
            visitBean.setRemote_addr(first.getRemote_addr());
            // referer of this visit
            visitBean.setReferal(first.getReferal());
            visitBean.setSession(first.getSession());
            context.write(k, visitBean);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClickStreamVisit.class);
        job.setMapperClass(ClickStreamVisitMapper.class);
        job.setReducerClass(ClickStreamVisitReducer.class);
        job.setMapOutputKeyClass(PageViewsBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        // fix: this line called setOutputKeyClass twice — the reducer's VALUE
        // class (VisitBean) was never registered with the job
        job.setOutputValueClass(VisitBean.class);
        // group by sessionId so one reduce() call sees one whole visit
        job.setGroupingComparatorClass(SessionIdGroupingComparator.class);
        FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/pageviews/testLog.txt"));
        FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/visitout"));
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path("f:/weblog_2/visitout"))) {
            fs.delete(new Path("f:/weblog_2/visitout"), true); // overwrite previous output
        }
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}