1. 程式人生 > 實用技巧 >離線資料分析之 人物興趣取向分析(二)離線/實時專案構建思路

離線資料分析之 人物興趣取向分析(二)離線/實時專案構建思路

一、離線 vs 實時流框架

以下用spark資料清洗的過程見:日誌分析 https://www.cnblogs.com/sabertobih/p/14070357.html

二、離線(本專案)處理思路

flume

資料採集,實時監控新增資料,傳輸到kafka (見:https://www.cnblogs.com/sabertobih/p/14115501.html )

kafka

削峰,實時資料監控命令:https://www.cnblogs.com/sabertobih/p/14024011.html

kafka-> sparkstreaming -> kafka

格式轉換:https://www.cnblogs.com/sabertobih/p/14136154.html

kafka-> HBase

由於Rowkey唯一,重複的Rowkey自動覆蓋,可以完成去重

HBase-> hive

建立外部對映表(資料存放在hdfs上,hive用於大批資料的複雜查詢,hbase用於資料的有序對映)

-- Hive external tables mapped onto HBase via the HBaseStorageHandler:
-- data stays in HBase/HDFS, Hive provides batch SQL over it.
-- FIX: the original set hivevar:db=events_db while every downstream layer
-- reads event_db.* -- the variable and the database are now event_db.
-- FIX: users declared column "birthday", but the DWD layer reads
-- "birthyear" (the dataset column); only the HBase qualifier stays
-- profile:birthday. Hive comments are "--", not "---".
create database if not exists event_db;
set hivevar:db=event_db;
use ${db};

-- users: rowkey + profile / region / registration column families
create external table if not exists ${db}.users (
    userid    string,
    birthyear string,
    gender    string,
    locale    string,
    location  string,
    timezone  string,
    joinedat  string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    'hbase.columns.mapping' = ':key,profile:birthday,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt'
)
tblproperties ('hbase.table.name' = 'event_db:users');

-- user_friends: one row per (user, friend) pair
create external table if not exists ${db}.userfriends (
    ukey     string,
    userid   string,
    friendid string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    'hbase.columns.mapping' = ':key,uf:userid,uf:friendid'
)
tblproperties ('hbase.table.name' = 'event_db:user_friends');

-- events: schedule / location / creator / remark column families
create external table if not exists ${db}.events (
    eventid   string,
    starttime string,
    city      string,
    dstate    string,
    zip       string,
    country   string,
    lat       string,
    lng       string,
    userid    string,
    features  string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    'hbase.columns.mapping' = ':key,schedule:startTime,location:city,location:state,location:zip,location:country,location:lat,location:lng,creator:userId,remark:features'
)
tblproperties ('hbase.table.name' = 'event_db:events');

-- event_attendees: attendance status per (event, user)
create external table if not exists ${db}.eventattendees (
    ekey    string,
    eventid string,
    userid  string,
    status  string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    'hbase.columns.mapping' = ':key,euat:eventId,euat:userId,euat:status'
)
tblproperties ('hbase.table.name' = 'event_db:event_attendees');

-- train: labelled interest data
create external table if not exists ${db}.train (
    tkey          string,
    userid        string,
    eventid       string,
    invitedid     string,
    etimestamp    string,
    interested    string,
    notinterested string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    'hbase.columns.mapping' = ':key,eu:userId,eu:eventId,eu:invitedId,eu:timestamp,eu:interested,eu:notinterested'
)
tblproperties ('hbase.table.name' = 'event_db:train');

-- test: unlabelled data
create external table if not exists ${db}.test (
    tkey       string,
    tuser      string,
    event      string,
    invited    string,
    ttimestamp string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    'hbase.columns.mapping' = ':key,eu:user,eu:event,eu:invited,eu:timestamp'
)
tblproperties ('hbase.table.name' = 'event_db:test');

hive中構建數倉

(一)ods層:資料存在hdfs中的內部表

1. 以ORC格式,查詢速度快(詳見:https://www.jianshu.com/p/91f793a1bdb3 )

2. 順帶清洗工作

-- Pattern example: snapshot an HBase-mapped table (hb_ prefix) into a
-- Hive-managed ORC table so the ODS layer gets fast columnar scans.
-- FIX: the statement was missing its terminating semicolon.
create table ${db}.users
stored as orc as
    select * from ${db}.hb_users;

-- ODS layer: copy every HBase-mapped external table into a Hive-managed
-- ORC table (columnar storage -> much faster analytical queries).
drop database if exists ods_events cascade;
create database ods_events;
use ods_events;

create table ods_events.ods_users
    stored as orc
    as select * from event_db.users;

create table ods_events.ods_eventstable
    stored as orc
    as select * from event_db.events;

create table ods_events.ods_eventattendees
    stored as orc
    as select * from event_db.eventattendees;

create table ods_events.ods_userfriends
    stored as orc
    as select * from event_db.userfriends;

create table ods_events.ods_train
    stored as orc
    as select * from event_db.train;

create table ods_events.ods_test
    stored as orc
    as select * from event_db.test;

(二) dwd層:資料歸一化

//資料歸一化
-- DWD layer: normalized user dimension.
-- FIX: Hive line comments are "--"; the original used "//", which is a
-- syntax error in a hive script. Also added "cascade" so the drop
-- succeeds on a non-empty database (consistent with the ODS layer).
drop database if exists dwd_events cascade;
create database dwd_events;
use dwd_events;

-- Null birthyear/timezone fall back to the column average; gender is
-- encoded as {-1 male, 1 female, 0 unknown}; joinedat (ISO-8601 with
-- T/Z markers) is converted to epoch seconds, averages fill the blanks.
create table dwd_events.dwd_users as
select
    userid,
    locale,
    location,
    case when cast(birthyear as int) is null then avg_year else birthyear end as birthyear,
    case gender when 'male' then -1 when 'female' then 1 else 0 end as gender,
    case when cast(timezone as int) is null then avg_timezone else timezone end as timezone,
    case
        when trim(joinedat) = '' or lower(joinedat) = 'none' then avg_member
        else unix_timestamp(translate(joinedat, 'TZ', ' '))
    end as members
from (
    select *
    from ods_events.ods_users
    cross join (
        -- single-row table of column averages, joined onto every user
        select
            floor(avg(cast(birthyear as int))) as avg_year,
            floor(avg(cast(timezone as int))) as avg_timezone,
            floor(avg(unix_timestamp(translate(joinedat, 'TZ', ' ')))) as avg_member
        from ods_events.ods_users
    ) tmp
) a;

-- DWD layer: normalized event dimension; null lat/lng replaced by the
-- 3-decimal average.
-- FIX: the derived table was "(ods_events.ods_events cross join ...) a"
-- with no SELECT, which is invalid HiveQL; it also referenced table
-- "ods_events.ods_events", but the ODS layer created "ods_eventstable".
create table dwd_events.dwd_events as
select
    eventid,
    unix_timestamp(translate(starttime, 'TZ', ' ')) as starttime,
    city,
    country,
    case when cast(lat as float) is null then avg_lat else lat end as lat,
    case when cast(lng as float) is null then avg_lng else lng end as lng,
    userid,
    features
from (
    select *
    from ods_events.ods_eventstable
    cross join (
        -- single-row table of average coordinates
        select
            round(avg(cast(lat as float)), 3) as avg_lat,
            round(avg(cast(lng as float)), 3) as avg_lng
        from ods_events.ods_eventstable
    ) tmp
) a;


-- DWD passthroughs: attendees and friends need no normalization.
-- FIX: the friends table is created as "dwd_userfriends" because the
-- DWS layer queries that name (the original created dwd_usersFriends).
create table dwd_events.dwd_eventattendees as
select * from ods_events.ods_eventattendees;

create table dwd_events.dwd_userfriends as
select * from ods_events.ods_userfriends;

-- FIX: the original selected trid / invited / tr_timestamp, columns that
-- do not exist on train (tkey / invitedid / etimestamp); the aliases keep
-- the intended downstream names. Also fixed the "lable" typo.
create table dwd_events.dwd_train as
select
    tkey       as trid,
    userid,
    eventid,
    invitedid  as invited,
    etimestamp as ttime,
    interested as label
from ods_events.ods_train;

(三)dws層:輕聚合維度表

-- DWS layer database plus two helper macros used by the dimension tables.
drop database if exists dws_events;
create database dws_events;
use dws_events;

-- min-max normalisation: scale v into [0, 1] over the range [lo, hi]
create temporary macro maxandmin(v int, hi int, lo int) (v - lo) / (hi - lo);

-- age in whole years derived from a birth year
create temporary macro calcage(birth int) year(current_date()) - birth;

-- DWS: per-user dimension with locale id, gender, and min-max scaled
-- age / timezone / membership timestamp.
-- FIX: the original read a non-existent column "member_days" -- dwd_users
-- exposes it as "members" -- and was missing its terminating semicolon.
create table dws_events.dws_temp_users as
select
    userid,
    locale,
    gender,
    maxandmin(cage, max_age, min_age) as age,
    maxandmin(timezone, max_timezone, min_timezone) as timezone,
    maxandmin(members, max_members, min_members) as members
from (
    select
        u.userid,
        -- dwd_locale is built elsewhere; presumably it maps a locale name
        -- to a numeric localeid -- TODO confirm. Unmatched locales get 0.
        case when l.locale is null then 0 else l.localeid end as locale,
        u.gender,
        calcage(u.birthyear) as cage,
        b.min_age,
        b.max_age,
        u.timezone,
        b.min_timezone,
        b.max_timezone,
        u.members,
        b.min_members,
        b.max_members
    from dwd_events.dwd_users u
    left join dwd_events.dwd_locale l
        on lower(u.locale) = lower(l.locale)
    cross join (
        -- single-row table of global min/max for each scaled feature
        select
            min(calcage(birthyear)) as min_age,
            max(calcage(birthyear)) as max_age,
            min(timezone)           as min_timezone,
            max(timezone)           as max_timezone,
            min(members)            as min_members,
            max(members)            as max_members
        from dwd_events.dwd_users
    ) b
) c;

-- DWS: per-user interaction counts (friends, invitations, attendance
-- answers, training-set events); users with no activity get 0.
-- FIX: the attendance subquery filtered on "statu" -- the event_attendees
-- column is "status", so every count was silently wrong/failing.
create table dws_events.dws_temp_userevent as
select
    u.userid,
    coalesce(uf.friendnum, 0)            as friendnum,
    coalesce(ea.invite_event_count, 0)   as invite_event_count,
    coalesce(ea.attended_count, 0)       as attended_count,
    coalesce(ea.not_attended_count, 0)   as not_attended_count,
    coalesce(ea.maybe_attended_count, 0) as maybe_attended_count,
    coalesce(dt.joinnum, 0)              as event_count
from dwd_events.dwd_users u
left join (
    -- friend count per user
    select userid, count(friendid) as friendnum
    from dwd_events.dwd_userfriends
    group by userid
) uf on u.userid = uf.userid
left join (
    -- attendance answer counts per user
    select
        userid,
        sum(case when status = 'invited' then 1 else 0 end) as invite_event_count,
        sum(case when status = 'yes'     then 1 else 0 end) as attended_count,
        sum(case when status = 'no'      then 1 else 0 end) as not_attended_count,
        sum(case when status = 'maybe'   then 1 else 0 end) as maybe_attended_count
    from dwd_events.dwd_eventattendees
    group by userid
) ea on u.userid = ea.userid
left join (
    -- number of training-set events per user
    select userid, count(eventid) as joinnum
    from dwd_events.dwd_train
    group by userid
) dt on u.userid = dt.userid;