離線資料分析之 人物興趣取向分析(二)離線/實時專案構建思路
阿新 • • 發佈:2020-12-22
一、離線 vs 實時流框架
使用 Spark 進行資料清洗的過程見：日誌分析 https://www.cnblogs.com/sabertobih/p/14070357.html
二、離線(本專案)處理思路
①flume
資料採集,實時監控新增資料,傳輸到kfk (見:https://www.cnblogs.com/sabertobih/p/14115501.html)
②kafka
削峰,實時資料監控命令:https://www.cnblogs.com/sabertobih/p/14024011.html
③kafka-> sparkstreaming -> kafka
格式轉換:https://www.cnblogs.com/sabertobih/p/14136154.html
④kafka-> HBase
由於Rowkey唯一,重複的Rowkey自動覆蓋,可以完成去重
⑤HBase-> hive
建立外部對映表(資料存放在hdfs上,hive用於大批資料的複雜查詢,hbase用於資料的有序對映)
-- Hive external tables mapped onto HBase via HBaseStorageHandler.
-- Data physically lives in HBase (namespace event_db); Hive provides complex
-- batch querying on top. ':key' in each mapping binds the HBase rowkey.
-- Fixes vs. original: 'storedby' -> 'stored by', missing statement
-- terminators added, Hive database unified to event_db (later ODS CTAS reads
-- event_db.*), and users.birthday renamed to birthyear to match the DWD layer
-- (the HBase column mapping 'profile:birthday' is left untouched).
create database if not exists event_db;
use event_db;

-- users: per-user profile attributes
create external table event_db.users(
  userid string,
  birthyear string,   -- mapped from HBase profile:birthday; DWD casts it to int
  gender string,
  locale string,
  location string,
  timezone string,
  joinedAt string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
  'hbase.columns.mapping'=':key,profile:birthday,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt'
)
tblproperties('hbase.table.name'='event_db:users');

-- userfriends: (user, friend) pairs, rowkey already deduplicated by HBase
create external table event_db.userfriends(
  ukey string,
  userid string,
  friendid string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
  'hbase.columns.mapping'=':key,uf:userid,uf:friendid'
)
tblproperties('hbase.table.name'='event_db:user_friends');

-- events: event metadata (schedule, location, creator, free-text features)
create external table event_db.events(
  eventid string,
  startTime string,
  city string,
  dstate string,      -- 'state' is avoided as an identifier; mapped to location:state
  zip string,
  country string,
  lat string,
  lng string,
  userId string,
  features string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
  'hbase.columns.mapping'=':key,schedule:startTime,location:city,location:state,location:zip,location:country,location:lat,location:lng,creator:userId,remark:features'
)
tblproperties('hbase.table.name'='event_db:events');

-- eventAttendees: attendance status per (event, user)
create external table event_db.eventAttendees(
  ekey string,
  eventId string,
  userId string,
  status string       -- one of: invited / yes / no / maybe
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
  'hbase.columns.mapping'=':key,euat:eventId,euat:userId,euat:status'
)
tblproperties('hbase.table.name'='event_db:event_attendees');

-- train: labelled interactions used for model training
create external table event_db.train(
  tkey string,
  userId string,
  eventId string,
  invitedId string,
  etimestamp string,  -- 'timestamp' is a reserved word; mapped to eu:timestamp
  interested string,
  notinterested string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
  'hbase.columns.mapping'=':key,eu:userId,eu:eventId,eu:invitedId,eu:timestamp,eu:interested,eu:notinterested'
)
tblproperties('hbase.table.name'='event_db:train');

-- test: unlabelled interactions to score
create external table event_db.test(
  tkey string,
  tuser string,
  event string,
  invited string,
  ttimestamp string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
  'hbase.columns.mapping'=':key,eu:user,eu:event,eu:invited,eu:timestamp'
)
tblproperties('hbase.table.name'='event_db:test');
⑥hive中構建數倉
(一)ods層:資料存在hdfs中的內部表
1. 以ORC格式,查詢速度快(詳見:https://www.jianshu.com/p/91f793a1bdb3)
2. 順帶清洗工作
-- ODS layer: materialise the HBase-backed external tables as native Hive
-- tables stored as ORC (columnar format, much faster to scan than the
-- HBase handler). General pattern:
--   create table <ods_db>.<t> stored as ORC as select * from <src_db>.<t>;
-- The original leading fragment referencing ${db}.hb_users was an unused
-- template with no terminator and has been folded into this comment.
drop database if exists ods_events cascade;
create database ods_events;
use ods_events;

create table ods_events.ods_users        stored as ORC as select * from event_db.users;
create table ods_events.ods_eventsTable  stored as ORC as select * from event_db.events;
create table ods_events.ods_eventAttendees stored as ORC as select * from event_db.eventAttendees;
create table ods_events.ods_userfriends  stored as ORC as select * from event_db.userfriends;
create table ods_events.ods_train        stored as ORC as select * from event_db.train;
create table ods_events.ods_test         stored as ORC as select * from event_db.test;
(二) dwd層:資料歸一化
-- DWD layer: data normalisation / cleansing.
-- Fixes vs. original: '//' comments replaced with valid Hive '--' comments;
-- the dwd_events FROM clause was missing 'select * from' and referenced a
-- table that was never created (ODS name is ods_eventsTable); dwd_train
-- selected nonexistent columns (ods_train has tkey/invitedid/etimestamp,
-- not trid/invited/tr_timestamp) -- original output aliases are preserved.
drop database if exists dwd_events;
create database dwd_events;
use dwd_events;

-- Users: fill NULL birthyear/timezone/join-date with the column average,
-- encode gender as male=-1 / female=1 / unknown=0, and convert the
-- ISO-ish 'T...Z' join timestamp to unix seconds.
create table dwd_users as
select userid,
       locale,
       location,
       case when cast(birthyear as int) is null then avg_year else birthyear end as birthyear,
       case gender when 'male' then -1 when 'female' then 1 else 0 end as gender,
       case when cast(timezone as int) is null then avg_timezone else timezone end as timezone,
       -- 'members' holds the join timestamp (unix seconds), defaulted to the average
       case when trim(joinedat) = '' or lower(joinedat) = 'none'
            then avg_member
            else unix_timestamp(translate(joinedat, 'TZ', ' '))
       end as members
from (
  select *
  from ods_events.ods_users
  cross join (
    select floor(avg(cast(birthyear as int)))  avg_year,
           floor(avg(cast(timezone as int)))   avg_timezone,
           floor(avg(unix_timestamp(translate(joinedat, 'TZ', ' ')))) avg_member
    from ods_events.ods_users
  ) tmp
) a;

-- Events: parse start time to unix seconds, fill NULL lat/lng with averages.
create table dwd_events.dwd_events as
select eventid,
       unix_timestamp(translate(starttime, 'TZ', ' ')) starttime,
       city,
       country,
       case when cast(lat as float) is null then avg_lat else lat end lat,
       case when cast(lng as float) is null then avg_lng else lng end lng,
       userid,
       features
from (
  select *
  from ods_events.ods_eventsTable
  cross join (
    select round(avg(cast(lat as float)), 3) avg_lat,
           round(avg(cast(lng as float)), 3) avg_lng
    from ods_events.ods_eventsTable
  ) tmp
) a;

-- Attendance and friendship pass through unchanged.
create table dwd_events.dwd_eventAttendees as
select * from ods_events.ods_eventattendees;

create table dwd_events.dwd_userfriends as
select * from ods_events.ods_userfriends;

-- Training interactions; 'lable' is a historical misspelling kept for
-- downstream compatibility.
create table dwd_events.dwd_train as
select tkey        trid,
       userid,
       eventid,
       invitedid   invited,
       etimestamp  ttime,
       interested  lable
from ods_events.ods_train;
(三)dws層:輕聚合維度表
-- DWS layer: lightly aggregated per-user dimension tables.
-- Fixes vs. original: missing ';' between the two CTAS statements, the
-- nonexistent 'statu' column (event_attendees stores 'status'), and the
-- nonexistent 'member_days' column (dwd_users exposes it as 'members').
drop database if exists dws_events;
create database dws_events;
use dws_events;

-- Min-max normalisation helper.
-- NOTE(review): divides by (maxdata - mindata); degenerates when every row
-- shares the same value -- confirm inputs always vary, or guard with NULLIF.
create temporary macro maxandmin(cdata int, maxdata int, mindata int)
  (cdata - mindata) / (maxdata - mindata);
create temporary macro calcage(y int) year(current_date()) - y;

-- Per-user profile features, min-max normalised.
-- NOTE(review): dwd_events.dwd_locale (locale -> localeid lookup) is not
-- created anywhere in this file; it is presumably loaded elsewhere -- verify
-- it exists before running.
create table dws_events.dws_temp_users as
select userid,
       locale,
       gender,
       maxandmin(cage, max_age, min_age)                 age,
       maxandmin(timezone, max_timezone, min_timezone)   timezone,
       maxandmin(members, max_members, min_members)      members
from (
  select u.userid,
         case when l.locale is null then 0 else l.localeid end locale,
         u.gender,
         calcage(u.birthyear) cage,
         b.min_age,
         b.max_age,
         u.timezone,
         b.min_timezone,
         b.max_timezone,
         u.members,
         b.min_members,
         b.max_members
  from dwd_events.dwd_users u
  left join dwd_events.dwd_locale l
    on lower(u.locale) = lower(l.locale)
  cross join (
    select min(calcage(birthyear)) min_age,
           max(calcage(birthyear)) max_age,
           min(timezone)           min_timezone,
           max(timezone)           max_timezone,
           min(members)            min_members,
           max(members)            max_members
    from dwd_events.dwd_users
  ) b
) c;

-- Per-user behaviour counters: friend count, attendance status counts,
-- and number of training interactions; missing users default to 0.
create table dws_events.dws_temp_userEvent as
select u.userid,
       case when uf.friendnum is null then 0 else uf.friendnum end friendnum,
       case when ea.invite_event_count is null then 0 else ea.invite_event_count end invite_event_count,
       case when ea.attended_count is null then 0 else ea.attended_count end attended_count,
       case when ea.not_attended_count is null then 0 else ea.not_attended_count end not_attended_count,
       case when ea.maybe_attended_count is null then 0 else ea.maybe_attended_count end maybe_attended_count,
       case when dt.joinnum is null then 0 else dt.joinnum end event_count
from dwd_events.dwd_users u
left join (
  select userid, count(friendid) friendnum
  from dwd_events.dwd_userfriends
  group by userid
) uf on u.userid = uf.userid
left join (
  select userid,
         sum(case when status = 'invited' then 1 else 0 end) as invite_event_count,
         sum(case when status = 'yes'     then 1 else 0 end) as attended_count,
         sum(case when status = 'no'      then 1 else 0 end) as not_attended_count,
         sum(case when status = 'maybe'   then 1 else 0 end) as maybe_attended_count
  from dwd_events.dwd_eventAttendees
  group by userid
) ea on u.userid = ea.userid
left join (
  select userid, count(eventid) joinnum
  from dwd_events.dwd_train
  group by userid
) dt on u.userid = dt.userid;