大資料離線---網站日誌流量分析系統(2)---資料獲取和預處理
阿新 • • 發佈:2018-12-10
本次接上一篇,進行實際資料的獲取和預處理,會有較多的程式碼內容
- 資料的獲取
- 資料的預處理
資料的獲取
需求
資料採集的需求廣義上來說分為兩大部分。
- 是在頁面採集使用者的訪問行為,具體開發工作: 1、 開發頁面埋點 js,採集使用者訪問行為 2、 後臺接受頁面 js 請求記錄日誌
- 是從 web 伺服器上匯聚日誌到 HDFS,是資料分析系統的資料採集,具體的技術實現有很多方式:
- Shell 指令碼 優點:輕量級,開發簡單 缺點:對日誌採集過程中的容錯處理不便控制
- Java 採集程式 優點:可對採集過程實現精細控制 缺點:開發工作量大
- Flume 日誌採集框架
成熟的開源日誌採集系統,且本身就是 hadoop 生態體系中的一員,與hadoop體系中的各種框架元件具有天生的親和力,可擴充套件性強。在網站 web 流量日誌分析這種場景中,對資料採集部分的可靠性、容錯能力要求通常不會非常嚴苛,因此使用通用的 flume 日誌採集框架完全可以滿足需求。
本專案即使用 flume 來實現日誌採集。
Flume 日誌採集系統搭建
- 日誌切割
nginx 伺服器所生成的流量日誌,存放在各臺 nginx 伺服器上。由於 nginx 沒有自動分開檔案儲存日誌的機制, 不會自動地進行切割, 都是寫在一個檔案 access.log 當中的。 如果訪問量很大的話,將導致日誌檔案容量非常大,不便於管理。 那就需要手動對這個檔案進行切割。
切割需要使用 date 命令以獲得昨天的日期、使用 kill 命令向 Nginx 程序傳送重新開啟日誌檔案的訊號,以及 crontab 設定執行任務週期。 我們需要每天零點將前一天的日誌存為另外一個檔案,這裡我們就將 Nginx 位於 logs 目錄中的 access.log 存為access_[yyyy-MM-dd].log 的檔案。
自動切割 shell 指令碼 nginx_log.sh:
#!/bin/bash #設定日誌檔案存放目錄 logs_path="/usr/local/nginx/logs/" #設定 pid 檔案 pid_path="/usr/local/nginx/nginx-1.7.3/logs/nginx.pid" #日誌檔案 filepath=${logs_path}"access.log" # Source function library. #重新命名日誌檔案 mv ${logs_path}access.log ${logs_path}access_$(date -d '-1 day' '+%Y-%m-%d').log #向 nginx 主程序發訊號重新開啟日誌 kill -USR1 `cat ${pid_path}`
crontab 設定作業(每天零時零分執行):
0 0 * * * sh /usr/local/nginx/nginx_log.sh
- Flume採集實現 Flume 採集系統的搭建相對簡單: 1、 在個 web 伺服器上部署 agent 節點,修改配置檔案 2、 啟動 agent 節點,將採集到的資料匯聚到指定的 HDFS 目錄中 針對上述的 nginx 日誌生成場景, 如果通過 flume( 1.6)收集,無論是 SpoolingDirectory Source 和 Exec Source 均不能滿足動態實時收集的需求,在當前 flume1.7 穩定版本中,提供了一個非常好用的 TaildirSource,使用這個 source,可以監控一個目錄,並且使用正則表示式匹配該目錄中的檔名進行實時收集。 核心配置如下
a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
說明:
- filegroups:指定 filegroups,可以有多個,以空格分隔;( TailSource 可以同時監控tail 多個目錄中的檔案)
- positionFile:配置檢查點檔案的路徑,檢查點檔案會以 json 格式儲存已經 tail 檔案的位置,解決了斷點不能續傳的缺陷。
- filegroups.: 配置每個 filegroup 的檔案絕對路徑,檔名可以用正則表示式匹配通過以上配置,就可以監控檔案內容的增加和檔案的增加。 產生和所配置的檔名正則表示式不匹配的檔案,則不會被 tail。
- 資料樣本 資料的具體內容在採集階段其實不用太關心。程式碼使用的資料來源可以在這裡獲取 點選跳轉到資料來源
8.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
資料說明: 1、訪客 ip 地址: 58.215.204.118 2、訪客使用者資訊: - - 3、請求時間: [18/Sep/2013:06:51:35 +0000] 4、請求方式: GET 5、請求的 url: /wp-includes/js/jquery/jquery.js?ver=1.10.2 6、請求所用協議: HTTP/1.1 7、響應碼: 304 8、返回的資料流量: 0 9、訪客的來源 url: http://blog.fens.me/nodejs-socketio-chat/
資料的預處理
主要目的
過濾“不合規”資料,清洗無意義的資料格式轉換和規整,根據後續的統計需求,過濾分離出各種不同主題(不同欄目 path)的基礎資料。 資料的處理分為三個階段進行: 階段一:對無意義資料的清洗,無用的資料做標記 階段二:對相同使用者的資訊整合,進行頁面點選流模型的梳理,參考上一篇 階段三:點選流模型 Visits 表(按 session 聚集的頁面訪問資訊)
- 階段一:對無意義資料的清洗,無用的資料做標記 WebLogBean類的程式碼
package com.weblog.mr.preprocess;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WebLogBean implements Writable{
private boolean valid = true;// 判斷資料是否合法
private String remote_addr;// 記錄客戶端的ip地址
private String remote_user;// 記錄客戶端使用者名稱稱,忽略屬性"-"
private String time_local;// 記錄訪問時間與時區
private String request;// 記錄請求的url與http協議
private String status;// 記錄請求狀態;成功是200
private String body_bytes_sent;// 記錄傳送給客戶端檔案主體內容大小
private String http_referer;// 用來記錄從那個頁面連結訪問過來的
private String http_user_agent;// 記錄客戶瀏覽器的相關資訊
public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public void setValid(boolean valid) {
this.valid = valid;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public void setRequest(String request) {
this.request = request;
}
public void setStatus(String status) {
this.status = status;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public String getRemote_addr() {
return remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public String getTime_local() {
return time_local;
}
public String getRequest() {
return request;
}
public String getStatus() {
return status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
//重新整合資料
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.getRemote_addr());
sb.append("\001").append(this.getRemote_user());
sb.append("\001").append(this.getTime_local());
sb.append("\001").append(this.getRequest());
sb.append("\001").append(this.getStatus());
sb.append("\001").append(this.getBody_bytes_sent());
sb.append("\001").append(this.getHttp_referer());
sb.append("\001").append(this.getHttp_user_agent());
return sb.toString();
}
//重新定義WebLogBean的序列化方式,減輕無用資訊的負載
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}
@Override
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
}
處理資料清洗的方法類WebLogParser類
package com.weblog.mr.preprocess;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
public class WebLogParser {
static SimpleDateFormat df1= new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
static SimpleDateFormat df2= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
//對日誌檔案的實際處理方法
public static WebLogBean parser(String line){
WebLogBean webLogBean = new WebLogBean();
//切割
String[] arr = line.split(" ");
if(arr.length>11){
//設定IP地址
webLogBean.setRemote_addr(arr[0]);
//設定使用者資訊
webLogBean.setRemote_user(arr[1]);
//[18/Sep/2013:06:49:18 +0000] 設定時間資訊
String time_local = formatDate(arr[3].substring(1));
webLogBean.setTime_local(time_local);
//HTTP/1.1
webLogBean.setRequest(arr[6]);
//304
webLogBean.setStatus(arr[8]);
//0
webLogBean.setBody_bytes_sent(arr[9]);
//Mozilla/4.0 (compatible;)
webLogBean.setHttp_referer(arr[10]);
//使用者瀏覽器的資訊較多,需要進行拼接
//"Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for(int i=11;i<arr.length;i++){
sb.append(arr[i]);
}
webLogBean.setHttp_user_agent(sb.toString());
}else{
//如果不大於12 ,說明使用者瀏覽器的資訊DNSPod-Monitor/1.0,類似這種
webLogBean.setHttp_user_agent(arr[11]);
}
//對於響應碼大於400的,設定valid為false
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大於400,HTTP錯誤
webLogBean.setValid(false);
}
//非法時間格式的也設定為false;
if("-invalid_time-".equals(webLogBean.getTime_local())){
webLogBean.setValid(false);
}
}else{
return null;
}
return webLogBean;
}
//格式化時間的方法
private static String formatDate(String time_locale) {
try {
return df2.format(df1.parse(time_locale));
} catch (ParseException e) {
e.printStackTrace();
return null;
}
}
public static void filtStaticResource(WebLogBean bean,Set<String> pages){
//如果不是規定的協議請求,標記為false
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
}
執行檔案處理的類WeblogPreProcess
package com.weblog.mr.preprocess;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class WeblogPreProcess {
static class WeblogPreProcessMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
Set<String> pages = new HashSet<String>();
Text k = new Text();
NullWritable v = NullWritable.get();
//從外部配置檔案中載入網站的有用url分類資料 儲存到maptask的記憶體中,用來對日誌資料進行過濾
//wp-content/themes/silesia/images/ico-twitter.png HTTP/1.1
@Override
protected void setup(Context context) throws IOException, InterruptedException {
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean !=null){
//過濾CSS/圖片/js等靜態資源
WebLogParser.filtStaticResource(webLogBean,pages);
k.set(webLogBean.toString());
context.write(k,v);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//載入類資訊
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
//載入輸出的資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//載入檔案的位置
FileInputFormat.setInputPaths(job, new Path("d:/weblog/input"));
FileOutputFormat.setOutputPath(job, new Path("d:/weblog/output"));
//設定reduce不執行
job.setNumReduceTasks(0);
//輸出結果
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
- 階段二:對相同使用者的資訊整合,進行頁面點選流模型的梳理 資料裝載類PageViewsBean
package cn.itcast.mr.weblog.pageview;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class PageViewsBean implements Writable {
private String session;
private String remote_addr;
private String timestr;
private String request;
private int step;
private String staylong;
private String referal;
private String useragent;
private String bytes_send;
private String status;
public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {
this.session = session;
this.remote_addr = remote_addr;
this.useragent = useragent;
this.timestr = timestr;
this.request = request;
this.step = step;
this.staylong = staylong;
this.referal = referal;
this.bytes_send = bytes_send;
this.status = status;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getTimestr() {
return timestr;
}
public void setTimestr(String timestr) {
this.timestr = timestr;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public int getStep() {
return step;
}
public void setStep(int step) {
this.step = step;
}
public String getStaylong() {
return staylong;
}
public void setStaylong(String staylong) {
this.staylong = staylong;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public String getUseragent() {
return useragent;
}
public void setUseragent(String useragent) {
this.useragent = useragent;
}
public String getBytes_send() {
return bytes_send;
}
public void setBytes_send(String bytes_send) {
this.bytes_send = bytes_send;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.timestr = in.readUTF();
this.request = in.readUTF();
this.step = in.readInt();
this.staylong = in.readUTF();
this.referal = in.readUTF();
this.useragent = in.readUTF();
this.bytes_send = in.readUTF();
this.status = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(timestr);
out.writeUTF(request);
out.writeInt(step);
out.writeUTF(staylong);
out.writeUTF(referal);
out.writeUTF(useragent);
out.writeUTF(bytes_send);
out.writeUTF(status);
}
}
資料處理類
package cn.itcast.mr.weblog.pageview;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;
import cn.itcast.mr.weblog.preprocess.WebLogBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
* 將清洗之後的日誌梳理出點選流pageviews模型資料
*
* 輸入資料是清洗過後的結果資料
*
* 區分出每一次會話,給每一次visit(session)增加了session-id(隨機uuid)
* 梳理出每一次會話中所訪問的每個頁面(請求時間,url,停留時長,以及該頁面在這次session中的序號)
* 保留referral_url,body_bytes_send,useragent
*
*
* @author
*
*/
public class ClickStreamPageView {
static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
Text k = new Text();
WebLogBean v = new WebLogBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\001");
if (fields.length < 9) return;
//將切分出來的各欄位set到weblogbean中
v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
//只有有效記錄才進入後續處理
if (v.isValid()) {
//此處用ip地址來標識使用者
k.set(v.getRemote_addr());
context.write(k, v);
}
}
}
static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
// 先將一個使用者的所有訪問記錄中的時間拿出來排序
try {
for (WebLogBean bean : values) {
WebLogBean webLogBean = new WebLogBean();
try {
BeanUtils.copyProperties(webLogBean, bean);
} catch(Exception e) {
e.printStackTrace();
}
beans.add(webLogBean);
}
//將bean按時間先後順序排序
Collections.sort(beans, new Comparator<WebLogBean>() {
@Override
public int compare(WebLogBean o1, WebLogBean o2) {
try {
Date d1 = toDate(o1.getTime_local());
Date d2 = toDate(o2.getTime_local());
if (d1 == null || d2 == null)
return 0;
return d1.compareTo(d2);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
});
/**
* 以下邏輯為:從有序bean中分辨出各次visit,並對一次visit中所訪問的page按順序標號step
* 核心思想:
* 就是比較相鄰兩條記錄中的時間差,如果時間差<30分鐘,則該兩條記錄屬於同一個session
* 否則,就屬於不同的session
*
*/
int step = 1;
String session = UUID.randomUUID().toString();
for (int i = 0; i < beans.size(); i++) {
WebLogBean bean = beans.get(i);
// 如果僅有1條資料,則直接輸出
if (1 == beans.size()) {
// 設定預設停留時長為60s
v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
+ bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// 如果不止1條資料,則將第一條跳過不輸出,遍歷第二條時再輸出
if (i == 0) {
continue;
}
// 求近兩次時間差
long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
// 如果本次-上次時間差<30分鐘,則輸出前一次的頁面訪問資訊
if (timeDiff < 30 * 60 * 1000) {
v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// 如果本次-上次時間差>30分鐘,則輸出前一次的頁面訪問資訊且將step重置,以分隔為新的visit
v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
// 輸出完上一條之後,重置step編號
step = 1;
session = UUID.randomUUID().toString();
}
// 如果此次遍歷的是最後一條,則將本條直接輸出
if (i == beans.size() - 1) {
// 設定預設停留市場為60s
v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
} catch (ParseException e) {
e.printStackTrace();
}
}
private String toStr(Date date) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.format(date);
}
private Date toDate(String timeStr) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.parse(timeStr);
}
private long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
private long timeDiff(Date time1, Date time2) throws ParseException {
return time1.getTime() - time2.getTime();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ClickStreamPageView.class);
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat.setInputPaths(job, new Path("d:/weblog/output"));
FileOutputFormat.setOutputPath(job, new Path("d:/weblog/pageviews"));
job.waitForCompletion(true);
}
}
- 階段三:點選流模型 Visits 表(按 session 聚集的頁面訪問資訊) 資料裝載類VisitBean
package cn.itcast.mr.weblog.vists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class VisitBean implements Writable {
private String session;
private String remote_addr;
private String inTime;
private String outTime;
private String inPage;
private String outPage;
private String referal;
private int pageVisits;
public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
this.session = session;
this.remote_addr = remote_addr;
this.inTime = inTime;
this.outTime = outTime;
this.inPage = inPage;
this.outPage = outPage;
this.referal = referal;
this.pageVisits = pageVisits;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getInTime() {
return inTime;
}
public void setInTime(String inTime) {
this.inTime = inTime;
}
public String getOutTime() {
return outTime;
}
public void setOutTime(String outTime) {
this.outTime = outTime;
}
public String getInPage() {
return inPage;
}
public void setInPage(String inPage) {
this.inPage = inPage;
}
public String getOutPage() {
return outPage;
}
public void setOutPage(String outPage) {
this.outPage = outPage;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public int getPageVisits() {
return pageVisits;
}
public void setPageVisits(int pageVisits) {
this.pageVisits = pageVisits;
}
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.inTime = in.readUTF();
this.outTime = in.readUTF();
this.inPage = in.readUTF();
this.outPage = in.readUTF();
this.referal = in.readUTF();
this.pageVisits = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(inTime);
out.writeUTF(outTime);
out.writeUTF(inPage);
out.writeUTF(outPage);
out.writeUTF(referal);
out.writeInt(pageVisits);
}
@Override
public String toString() {
return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
}
}
資料處理類ClickStreamVisit
package cn.itcast.mr.weblog.vists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import cn.itcast.mr.weblog.pageview.PageViewsBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 輸入資料:pageviews模型結果資料
* 從pageviews模型結果資料中進一步梳理出visit模型
* sessionid start-time out-time start-page out-page pagecounts ......
*
* @author
*
*/
public class ClickStreamVisit {
// 以session作為key,傳送資料到reducer
static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
PageViewsBean pvBean = new PageViewsBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\001");
int step = Integer.parseInt(fields[5]);
//(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
//299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200
pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
k.set(pvBean.getSession());
context.write(k, pvBean);
}
}
static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
@Override
protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {
// 將pvBeans按照step排序
ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
for (PageViewsBean pvBean : pvBeans) {
PageViewsBean bean = new PageViewsBean();
try {
BeanUtils.copyProperties(bean, pvBean);
pvBeansList.add(bean);
} catch (Exception e) {
e.printStackTrace();
}
}
Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
@Override
public int compare(PageViewsBean o1, PageViewsBean o2) {
return o1.getStep() > o2.getStep() ? 1 : -1;
}
});
// 取這次visit的首尾pageview記錄,將資料放入VisitBean中
VisitBean visitBean = new VisitBean();
// 取visit的首記錄
visitBean.setInPage(pvBeansList.get(0).getRequest());
visitBean.setInTime(pvBeansList.get(0).getTimestr());
// 取visit的尾記錄
visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
// visit訪問的頁面數
visitBean.setPageVisits(pvBeansList.size());
// 來訪者的ip
visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
// 本次visit的referal
visitBean.setReferal(pvBeansList.get(0).getReferal());
visitBean.setSession(session.toString());
context.write(NullWritable.get(), visitBean);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ClickStreamVisit.class);
job.setMapperClass(ClickStreamVisitMapper.class);
job.setReducerClass(ClickStreamVisitReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PageViewsBean.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(VisitBean.class);
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat.setInputPaths(job, new Path("d:/weblog/pageviews"));
FileOutputFormat.setOutputPath(job, new Path("d:/weblog/visitout"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
上述各項操作, 在實際的生產環境中, 明顯是需要週期性執行的。所以任務的工作流排程就顯得特別的重要。 除了使用我們 linux 自帶的定時排程外,市面上還有許多成熟的排程產品。
下一篇介紹工作流排程器。