1. 程式人生 > >基於Hive的手機應用資訊統計分析系統

基於Hive的手機應用資訊統計分析系統

目錄

專案概要

需求

手機應用日誌

定期離線分析手機應用新增使用者、活躍使用者、沉默使用者、啟動次數、版本分佈和留存使用者等業務指標。

工作流程

  1. 手機APP啟動時,上報啟動日誌、錯誤日誌、頁面日誌、事件日誌、使用時長日誌等資訊到日誌收集伺服器。
  2. 日誌收集伺服器將收集到的日誌資訊傳送給kafka。
  3. Flume分別消費kafka中的5種主題資訊,並把資料儲存到HDFS上。
  4. 通過crontab任務排程定時把HDFS中的資訊拷貝到Hive資料倉庫中。
  5. 核心業務操作採用Hive查詢。

一般資料開發就負責2到5或6(展示),即資料收集和處理或加上展示。

具體實現

公共模板、手機端日誌生成模組、日誌收集web模組、flume攔截器、hive自定義函式

日誌

public class AppBaseLog implements Serializable 

日誌基類包含日誌建立時間、應用ID、租戶企業ID、裝置ID(手機使用者ID)、應用版本、應用下載渠道、作業系統及其版本、機型等資訊。

其他日誌繼承自基類,錯誤日誌類、事件日誌類、頁面日誌類、啟動日誌類、使用時長日誌類、地理資訊類。

還有一個AppLogEntity類,裡面包含了這些上述日誌的陣列,手機應用每次傳送的日誌資訊就是使用這個類。

手機客戶端

一個模擬日誌生成的類。

資料收集模板

WebController類,將收集到的日誌資訊傳送給Kafka。

一個手機使用者傳送一份AppLogEntity物件資料,裡面包含了各種日誌型別的資料。將這些資料分別轉化為Json格式,然後通過Kafka的ProducerRecord傳送到Kafka叢集。

總體流程

Tomcat執行web程式,generateData類不斷向埠傳送日誌資訊。web類的controller層將資料傳送到kafka叢集。搭建Flume管道,從Kafka叢集中拉取資料到HDFS。

編寫Hive指令碼,通過crondtab實現每隔一段時間從HDFS上匯入資料到Hive。

Hive UDF類

根據輸入的時間資訊:

  • 返回當天的起始時間;

  • 返回本週的起始時間;

  • 返回本月的起始時間;

根據輸入的時間和時間格式化資訊:返回按照格式化要求顯示的資訊。

新建一個DateUtil類,裡面有輸入時間獲取相應起始時間的函式。

新建一個DayBegin類,繼承UDF類,重寫並重載各種evaluate方法。實現的方法包括無參返回當天的零點;偏移量引數返回當天加偏移量的那一天的零點;指定日期加偏移量返回該天的起始時間;根據String計算某天的起始時間;string時間 + offset;string時間 + 指定時間格式;string時間 + 指定時間格式 + 偏移。

按DayBegin類的思路,再實現周、月的類。

還有一個FormatTimeUDF類,增加根據long時間 + string指定時間格式 返回string時間;根據string時間 + string指定時間格式 返回string時間;根據long時間 +string指定時間格式 + 輔助引數 返回周內第一天的string時間。

public class FormatTimeUDF extends UDF {

// 根據輸入的時間毫秒值(long型別)和格式化要求,返回String型別時間
public String evaluate(long ms,String fmt) throws ParseException {

    SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
    Date d = new Date();
    d.setTime(ms);

    return sdf.format(d) ;
}

// 根據輸入的時間毫秒值(String型別)和格式化要求,返回String型別時間
public String evaluate(String ms,String fmt) throws ParseException {

    SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
    Date d = new Date();
    d.setTime(Long.parseLong(ms));

    return sdf.format(d) ;
}

// 根據輸入的時間毫秒值(long型別)、格式化要求,和區分周的任意值,返回String型別時間
public String evaluate(long ms ,String fmt , int week) throws ParseException {

    Date d = new Date();
    d.setTime(ms);

    //周內第一天
    Date firstDay = DateUtil.getWeekBeginTime(d) ;
    SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;

    return sdf.format(firstDay) ;
}

新增函式:將jar包放到hive/lib,修改hive-site,在hiveclient上註冊函式(一個類一個函式)

HIVE查詢

主要包括:新增使用者、活躍使用者、沉默使用者、留存使用者(上週沒,這周有;這兩週沒,之前有;本週留存)、新鮮度

新增使用者

createdatms為日誌建立時間

# 判斷今天的新增使用者
select count(*)
from (
    select min(createdatms) mintime
    from ext_startup_logs
    where appid = 'sdk34734'
    group by deviceid
    having mintime >= getdaybegin()
)t;
# 昨天就是 mintime >= getdaybegin(-1) and mintime < getdaybegin()

活躍使用者

# 日活躍使用者
select count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734' and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
# 優化,根據時間分割槽表去查詢,避免全表掃描。ym和day在load資料時已經設定好的分割槽名
where ym = formattime(getdaybegin(),'yyyyMM') and day = formattime(getdaybegin(),'dd') and  appid = 'sdk34734';

# 一週內每日的日活躍數。formattime得到每日的開始。
select formattime(createdatms,'yyyy/MM/dd') day ,count(distinct deviceid)
from ext_startup_logs
where concat(ym, day) >= formattime(getdaybegin(-6), "yyyyMMdd")
and appid = 'sdk34734'
group by formattime(createdatms,'yyyy/MM/dd');

# 一次查詢出過去的5周,每週的周活躍數。formattime的0引數是為了得出當日對應的周開始
select formattime(createdatms,'yyyy/MM/dd',0) week ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getweekbegin(-6) and createdatms < getweekbegin(-1)
group by formattime(createdatms,'yyyy/MM/dd',0);
# 過去5周,包含本週的每週周活躍數
where concat(ym,day) >= formattime(getweekbegin(-4),'yyyyMMdd') and appid ='sdk34734'

# 一次查詢出過去的三個月內的月活躍數
select formattime(createdatms,'yyyy/MM',0) month ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getmonthbegin(-2) and createdatms < getmonthbegin(-1)
group by formattime(createdatms,'yyyy/MM',0);
# 包含本月
where ym >= formattime(getmonthbegin(-2),'yyyyMM') and appid ='sdk34734'

# 連續三週活躍使用者
select deviceid , count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd') and appid = 'sdk34734'
group by deviceid
having c = 3;

沉默使用者

條件:只有一條日誌;安裝2天內不算

select count(*)
from (
    select deviceid, count(createdate) c
    from ext_startup_logs
    where concat(ym, day) < formattime(getdaybegin(-2), 'yyyyMMdd') and appid = 'sdk34734'
    group by deviceid
    having c = 1;
);

留存使用者

# 本週迴流,即上週沒啟動過,本週啟動了
select distinct s.deviceid
from ext_startup_logs s
where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd') and deviceid not in (
    select distinct t.deviceid
    from ext_startup_logs t
    where t.appid = 'sdk34734' 
    and concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd') 
    and concat(t.ym,t.day) < formattime(getweekbegin(),'yyyyMMdd') 
    );

# 連續2周沒有啟動的使用者,之前啟動過
select distinct s.deviceid
from ext_startup_logs s
where concat(ym,day) < formattime(getweekbegin(-1),'yyyyMMdd')
and appid='sdk34734'
and s.deviceid not in (
    select distinct(t.deviceid)
    from ext_startup_logs t
    where concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd')
    and t.appid='sdk34734'
    );
    
# 留存使用者。本週留存指上週新增,且本週活躍。
select distinct s.deviceid
from ext_startup_logs s
where concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd')
and s.appid = 'sdk34734'
and s.deviceid in (
    select tt.deviceid , min(tt.createdatms) mintime
    from ext_startup_logs tt
    where tt.appid = 'sdk34734'
    group by tt.deviceid 
    having mintime >= getweekbegin(-2) 
    and mintime < getweekbegin(-1)
    );

新鮮度分析

# 新鮮度分析,某段時間的新增使用者數/某段時間的活躍的使用者數
# 今天新增
select count(*)
from(
    select min(createdatms) mintime
    from ext_startup_logs
    where appid = 'sdk34734'
    group by deviceid
    having mintime >= getdaybegin() 
    )t;
# 今天活躍
select count(distinct deviceid)
from ext_startup_logs
where concat(ym, day) = formattime(getdaybegin, "yyyyMMdd");
and appid = 'sdk34734'