Hadoop in Practice: Forum Clickstream Log Analysis
Introduction
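Website clickstream log data records things such as which links a user clicked, which pages they stayed on longest, which search terms they used, and how long they browsed in total. All of this information can be stored in the website's logs. Analyzing it reveals a great deal that is critical to running a website, and the more complete the collected data, the more precise the analysis. The project mainly uses MapReduce, Hive, Sqoop, Spring, Spring MVC, MyBatis and ECharts. The data cleaning and ETL stages mainly use MapReduce, Hive and Sqoop, and that code is on GitHub. After the data has been extracted, a Java EE stack is used to visualize it, and that code is available here.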
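Concepts
The clickstream concept
The clickstream concept focuses on the user's entire path through a website. Each click recorded in the site log is like a "point" on a chart, and a clickstream is the "line" formed by stringing those points together. You can also think of a "point" as a Page on the site and a "line" as a Session, that is, one visit to the site.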
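Building the clickstream model
In practice, clickstream data is produced by organizing scattered click-log records. Data modeling for clickstream data should therefore produce two model tables: Pageviews and Visits.
1. Access log table used to generate the clickstream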
Timestamp | IP address | Cookie | Session | Request URL | Referer |
2012-01-01 12:31:12 | 101.0.0.1 | User01 | S001 | /a/... | somesite.com |
2012-01-01 12:31:16 | 201.0.0.2 | User02 | S002 | /a/... | - |
2012-01-01 12:33:06 | 101.0.0.2 | User03 | S002 | /b/... | baidu.com |
2012-01-01 15:16:39 | 234.0.0.3 | User01 | S003 | /c/... | google.com |
2012-01-01 15:17:11 | 101.0.0.1 | User01 | S004 | /d/... | /c/... |
2012-01-01 15:19:23 | 101.0.0.1 | User01 | S004 | /e/... | /d/.... |
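2. Pageviews model table (page visits grouped by session). Each URL within a session is one page view. When two consecutive requests are 30 minutes or more apart, the later request starts a new session.
Session | userid | Time | Page URL | Dwell time (s) | Step |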
S001 | User01 | 2012-01-01 12:31:12 | /a/.... | 30 | 1 |
S002 | User02 | 2012-01-01 12:31:16 | /a/.... | 10 | 1 |
S002 | User02 | 2012-01-01 12:33:06 | /b/.... | 110 | 2 |
S002 | User02 | 2012-01-01 12:35:06 | /e/.... | 30 | 3 |
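The rule behind the Pageviews table can be sketched in plain Java. The steps are: sort one visitor's requests by time, use the gap to the next request as the dwell time, start a new session when that gap is 30 minutes or more, and fall back to 60 seconds for the last page of a session. This is a minimal illustration of the same rule that ClickStreamReducer applies further down. It is not the project's code. The class and field names are hypothetical, session numbers are simple counters rather than the reducer's UUIDs, and the input is assumed to be already grouped by visitor.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Sketch: split one visitor's requests into sessions and number the pages (hypothetical names). */
class PageviewSessionizer {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final long TIMEOUT_SECONDS = 30 * 60;  // a gap this long or longer starts a new session
    static final long DEFAULT_STAY_SECONDS = 60;  // dwell time when there is no next request in the session

    /** One request from the access log. */
    static final class Hit {
        final LocalDateTime at;
        final String url;
        Hit(String time, String url) {
            this.at = LocalDateTime.parse(time, FMT);
            this.url = url;
        }
    }

    /** One row of the Pageviews table. */
    static final class Pageview {
        final int sessionNo;
        final String url;
        final long staySeconds;
        final int step;
        Pageview(int sessionNo, String url, long staySeconds, int step) {
            this.sessionNo = sessionNo;
            this.url = url;
            this.staySeconds = staySeconds;
            this.step = step;
        }
    }

    static List<Pageview> sessionize(List<Hit> hits) {
        List<Hit> sorted = new ArrayList<>(hits);
        sorted.sort((a, b) -> a.at.compareTo(b.at));  // chronological order
        List<Pageview> rows = new ArrayList<>();
        int sessionNo = 1;
        int step = 1;
        for (int i = 0; i < sorted.size(); i++) {
            Hit current = sorted.get(i);
            long stay = DEFAULT_STAY_SECONDS;
            boolean sessionEndsHere = true;  // true for the last hit or a hit followed by a long gap
            if (i + 1 < sorted.size()) {
                long gap = Duration.between(current.at, sorted.get(i + 1).at).getSeconds();
                if (gap < TIMEOUT_SECONDS) {
                    stay = gap;
                    sessionEndsHere = false;
                }
            }
            rows.add(new Pageview(sessionNo, current.url, stay, step));
            if (sessionEndsHere) {
                sessionNo++;  // the next hit, if any, opens a new session
                step = 1;
            } else {
                step++;
            }
        }
        return rows;
    }
}
```

For requests at 12:31:16, 12:33:06, 12:35:06 and 15:16:39, the sketch produces dwell times of 110 s, 120 s and 60 s for the first session. The 15:16:39 request then becomes step 1 of a second session.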
3. Visits model table (one summary row per session)
Session | Start time | End time | Entry page | Exit page | Pages visited | IP | cookie | referer |
S001 | 2012-01-01 12:31:12 | 2012-01-01 12:31:12 | /a/... | /a/... | 1 | 101.0.0.1 | User01 | somesite.com |
S002 | 2012-01-01 12:31:16 | 2012-01-01 12:35:06 | /a/... | /e/... | 3 | 201.0.0.2 | User02 | - |
S003 | 2012-01-01 12:35:42 | 2012-01-01 12:35:42 | /c/... | /c/... | 1 | 234.0.0.3 | User03 | baidu.com |
S004 | 2012-01-01 15:16:39 | 2012-01-01 15:19:23 | /c/... | /e/... | 3 | 101.0.0.1 | User01 | google.com |
…… | …… | …… | …… | …… | …… | …… | …… | …… |
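Each Visits row summarizes one session's Pageviews rows:
- the first step gives the start time and the entry page;
- the last step gives the end time and the exit page;
- the number of rows gives the number of pages visited.

Below is a minimal Java sketch of that aggregation. The class names are hypothetical, and each Pageviews row is assumed to carry its session ID and step number.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: collapse Pageviews rows into one Visits row per session (hypothetical names). */
class VisitAggregator {
    static final class PageviewRow {
        final String session;
        final String time;  // "yyyy-MM-dd HH:mm:ss"
        final String url;
        final int step;
        PageviewRow(String session, String time, String url, int step) {
            this.session = session;
            this.time = time;
            this.url = url;
            this.step = step;
        }
    }

    static final class Visit {
        final String session, startTime, endTime, entryPage, exitPage;
        final int pageCount;
        Visit(String session, String startTime, String endTime, String entryPage, String exitPage, int pageCount) {
            this.session = session;
            this.startTime = startTime;
            this.endTime = endTime;
            this.entryPage = entryPage;
            this.exitPage = exitPage;
            this.pageCount = pageCount;
        }
    }

    static Map<String, Visit> toVisits(List<PageviewRow> rows) {
        // Group rows by session, keeping the order in which sessions first appear
        Map<String, List<PageviewRow>> bySession = new LinkedHashMap<>();
        for (PageviewRow r : rows) {
            bySession.computeIfAbsent(r.session, k -> new ArrayList<>()).add(r);
        }
        Map<String, Visit> visits = new LinkedHashMap<>();
        for (Map.Entry<String, List<PageviewRow>> e : bySession.entrySet()) {
            List<PageviewRow> pages = e.getValue();
            pages.sort((a, b) -> Integer.compare(a.step, b.step));  // step 1 first
            PageviewRow first = pages.get(0);
            PageviewRow last = pages.get(pages.size() - 1);
            visits.put(e.getKey(), new Visit(e.getKey(), first.time, last.time, first.url, last.url, pages.size()));
        }
        return visits;
    }
}
```

Feeding in the three S002 rows from the Pageviews table produces the S002 Visits row:
- start 12:31:16, end 12:35:06;
- entry page /a/, exit page /e/;
- 3 pages visited.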
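This is the clickstream model. Once web logs have been converted into clickstream data, many website-analytics metrics become simple to compute. This is the "magic" of clickstreams, and from clickstream data we can derive many common website-analytics metrics.
Multi-dimensional website traffic analysis
Segmentation means breaking a metric down by different dimensions to see how the same metric behaves in each one. You then find the segments that perform poorly and optimize them.
Common website metrics
PV (Page View): the number of page views (or clicks). It measures how many pages users visit on the site. Within a statistics period, each time a user opens or refreshes a page counts as one view, and repeatedly opening or refreshing the same page keeps adding to the total.
UV (Unique Visitor): the number of distinct users who visit the site within one day, identified by cookie. Each client computer that visits the site counts as one visitor, so UV can be read as the number of computers that accessed the site.
The site identifies a visiting computer by its cookies:
- If a user changes IP address but keeps the cookies and revisits the same site, UV does not change.
- If the user browses without saving cookies, clears the cookies, or switches devices, the count increases by 1.
- Multiple visits from the same client between 00:00 and 24:00 count as a single visitor.
IP (independent IPs): the number of distinct IP addresses that viewed pages within one day. However many pages one IP visits, it counts once, and each different IP adds 1. Because IP counting distinguishes visitors by their public (WAN) IP address, several users behind the same router (one WAN IP, several LAN IPs) may be recorded as a single IP. Conversely, a user who keeps changing IP addresses may be counted several times.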
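The three definitions differ only in what is counted:
- PV counts every request.
- UV counts distinct cookies.
- IP counts distinct client addresses.

The minimal Java sketch below uses the six rows of the access-log sample table above. The class and method names are hypothetical, and all hits are assumed to fall on the same day.

Note that the Hive pipeline later in this article has no cookie field. Its dw_uv query counts distinct remote_user values, which hold the session ID generated during preprocessing, so that figure is closer to a count of visits than of cookie-based visitors.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch: PV, UV and IP for one day's requests (hypothetical names; all hits assumed to be from one day). */
class DailyTrafficMetrics {
    static final class Hit {
        final String ip;
        final String cookie;
        Hit(String ip, String cookie) {
            this.ip = ip;
            this.cookie = cookie;
        }
    }

    /** PV: every request counts, including repeat views and refreshes. */
    static long pv(List<Hit> hits) {
        return hits.size();
    }

    /** UV: number of distinct cookies. */
    static long uv(List<Hit> hits) {
        Set<String> cookies = new HashSet<>();
        for (Hit h : hits) cookies.add(h.cookie);
        return cookies.size();
    }

    /** IP: number of distinct client IP addresses. */
    static long ip(List<Hit> hits) {
        Set<String> ips = new HashSet<>();
        for (Hit h : hits) ips.add(h.ip);
        return ips.size();
    }
}
```

For the sample table, these methods return PV = 6, UV = 3 and IP = 4.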
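Project architecture
The system's data analysis is not a one-off job: it is recomputed repeatedly at a fixed frequency. The steps of the processing chain therefore have to run in a strict dependency order, which means managing and scheduling a large number of task units. The project therefore needs a task-scheduling module.
Technology selection
In clickstream log analysis, the reliability and fault-tolerance requirements for data collection are usually not very strict, so the general-purpose Flume log collection framework is entirely sufficient.
This project therefore uses Flume for log collection.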
Data content
Meaning of each field:
IP address
Time
Request method
Request URL
Status code
Bytes sent
Referer URL
Client (user agent)
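The raw logs appear to follow the Apache/Nginx combined log format. The mapper below splits on spaces and reads the IP, the bracketed time, the quoted request line, the status, the byte count, the quoted referer and the quoted user agent from fixed positions. Assuming that format, a regular expression is a sturdier way to extract the fields than splitting on spaces, because it keeps the user agent's internal spaces intact.

This is a sketch, not the project's parser. The class name is hypothetical, and the sample line in the test is made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: parse one access-log line, assuming the combined log format (hypothetical names). */
class LogLineParser {
    // ip ident user [day/Mon/year:time zone] "method url protocol" status bytes "referer" "user agent"
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[(\\S+) [^\\]]*\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+|-) \"([^\"]*)\" \"([^\"]*)\"$");
    private static final DateTimeFormatter IN =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static final class Fields {
        String ip, time, method, url, protocol, referer, userAgent;
        int status;
        long bytes;
    }

    /** Returns null when the line does not match the format or its timestamp is invalid. */
    static Fields parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) return null;
        Fields f = new Fields();
        try {
            // The time zone offset is dropped, as in the MapReduce mapper
            f.time = LocalDateTime.parse(m.group(2), IN).format(OUT);
        } catch (DateTimeParseException e) {
            return null;
        }
        f.ip = m.group(1);
        f.method = m.group(3);
        f.url = m.group(4);
        f.protocol = m.group(5);
        f.status = Integer.parseInt(m.group(6));
        f.bytes = "-".equals(m.group(7)) ? 0 : Long.parseLong(m.group(7));  // "-" means no body was sent
        f.referer = m.group(8);
        f.userAgent = m.group(9);
        return f;
    }
}
```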
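Data cleaning
Once the website log data has been collected (the log data I used is available here), the relevant data has to be cleaned and normalized according to the project requirements.
The purposes of data cleaning are to:
Filter out invalid ("non-compliant") records
Convert and normalize formats
Separate out the base data for different topics (different section paths), according to the downstream statistics requirements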
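The third purpose, splitting the cleaned records by section path, can be sketched as grouping request URLs by their first path segment. Two parts of this sketch are assumptions made for illustration:
- the validity rule (keep 2xx and 3xx responses only);
- using the first path segment as the topic key.

The MapReduce job below only drops malformed lines and does not perform this split. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: filter requests and group them by section path (hypothetical names and rules). */
class TopicSplitter {
    static final class Request {
        final String url;
        final int status;
        Request(String url, int status) {
            this.url = url;
            this.status = status;
        }
    }

    /** Assumed validity rule: keep successful (2xx) and redirected (3xx) requests only. */
    static boolean isValid(Request r) {
        return r.url != null && r.status >= 200 && r.status < 400;
    }

    /** Assumed topic key: first non-empty path segment, ignoring any query string or fragment. */
    static String topicOf(String url) {
        String path = url.split("[?#]", 2)[0];
        for (String segment : path.split("/")) {
            if (!segment.isEmpty()) return segment;
        }
        return "/";  // requests for the site root
    }

    /** Groups the URLs of valid requests by topic, keeping topics in first-seen order. */
    static Map<String, List<String>> splitByTopic(List<Request> requests) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (Request r : requests) {
            if (!isValid(r)) continue;
            byTopic.computeIfAbsent(topicOf(r.url), k -> new ArrayList<>()).add(r.url);
        }
        return byTopic;
    }
}
```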
The implementation is as follows:
package cn.edu.hust.preprocess;
import cn.edu.hust.preprocess.domain.WebLogBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class ClickStream {
static class ClickStreamMapper extends Mapper<LongWritable,Text,Text,WebLogBean>
{
// Convert "18/Sep/2013:06:49:18" into "2013-09-18 06:49:18"; return null if the value is missing or cannot be parsed
public static String formatDate(String dateStr) {
if (StringUtils.isBlank(dateStr)) return null;
SimpleDateFormat format = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.CHINA);
try {
Date date = format.parse(dateStr);
return format1.format(date);
} catch (ParseException e) {
return null;
}
}
static Text k=new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try
{
String message=value.toString();
String[] splits=message.split(" ");
// A well-formed line has at least 12 space-separated fields; shorter lines are dropped
if(splits.length<12) return;
String time=formatDate(splits[3].substring(1));
// Drop records whose timestamp cannot be parsed
if(time==null) return;
String method=splits[5].substring(1);
String protocol=StringUtils.isBlank(splits[7])?"HTTP/1.1":splits[7].substring(0,splits[7].length()-1);
int status= StringUtils.isBlank(splits[8])?0:Integer.parseInt(splits[8]);
// The byte count is "-" when no response body was sent
int bytes=(StringUtils.isBlank(splits[9])||"-".equals(splits[9]))?0:Integer.parseInt(splits[9]);
// Strip the surrounding quotes; commas are replaced because the reducer output is comma-delimited
String from_url=StringUtils.isBlank(splits[10])?"-":splits[10].substring(1,splits[10].length()-1).replace(',', ';');
// The user agent spans all remaining fields; they are concatenated without spaces
StringBuilder sb=new StringBuilder();
for (int i=11;i<splits.length;i++)
{
sb.append(splits[i]);
}
String s=sb.toString();
String platform=s.substring(1,s.length()-1).replace(',', ';');
WebLogBean ms=new WebLogBean(splits[0],time,method,splits[6],protocol,status,bytes,from_url,platform);
// Key by IP so that all requests from one visitor reach the same reduce call
k.set(splits[0]);
context.write(k,ms);
}catch (Exception e){
// Skip malformed lines (for example a non-numeric status code)
return ;
}
}
}
static class ClickStreamReducer extends Reducer<Text,WebLogBean,NullWritable,Text>
{
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
try
{
// Hadoop reuses one object for every element of values, so each bean is copied before it is kept
for (WebLogBean bean : values) {
WebLogBean webLogBean = new WebLogBean();
try {
BeanUtils.copyProperties(webLogBean, bean);
} catch(Exception e) {
e.printStackTrace();
}
beans.add(webLogBean);
}
// Sort the beans chronologically
Collections.sort(beans, new Comparator<WebLogBean>() {
public int compare(WebLogBean o1, WebLogBean o2) {
try {
Date d1 = toDate(o1.getTimeStr());
Date d2 = toDate(o2.getTimeStr());
if (d1 == null || d2 == null)
return 0;
return d1.compareTo(d2);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
});
/**
* The following logic splits the ordered beans into visits and numbers the pages within each visit by step
*/
int step = 1;
String session = UUID.randomUUID().toString();
for (int i = 0; i < beans.size(); i++) {
WebLogBean bean = beans.get(i);
// If there is only one record, output it directly
if (1 == beans.size()) {
// Use a default dwell time of 60 s
v.set(session+","+bean.getIp() + "," + bean.getTimeStr() + "," + bean.getRequest_url() + "," + step + "," + (60) + "," + bean.getFrom_url() + "," + bean.getPlatform() + "," + bean.getBytes() + ","
+ bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// With more than one record, skip the first one here and output it while processing the second
if (i == 0) {
continue;
}
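// Time difference between this request and the previous one
long timeDiff = timeDiff(toDate(bean.getTimeStr()), toDate(beans.get(i - 1).getTimeStr()));
// If the gap is under 30 minutes, output the previous page view with the gap as its dwell time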
if (timeDiff < 30 * 60 * 1000) {
v.set(session+","+beans.get(i - 1).getIp() + "," + beans.get(i - 1).getTimeStr() + "," + beans.get(i - 1).getRequest_url() + "," + step + "," + (timeDiff / 1000) + "," + beans.get(i - 1).getFrom_url() + ","
+ beans.get(i - 1).getPlatform() + "," + beans.get(i - 1).getBytes() + "," + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// If the gap is 30 minutes or more, output the previous page view with the default dwell time and reset step to start a new visit
v.set(session+","+beans.get(i - 1).getIp() + "," + beans.get(i - 1).getTimeStr() + "," + beans.get(i - 1).getRequest_url() + "," + step + "," + (60) + "," + beans.get(i - 1).getFrom_url() + ","
+ beans.get(i - 1).getPlatform()+ "," + beans.get(i - 1).getBytes()+ "," + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
// After outputting the previous record, reset the step counter
step = 1;
session = UUID.randomUUID().toString();
}
// If this is the last record, output it directly
if (i == beans.size() - 1) {
// Use a default dwell time of 60 s
v.set(session+","+bean.getIp() + "," + bean.getTimeStr() + "," + bean.getRequest_url() + "," + step + "," + (60) + "," + bean.getFrom_url() + "," + bean.getPlatform() + "," + bean.getBytes() + "," + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
private static Date toDate(String timeStr) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.CHINA);
return df.parse(timeStr);
}
private static long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
private static long timeDiff(Date time1, Date time2) throws ParseException {
return time1.getTime() - time2.getTime();
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(ClickStream.class);
// Set the job's mapper and reducer
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
// Map output key and value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
// Reducer output key and value types
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(4);
// Set the input and output paths
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean flag=job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
The MapReduce program uses a custom Bean; the full code is below:
package cn.edu.hust.preprocess.domain;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WebLogBean implements Writable {
/**
* Client IP
*/
private String ip;
/**
* Access time
*/
private String timeStr;
/**
* Request method
*/
private String method;
/**
* Requested URL
*/
private String request_url;
/**
* Protocol
*/
private String protocol;
/**
* Status code
*/
private int status;
/**
* Bytes sent
*/
private int bytes;
/**
* Referer URL
*/
private String from_url;
/**
* Client platform (user agent)
*/
private String platform;
public WebLogBean() {
}
public WebLogBean(String ip, String timeStr, String method, String request_url, String protocol, int status, int bytes, String from_url, String platform) {
this.ip = ip;
this.timeStr= timeStr;
this.method = method;
this.request_url = request_url;
this.protocol = protocol;
this.status = status;
this.bytes = bytes;
this.from_url = from_url;
this.platform = platform;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getTimeStr() {
return timeStr;
}
public void setTimeStr(String timeStr) {
this.timeStr=timeStr;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getRequest_url() {
return request_url;
}
public void setRequest_url(String request_url) {
this.request_url = request_url;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getBytes() {
return bytes;
}
public void setBytes(int bytes) {
this.bytes = bytes;
}
public String getFrom_url() {
return from_url;
}
public void setFrom_url(String from_url) {
this.from_url = from_url;
}
public String getPlatform() {
return platform;
}
public void setPlatform(String platform) {
this.platform = platform;
}
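// Field order must match readFields exactly; writeUTF cannot write null, so every String field must be set
public void write(DataOutput dataOutput) throws IOException {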
dataOutput.writeUTF(ip);
dataOutput.writeUTF(this.timeStr);
dataOutput.writeUTF(this.method);
dataOutput.writeUTF(this.request_url);
dataOutput.writeUTF(this.protocol);
dataOutput.writeInt(this.status);
dataOutput.writeInt(this.bytes);
dataOutput.writeUTF(this.from_url);
dataOutput.writeUTF(this.platform);
}
public void readFields(DataInput dataInput) throws IOException {
this.ip=dataInput.readUTF();
this.timeStr=dataInput.readUTF();
this.method=dataInput.readUTF();
this.request_url=dataInput.readUTF();
this.protocol=dataInput.readUTF();
this.status=dataInput.readInt();
this.bytes=dataInput.readInt();
this.from_url=dataInput.readUTF();
this.platform=dataInput.readUTF();
}
}
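WebLogBean depends on the contract of Writable:
- write and readFields must handle the fields in exactly the same order;
- no String field may be null, because Java's DataOutputStream.writeUTF, for example, throws a NullPointerException on null;
- Hadoop needs the no-argument constructor to create an instance before it calls readFields.

The sketch below checks the first two points without a Hadoop dependency. It uses a hypothetical three-field MiniLogRecord that copies the same pattern. In a real project, the same round-trip test could be run against WebLogBean itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical three-field record following the same write/readFields pattern as WebLogBean. */
class MiniLogRecord {
    String ip;
    String timeStr;
    int status;

    // Like Writable.write: fields go out in a fixed order
    void write(DataOutput out) throws IOException {
        out.writeUTF(ip);  // a null ip fails here with NullPointerException
        out.writeUTF(timeStr);
        out.writeInt(status);
    }

    // Like Writable.readFields: fields are read back in exactly the same order
    void readFields(DataInput in) throws IOException {
        ip = in.readUTF();
        timeStr = in.readUTF();
        status = in.readInt();
    }

    static byte[] serialize(MiniLogRecord r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        r.write(out);
        out.flush();
        return bytes.toByteArray();
    }

    static MiniLogRecord deserialize(byte[] data) throws IOException {
        MiniLogRecord r = new MiniLogRecord();  // no-arg constructor, as Hadoop requires
        r.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        return r;
    }
}
```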
After cleaning, the data is in the shape we need, and we can load it into Hive for ETL.
ETL process
1. Create the Visit model
-- 1. Create the visit model
create external table click_stream_visit(
session string,
ip string,
timestr string,
request_url string,
setp string,
stayLong string,
from_url string,
platform string,
byte string,
status string
) partitioned by(datestr string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Note that the table is partitioned by date.
We then load the cleaned MapReduce output into this table.
load data inpath '/log/output/part-r-0000*' into table click_stream_visit partition(datestr='2012-01-04');
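2. Create the ODS (source-aligned) tables
These consist mainly of the dimension tables plus a further extraction of the data.
-- 2. Create the ODS table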
drop table if exists ods_click_pageviews;
create table ods_click_pageviews(
session string,
remote_addr string,
time_local string,
request string,
status string,
body_bytes_sent string,
http_referer string,
http_user_agent string,
step string,
stayLong string
)partitioned by (datestr string)
row format delimited
fields terminated by ',';
insert into table ods_click_pageviews partition(datestr='2012-01-04') select session,ip,timestr,request_url,status,byte,from_url,platform,setp,stayLong from click_stream_visit;
-- Create the time dimension table
drop table if exists dim_time;
create table dim_time(
year string,
month string,
day string,
hour string)
row format delimited
fields terminated by ',';
-- Create the browser dimension table
create table dim_browser(
browser string
);
-- Create the operating-system dimension table
create table dim_os(
os string
);
-- Create the region dimension table
create table dim_region(
province string,
city string
);
3. Detail table
-- Create the detail table
drop table if exists ods_weblog_detail;
create table ods_weblog_detail(
remote_addr string, -- client IP
remote_user string, -- user identifier (the session ID generated during preprocessing)
time_local string, -- full access time
daystr string, -- access date
timestr string, -- access time of day
yearstr string, -- year
month string, -- month
day string, -- day
hour string, -- hour
request string, -- requested URL
status string, -- response code
body_bytes_sent string, -- bytes sent
http_referer string, -- referer URL
ref_host string, -- referer host
ref_path string, -- referer path
ref_query string, -- referer query string
http_user_agent string, -- browser (derived from the user agent)
os string, -- operating system
province string, -- province
city string )partitioned by(datestr string) row format delimited
fields terminated by ',';
With these tables in place, we extract the data. This requires several custom functions, which we implement as Hive UDFs:
package cn.edu.hust.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public class BrowserUtils extends UDF {
public String evaluate(String s)
{
if(s==null) return "Others";
// Check the most specific tokens first: Chrome user agents also contain "Safari",
// and almost every browser's user agent contains "Mozilla"
String ua=s.toLowerCase();
if(ua.contains("msie")||ua.contains("trident"))
return "IE";
else if(ua.contains("opera")||ua.contains("opr/"))
return "Opera";
else if(ua.contains("ucbrowser")||ua.contains("ucweb"))
return "UC";
else if(ua.contains("qqbrowser"))
return "QQ";
else if(ua.contains("firefox"))
return "Firefox";
else if(ua.contains("chrome"))
return "Chrome";
else if(ua.contains("safari"))
return "Safari";
else if(ua.contains("mozilla"))
return "Mozilla";
else
return "Others";
}
}
package cn.edu.hust.udf;
import cn.edu.hust.udf.bean.Pair;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
public class CityUtils extends UDF {
static ArrayList<HashMap<Pair<Long,Long>,String>> ips=new ArrayList<HashMap<Pair<Long, Long>, String>>();
static
{
BufferedReader reader= null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream("/home/hadoop/ip.txt")));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
String line;
try
{
while((line=reader.readLine())!=null){
String[] splits=line.split("\\|");
Long up=Long.parseLong(splits[2]);
Long down=Long.parseLong(splits[3]);
Pair<Long,Long> pair=new Pair<Long, Long>();
pair.setFirst(up);
pair.setSecond(down);
StringBuilder sb=new StringBuilder();
sb.append(splits[6]).append("|"+splits[7]);
HashMap<Pair<Long,Long>,String> ip=new HashMap<Pair<Long, Long>, String>();
ip.put(pair,sb.toString());
ips.add(ip);
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
// Look up the province and city for an IP address
public static synchronized String getProvinceAndCity(String ip)
{
String[] splits=ip.split("\\.");
double value=0;
for(int i=0;i<splits.length;i++)
{
value+=Long.parseLong(splits[i])*Math.pow(2,8*(3-i));
}
int high=ips.size()-1;
int low=0;
while(low<=high)
{
int mid=(low+high)/2;
Pair<Long,Long> pair=(Pair<Long,Long>)ips.get(mid).keySet().toArray()[0];
if(value>=pair.getFirst()&&value<=pair.getSecond())
{
return (String)ips.get(mid).values().toArray()[0];
}
else if(value>pair.getSecond())
{
low=mid+1;
}
else if(value<pair.getFirst()) {
high = mid - 1;
}
}
// "未知|未知" means "unknown|unknown"
return "未知|未知";
}
public synchronized String evaluate(String s)
{
if(s==null) return null;
String[] t=getProvinceAndCity(s).split("\\|");
// Return the city part; fall back to the only part if there is no "|"
if(t.length<2) return t[0];
return t[1];
}
}
package cn.edu.hust.udf;
import cn.edu.hust.udf.bean.Pair;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
public class IPUtils extends UDF {
static ArrayList<HashMap<Pair<Long,Long>,String>> ips=new ArrayList<HashMap<Pair<Long, Long>, String>>();
static
{
BufferedReader reader= null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream("/home/hadoop/ip.txt")));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
String line;
try
{
while((line=reader.readLine())!=null){
String[] splits=line.split("\\|");
Long up=Long.parseLong(splits[2]);
Long down=Long.parseLong(splits[3]);
Pair<Long,Long> pair=new Pair<Long, Long>();
pair.setFirst(up);
pair.setSecond(down);
StringBuilder sb=new StringBuilder();
sb.append(splits[6]).append("|"+splits[7]);
HashMap<Pair<Long,Long>,String> ip=new HashMap<Pair<Long, Long>, String>();
ip.put(pair,sb.toString());
ips.add(ip);
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
// Look up the province and city for an IP address
public static synchronized String getProvinceAndCity(String ip)
{
String[] splits=ip.split("\\.");
double value=0;
for(int i=0;i<splits.length;i++)
{
value+=Long.parseLong(splits[i])*Math.pow(2,8*(3-i));
}
int high=ips.size()-1;
int low=0;
while(low<=high)
{
int mid=(low+high)/2;
Pair<Long,Long> pair=(Pair<Long,Long>)ips.get(mid).keySet().toArray()[0];
if(value>=pair.getFirst()&&value<=pair.getSecond())
{
return (String)ips.get(mid).values().toArray()[0];
}
else if(value>pair.getSecond())
{
low=mid+1;
}
else if(value<pair.getFirst()) {
high = mid - 1;
}
}
// "未知|未知" means "unknown|unknown"
return "未知|未知";
}
public synchronized String evaluate(String s)
{
return getProvinceAndCity(s).split("\\|")[0];
}
}
package cn.edu.hust.udf.bean;
import java.io.Serializable;
public class Pair<T,U> implements Serializable {
private T first;
private U second;
public Pair() {
}
public Pair(T first, U second) {
this.first = first;
this.second = second;
}
public T getFirst() {
return first;
}
public void setFirst(T first) {
this.first = first;
}
public U getSecond() {
return second;
}
public void setSecond(U second) {
this.second = second;
}
}
package cn.edu.hust.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public class OSUtils extends UDF{
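public String evaluate(String s)
{
if(s==null) return "Others";
String ua=s.toLowerCase();
// Android user agents also contain "Linux" and iOS user agents contain "like Mac OS X",
// so the mobile systems are checked first. The mapper removes spaces from the user agent,
// which is why "macos" is matched as well as "mac os".
if(ua.contains("android"))
return "Android";
else if(ua.contains("iphone")||ua.contains("ipad")||ua.contains("ipod"))
return "IOS";
else if(ua.contains("windows"))
return "Windows";
else if(ua.contains("macos")||ua.contains("mac os")||ua.contains("macintosh"))
return "MacOS";
else if(ua.contains("linux"))
return "Linux";
else
return "Others";
}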
}
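CityUtils and IPUtils use the same lookup: convert the dotted IP address to a number, then binary-search a list of [start, end] ranges loaded from ip.txt. The sketch below shows that technique with three changes:
- integer bit shifts instead of Math.pow;
- a flat, explicitly sorted list of ranges instead of one HashMap per entry;
- a fallback value for malformed IPs instead of an exception.

It assumes the ranges do not overlap. The class name, range data and location labels in the test are hypothetical and are not taken from ip.txt.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: IPv4 range lookup with integer arithmetic and binary search (hypothetical names). */
class IpRangeLookup {
    static final class Range {
        final long start, end;   // inclusive bounds as unsigned 32-bit values
        final String location;   // e.g. "province|city"
        Range(long start, long end, String location) {
            this.start = start;
            this.end = end;
            this.location = location;
        }
    }

    private final List<Range> ranges;

    /** Ranges are assumed not to overlap; they are sorted here so the input order does not matter. */
    IpRangeLookup(List<Range> input) {
        ranges = new ArrayList<>(input);
        ranges.sort((a, b) -> Long.compare(a.start, b.start));
    }

    /** Converts a dotted IPv4 string to its 32-bit value, or returns -1 if the string is malformed. */
    static long ipToLong(String ip) {
        if (ip == null) return -1;
        String[] parts = ip.trim().split("\\.");
        if (parts.length != 4) return -1;
        long value = 0;
        for (String part : parts) {
            int octet;
            try {
                octet = Integer.parseInt(part);
            } catch (NumberFormatException e) {
                return -1;
            }
            if (octet < 0 || octet > 255) return -1;
            value = (value << 8) | octet;  // shift in one byte at a time
        }
        return value;
    }

    /** Binary search over the sorted ranges; returns fallback if no range contains the IP. */
    String lookup(String ip, String fallback) {
        long value = ipToLong(ip);
        if (value < 0) return fallback;
        int low = 0, high = ranges.size() - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            Range r = ranges.get(mid);
            if (value < r.start) high = mid - 1;
            else if (value > r.end) low = mid + 1;
            else return r.location;
        }
        return fallback;
    }
}
```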
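Package the project as a JAR and register it in Hive as follows. Note that CityUtils and IPUtils read /home/hadoop/ip.txt from the local file system when the class is loaded, so that file must exist at the same path on every node that runs the UDFs.
-- Register the custom functions (see the project source code)
add jar /home/hadoop/logAnalyzeHelper.jar;
-- Create temporary functions
create temporary function getOS as 'cn.edu.hust.udf.OSUtils';
create temporary function getBrowser as 'cn.edu.hust.udf.BrowserUtils';
create temporary function getProvince as 'cn.edu.hust.udf.IPUtils';
create temporary function getCity as 'cn.edu.hust.udf.CityUtils';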
Load data by dimension
The detail table is loaded first, because the region dimension is derived from it.
-- Load data into the detail table
insert into ods_weblog_detail partition(datestr='2012-01-04')
select remote_addr,session,time_local,substring(time_local,1,10) as daystr,substring(time_local,12) as timestr,substring(time_local,1,4) as yearstr,substring(time_local,6,2) as month,
substring(time_local,9,2) as day,substring(time_local,12,2) as hour,split(request,"\\?")[0],status
,body_bytes_sent,http_referer,parse_url(http_referer,'HOST') as ref_host,parse_url(http_referer,'PATH') as ref_path,
parse_url(http_referer,'QUERY') as ref_query,getBrowser(http_user_agent) as http_user_agent,
getOS(http_user_agent) as os ,getProvince(remote_addr),getCity(remote_addr) from ods_click_pageviews;
-- Load the operating-system dimension table
insert into dim_os select distinct getOS(http_user_agent) from ods_click_pageviews;
-- Load the browser dimension table
insert into dim_browser select distinct getBrowser(http_user_agent) from ods_click_pageviews;
-- Load the region dimension table
insert into table dim_region
select distinct province,city
from ods_weblog_detail
where datestr='2012-01-04';
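The detail-table SELECT derives the date parts with Hive's 1-based substring offsets and splits the referer with parse_url. The Java sketch below mirrors that logic for a single row so the offsets are easy to check:
- Java's String.substring is 0-based and its end index is exclusive.
- java.net.URI stands in for parse_url. This is an assumption: Hive's own implementation may treat malformed URLs differently.

The class name DetailFields is hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Sketch: the date parts and referer parts computed for one detail row (hypothetical names). */
class DetailFields {
    final String daystr, timestr, year, month, day, hour;
    final String refHost, refPath, refQuery;  // null when the referer is "-", empty or malformed

    DetailFields(String timeLocal, String referer) {
        // timeLocal looks like "2012-01-04 12:31:12"
        daystr = timeLocal.substring(0, 10);  // Hive: substring(time_local,1,10)
        timestr = timeLocal.substring(11);    // Hive: substring(time_local,12)
        year = timeLocal.substring(0, 4);     // Hive: substring(time_local,1,4)
        month = timeLocal.substring(5, 7);    // Hive: substring(time_local,6,2)
        day = timeLocal.substring(8, 10);     // Hive: substring(time_local,9,2)
        hour = timeLocal.substring(11, 13);   // Hive: substring(time_local,12,2)

        URI uri = toUri(referer);
        refHost = uri == null ? null : uri.getHost();
        refPath = uri == null ? null : uri.getRawPath();
        refQuery = uri == null ? null : uri.getRawQuery();
    }

    private static URI toUri(String referer) {
        if (referer == null || referer.isEmpty() || "-".equals(referer)) return null;
        try {
            return new URI(referer);
        } catch (URISyntaxException e) {
            return null;
        }
    }
}
```

A referer of "-" yields null host, path and query, which is why the later referer statistics filter on ref_host is not null.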
Module development
-- PV by time dimension
select count(1),yearstr,month,day,hour from ods_weblog_detail
group by yearstr,month,day,hour;
-- PV per hour
drop table dw_pvs_hour;
create table dw_pvs_hour(year string,month string,day string,hour string,pvs bigint)
row format delimited
fields terminated by '\t';
-- Insert data
insert into table dw_pvs_hour
select a.yearstr as year ,a.month as month,a.day as day,a.hour as hour,
count(1) as pvs
from ods_weblog_detail a
group by a.yearstr,a.month,a.day,a.hour;
-- PV per day
drop table dw_pvs_day;
create table dw_pvs_day(pvs bigint,year string,month string,day string)
row format delimited
fields terminated by '\t';
-- Insert data
-- dim_time is not loaded anywhere in this pipeline, and because it has an hour column, joining it on
-- year/month/day alone would count each request once per matching hour row; count from the detail table directly
insert into table dw_pvs_day
select count(1) as pvs,a.yearstr as year,a.month as month,a.day as day
from ods_weblog_detail a
group by a.yearstr,a.month,a.day;
-- PV by browser
drop table dw_pvs_browser;
create table dw_pvs_browser(pvs bigint,browser string,
year string,month string,day string)
row format delimited
fields terminated by '\t';
-- Load data
insert into dw_pvs_browser
select count(1) as pvs, a.browser as browser,
b.yearstr as year,
b.month as month,b.day
as day from dim_browser a
join ods_weblog_detail b
on a.browser=b.http_user_agent
group by a.browser,b.yearstr,month,day order by pvs desc;
-- PV by operating system
drop table dw_pvs_os;
create table dw_pvs_os(
pvs bigint,
os string,
year string,
month string,
day string
);
insert into dw_pvs_os
select count(1) as pvs, a.os as os,
b.yearstr as year,
b.month as month,b.day
as day from dim_os a
join ods_weblog_detail b
on a.os=b.os
group by a.os,b.yearstr,month,day order by pvs desc;
-- PV by region
drop table dw_pvs_region;
create table dw_pvs_region(pvs bigint,province string,
city string,year string,
month string,day string)
row format delimited
fields terminated by '\t';
-- Load data
insert into dw_pvs_region
select count(1) as pvs,a.province as province,
a.city as city,b.yearstr as year,
b.month as month,b.day as day from dim_region a
join ods_weblog_detail b on
a.province=b.province and a.city=b.city
group by a.province,a.city,b.yearstr,month,day order by pvs desc;
-- UV
drop table dw_uv;
create table dw_uv(
uv int,
year varchar(4),
month varchar(2),
day varchar(2)
);
-- Load data
insert into dw_uv
select count(1) as uv,a.yearstr as year,
a.month as month,a.day as day from
(select distinct remote_user,yearstr,month,day from ods_weblog_detail) a
group by a.yearstr,a.month,a.day;
-- IP count
drop table dw_ip;
create table dw_ip(
ip int,
year varchar(4),
month varchar(2),
day varchar(2)
);
-- Load data
insert into dw_ip
select count(1) as ip,a.yearstr as year,
a.month as month,a.day as day from
(select distinct remote_addr,yearstr,month,day from ods_weblog_detail) a
group by a.yearstr,a.month,a.day;
-- Average page views per user
-- total page views / number of distinct users
drop table dw_avgpv_user_d;
create table dw_avgpv_user_d(
day string,
avgpv string);
-- Insert data
insert into table dw_avgpv_user_d
select '2012-01-04',sum(b.pvs)/count(b.remote_user) from
(select remote_user,count(1) as pvs from ods_weblog_detail where datestr='2012-01-04' group by remote_user) b;
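The query divides the day's total page views by the number of distinct users. The minimal Java sketch below does the same arithmetic; its class name is hypothetical. With the six cookie values from the access-log sample table, 6 page views over 3 distinct users gives 2.0.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: average page views per user = total page views / distinct users (hypothetical names). */
class AvgPageviews {
    /** Takes one user identifier per page view; returns 0.0 when there are no page views. */
    static double avgPvPerUser(List<String> userPerHit) {
        Map<String, Long> pvByUser = new HashMap<>();
        for (String user : userPerHit) {
            pvByUser.merge(user, 1L, Long::sum);  // same as the inner GROUP BY remote_user
        }
        if (pvByUser.isEmpty()) return 0.0;
        long total = 0;
        for (long pv : pvByUser.values()) total += pv;  // sum(b.pvs)
        return (double) total / pvByUser.size();        // divided by count(b.remote_user)
    }
}
```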
-- Total PV by referer
-- Per hour
drop table dw_pvs_referer_h;
create table dw_pvs_referer_h(referer_url string,referer_host string,year string,month string,day string,hour string,pv_referer_cnt bigint);
-- Insert data
-- Group by the referer URL without its query string, so each referer_url appears once per hour
insert into table dw_pvs_referer_h
select split(http_referer,"\\?")[0] as referer_url,ref_host,yearstr,month,day,hour,count(1) as pv_referer_cnt
from ods_weblog_detail
where ref_host is not null
group by split(http_referer,"\\?")[0],ref_host,yearstr,month,day,hour
order by hour asc,day asc,month asc,yearstr asc,pv_referer_cnt desc;
Exporting to MySQL with Sqoop
Create the tables in MySQL first, then export the Hive results into them. Each sqoop export command below comes after the MySQL table it writes to. Its --columns list gives the MySQL columns in the same order as the fields in the Hive output files.
# Export the data to be displayed into MySQL
## Tables to create in MySQL
## PV per hour
drop table if exists dw_pvs_hour;
create table dw_pvs_hour(
id int primary key auto_increment,
year varchar(4),
month varchar(2),day varchar(2),
hour varchar(2),pvs int);
## PV per day
drop table if exists dw_pvs_day;
create table dw_pvs_day(
id int primary key auto_increment,
year varchar(4),
month varchar(2),
day varchar(2),
pvs int);
### Export the data with Sqoop
bin/sqoop export --connect jdbc:mysql://10.211.55.16:3306/log --username root --password root --table dw_pvs_day --columns pvs,year,month,day --export-dir '/user/hive/warehouse/loganalyze.db/dw_pvs_day/' --fields-terminated-by '\t';
## PV by browser
drop table if exists dw_pvs_browser;
create table dw_pvs_browser(
id int primary key auto_increment,
browser varchar(20),
year varchar(4),
month varchar(2),
day varchar(2),
pvs int);
### Export the data with Sqoop
bin/sqoop export --connect jdbc:mysql://10.211.55.16:3306/log --username root --password root --table dw_pvs_browser --columns pvs,browser,year,month,day --export-dir '/user/hive/warehouse/loganalyze.db/dw_pvs_browser/' --fields-terminated-by '\t';
## PV by operating system
drop table if exists dw_pvs_os;
create table dw_pvs_os(
id int primary key auto_increment,
pvs bigint,
os varchar(10),
year varchar(4),
month varchar(2),
day varchar(2)
);
## PV by region
drop table if exists dw_pvs_region;
create table dw_pvs_region(
id int primary key auto_increment,
province varchar(20),
city varchar(20),
year varchar(4),
month varchar(2),
day varchar(2),
pvs int);
### Export the data with Sqoop
bin/sqoop export --connect jdbc:mysql://10.211.55.16:3306/log --username root --password root --table dw_pvs_region --columns pvs,province,city,year,month,day --export-dir '/user/hive/warehouse/loganalyze.db/dw_pvs_region/' --fields-terminated-by '\t';
## UV
drop table if exists dw_uv;
create table dw_uv(
id int primary key auto_increment,
year varchar(4),
month varchar(2),
day varchar(2),
uv int);
## IP count
drop table if exists dw_ip;
create table dw_ip(
id int primary key auto_increment,
year varchar(4),
month varchar(2),
day varchar(2),
ip int);
## Average page views per user
drop table if exists dw_avgpv_user_d;
create table dw_avgpv_user_d(
id int primary key auto_increment,
day varchar(12),
avgpv float);
## PV by referer per hour
drop table if exists dw_pvs_referer_h;
create table dw_pvs_referer_h(
id int primary key auto_increment,
referer_url varchar(800),
referer_host varchar(200),
year varchar(4),
month varchar(2),
day varchar(2),
hour varchar(2),
pv_referer_cnt bigint);
Visualizing the data with Java EE
First, create a Java EE project.
The required pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.edu.hust</groupId>
<artifactId>ForumLogAnaloyze</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<name>ForumLogAnaloyze Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!-- Spring version -->
<spring.version>4.0.2.RELEASE</spring.version>
<mybatis.version>3.2.6</mybatis.version>
<!-- Logging library versions (slf4j, log4j) -->
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<!-- Spring core modules -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.6.2.RELEASE</version>
</dependency>
<!-- MyBatis core -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<!-- MyBatis/Spring integration -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.2.2</version>
</dependency>
<!-- Java EE API -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.36</version>
</dependency>
<!-- DBCP connection pool, used to configure the data source in applicationContext.xml -->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.0</version>
</dependency>
<!-- Logging libraries -->
<!-- log start -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Object formatting, convenient for log output -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.41</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- log end -->
<!-- JSON support -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!--redis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- File upload components -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.2</version>
<configuration>
<!-- Location of the generator configuration file -->
<configurationFile>src/main/resources/generatorConfig.xml</configurationFile>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
<executions>
<execution>
<id>Generate MyBatis Artifacts</id>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
The project structure is as follows:
For the full implementation, see my GitHub repository here.