基於歷史資料的使用者訪問次數,每天新老使用者,日活,周活,月活的hive計算
最近有一個需求,統計每天的新老使用者,日活,周活,月活。
我們每天的增量資料會加入到hive歷史資料表中,包含使用者訪問網站的一些資訊,欄位有很多,包括使用者唯一標識guid。
當然了日活,周活,月活就是一個count(distinct(guid))語句,非常常用的sql。
但是這裡的問題是:
A:每天的新老使用者應該怎麼統計呢?
B:這還不簡單,判斷使用者guid是否存在與歷史庫guid中嘛?
A:歷史資料幾十個T,大概一百億行,你要每天將當日資料(2~3億行)與歷史資料幾億行進行join判斷?
B:額,這個,這個,好像不行哦!
是的,歷史資料裡面是使用者網站訪問行為,同一個使用者在同一天,不同的天都有可能出現,guid在歷史表中會有多次。如果直接join,效能很差,實際上是做了很多不必要的工作。
解決方案:
維護一張使用者表,裡面有4列:guid, starttime, endtime, num,分別是使用者的guid,第一次訪問時間,最後一次訪問時間,訪問天數;
從某個狀態開始,歷史表中guid是唯一的;
當天資料去重後,與歷史庫join,如果guid在歷史庫出現過,則將endtime更新為當天時間,num加一;
否則,這是一個新使用者,插入歷史庫,starttime, endtime都為當天時間,num初始值為1。
維護了這麼一張使用者表後,接下來就可以寫hql統計業務了,計算當天新老使用者時,只需要與這個歷史庫進行join就行了(目前為止4千萬),當日guid去重後是1千多萬,這樣就是4千萬~1千萬的join了,與開始4千萬~100億的join,效能會有巨大提升。
hive歷史表的設計與hive相關配置
可以看到這裡hive歷史表history_helper需要頻繁修改,hive表支援資料修改需要在${HIVE_HOME}/conf/hive-site.xml中新增事務支援:
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name >
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
為了提高查詢速度,hive歷史表與增量表這裡都分桶,hive-xite.xml配置:
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
為了提高reduce並行度,也設定一下:
set mapred.reduce.tasks = 50;
這個最好在hive命令列配置,表明只在當前程式使用該配置,就不要配置配置檔案了。
歷史庫建表語句:
create external table if not exists hm2.history_helper
(
guid string,
starttime string,
endtime string,
num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");
當天增量表,儲存去重後的guid,建表語句:
create table if not exists hm2.daily_helper
(
guid string,
dt string
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");
思路
由於這種需要寫成定時模式,所以這裡用python指令碼來實現,將hive查詢結果儲存到本地檔案result.txt,然後python讀取result.txt,連線資料庫,儲存當天的查詢結果。
程式碼
helper.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
# hive更新歷史使用者表,日常查詢,儲存到MySQL
import sys
import datetime
import commands
import MySQLdb
# 獲取起始中間所有日期
def getDays(starttime,endtime,regx):
datestart=datetime.datetime.strptime(starttime,regx)
dateend=datetime.datetime.strptime(endtime,regx)
days = []
while datestart<=dateend:
days.append(datestart.strftime(regx))
datestart+=datetime.timedelta(days=1)
return days
# 獲得指定時間的前 n 天的年、月、日,n取負數往前,否則往後
def getExacYes(day, regx, n):
return (datetime.datetime.strptime(day,regx) + datetime.timedelta(days=n)).strftime(regx)
# 獲得距離現在天數的年、月、日,n 取值正負含義同上,昨天就是getYes(regx,-1)
def getYes(regx, n):
now_time = datetime.datetime.now()
yes_time = now_time + datetime.timedelta(days=n)
yes_time_nyr = yes_time.strftime(regx)
return yes_time_nyr
# 執行hive命令
def execHive(cmd):
print cmd
res = commands.getstatusoutput(cmd)
return res
# 獲得當前是星期幾
def getWeek(regx):
now_time = datetime.datetime.now()
week = now_time.strftime(regx)
return week
# 格式化日期,加上雙引號
def formatDate(day):
return "\"" + day + "\""
# 資料儲存到mysql
def insertMysql(dt, path, tbName, regx):
# new, dayAll, stay
values = []
with open(path) as file:
line = file.readline()
while line:
values.append(line.strip())
line = file.readline()
dayAll = int(values[1])
new = float(values[0])/dayAll
old = 1 - new
# 獲取資料庫連線
conn = MySQLdb.connect("0.0.0.0", "statistic", "123456", "statistic")
# 獲取遊標
cursor = conn.cursor()
# 查詢昨天的使用者人數
yesDay = getExacYes(dt, regx, -1)
sql = 'select dayAll from %s where dt = %s'%(tbName, formatDate(yesDay))
try:
cursor.execute(sql)
except Exception as e:
print e
yesAll = int(cursor.fetchall()[0][0])
stay = float(values[2]) / yesAll
print stay
# 獲取遊標
cursor2 = conn.cursor()
sql = 'insert into %s\
values("%s",%f,%f,%f,%d)'%(tbName, dt, new, old, stay, dayAll)
print sql
try:
cursor2.execute(sql)
conn.commit()
except:
conn.rollback()
finally:
conn.close()
# 初始化,刪除臨時表,並且建立
def init():
# 設定分桶環境
cmd = 'source /etc/profile;hive -e \'set hive.enforce.bucketing = true;set mapred.reduce.tasks = 50;\''
(status,result) = execHive(cmd)
# 清除當天的臨時表,結果儲存
cmd = 'source /etc/profile;hive -e \'drop table hm2.daily_helper;\''
(status,result) = execHive(cmd)
if status == 0:
print '%s昨天臨時表刪除完畢...'%(day)
else:
print result
sys.exit(1)
cmd = 'source /etc/profile;hive -e \'create table if not exists hm2.daily_helper\
(\
guid string,\
dt string\
)\
clustered by(guid) into 50 buckets \
stored as orc TBLPROPERTIES ("transactional"="true");\''
(status,result) = execHive(cmd)
if status == 0:
print '%s臨時表建立完畢...'%(day)
else:
print result
sys.exit(1)
# 主函式入口
if __name__ == '__main__':
regx = '%Y-%m-%d'
resultPath = '/home/hadoop/statistic/flash/helper/result.txt'
days = getDays('2018-07-01','2018-07-20',regx)
tbName = 'statistic_flash_dailyActive_helper'
for day in days:
init()
# 當天資料去重後儲存到臨時表daily_helper
cmd = 'source /etc/profile;hive -e \'insert into hm2.daily_helper select distinct(guid),dt from hm2.helper \
where dt = "%s" and guid is not null;\''%(day)
print '%s資料正在匯入臨時表...'%(day)
(status,result) = execHive(cmd)
if status == 0:
print '%s資料匯入臨時表完畢...'%(day)
else:
print result
sys.exit(1)
# guid存在則更新 endtime 與 num
cmd = 'source /etc/profile;hive -e \'update hm2.history_helper set endtime = "%s",num = num + 1 \
where guid in (select guid from hm2.daily_helper);\''%(day)
print '正在更新endtime 與 num...'
(status,result) = execHive(cmd)
if status == 0:
print '%s history_helper資料更新完畢'%(day)
else :
print result
sys.exit(1)
# 當天新使用者
cmd = 'source /etc/profile;hive -e \'select count(1) from hm2.daily_helper \
where guid not in (select guid from hm2.history_helper);\' > %s'%(resultPath)
(status,result) = execHive(cmd)
if status != 0:
print result
sys.exit(1)
# 不存在插入
cmd = 'source /etc/profile;hive -e \'insert into hm2.history_helper\
select daily.guid,dt,dt,1 from hm2.daily_helper daily\
where daily.guid not in (select guid from hm2.history_helper where guid is not null);\''
print '正在插入資料到history_helper表...'
(status,result) = execHive(cmd)
if status == 0:
print '%s資料插入hm2.history_helper表完成'%(day)
else:
print result
sys.exit(1)
# 當天總人數
cmd = 'source /etc/profile;hive -e \'select count(1) from hm2.daily_helper;\' >> %s'%(resultPath)
(status,result) = execHive(cmd)
if status != 0:
print result
sys.exit(1)
# 次日活躍留存
cmd = 'source /etc/profile;hive -e \'select count(1) from\
(select guid from hm2.helper where dt = "%s" group by guid) yes\
inner join\
(select guid from hm2.helper where dt = "%s" group by guid) today\
where yes.guid = today.guid;\' >> %s'%(getExacYes(day, regx, -1), day, resultPath)
(status,result) = execHive(cmd)
if status != 0:
print result
sys.exit(1)
# 結果儲存到mysql
insertMysql(day, resultPath, tbName, regx)
print '=========================%s hive 查詢完畢,結果儲存資料到mysql完成=============================='%(day)
這是在處理歷史資料,然後就是每天定時處理了,在linux crontab里加個定時器任務就好了。