Report Design and Application in a Data Warehouse Pipeline
1. Background: first use sqoop to do a full extract of the data, then extract increments and merge them into the full table, which forms the ODS layer; business processing on top of that forms the DWD layer, and the results land in the DM layer; finally, sqoop pushes the results back to MySQL.
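The five scripts below are meant to run in exactly that order. As a rough scheduling sketch (the script names and paths here are illustrative, not from the source), a daily crontab might look like:
# hypothetical crontab; the full extract is a one-off backfill, the rest run daily after midnight
30 0 * * * /opt/etl/ods_incr_tab_user_order.sh     # ODS: pull yesterday's increment, append to the full table
30 1 * * * /opt/etl/dwd_metric_tables.sh           # DWD: rebuild the per-metric result tables
30 2 * * * /opt/etl/dm_report.sh                   # DM: join the metric tables into the report
30 3 * * * /opt/etl/export_report_to_mysql.sh      # export: sqoop the report back to MySQL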
ODS layer:
#!/bin/bash
# ********************************************************************************
# Program name:  online_tab_user_order
# Description:   full sqoop extract of the MySQL online_tab_user_order tables into Hive
# Input parameters:
#
# Input resources:
# Output resources:
#
# Intermediate resources:
# Author:        csq
# Created:
# Version notes:
# Modified by:
# Modified on:
# Reason:
# Version notes:
#
# ********************************************************************************
# ********************************************************************************
VC_DBLIST='10.68.21.92,3306,hue,"xxx",user_online'
VC_DBLIST1='10.68.21.92,3306,hue,"xxx",user_online_other'
HIVE_SERVER='10.68.25.198:10000'
export HADOOP_USER_NAME=   # set to your Hue account (i.e. your Hadoop cluster account)
dblist=${VC_DBLIST}
dbhost=`echo "${dblist}" |awk -F, '{print $1}'`
dbport=`echo "${dblist}" |awk -F, '{print $2}'`
dbuser=`echo "${dblist}" |awk -F, '{print $3}'`
dbpw=`echo "${dblist}" |awk -F, '{print $4}'`
dbname=`echo "${dblist}" |awk -F, '{print $5}'`
if [ $# -eq 0 ];
then
p_in_time_str=`date -d today +'%Y-%m-%d 00:00:00'`
p_in_time_end=$p_in_time_str
elif [ $# -eq 1 ];
then
p_in_time_str=$1
p_in_time_end=$1
elif [ $# -eq 2 ];
then
p_in_time_str=$1
p_in_time_end=$2
else
p_in_time_str=$1
p_in_time_end=$2
fi
vc_stat_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_start=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_ed=`date -d "$p_in_time_end +1 day" +'%Y-%m-%d 00:00:00'`
vi_load_st=`date -d "$p_in_time_str" +'%Y%m%d'`
vi_load_ed=`date -d "$p_in_time_end" +'%Y%m%d'`
vc_load_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vi_stat_st=`date -d "$p_in_time_str 1 day ago" +'%Y-%m-%d 00:00:00'`
vi_stat_ed=`date -d "$p_in_time_end" +'%Y-%m-%d 00:00:00'`
vi_stat=`date -d "$p_in_time_str 1 day ago" +'%Y%m%d'`
vi_part_drop=`date -d "$p_in_time_str 1080 days ago" +'%Y%m%d'`
echo $p_in_time_str","$p_in_time_end","$vc_stat_st","$vc_stat_ed","$vi_stat_st","$vi_stat_ed
sqoop_time=`date -d today +'%Y-%m-%d'`
qt=`date -d "2 days ago" +'%Y-%m-%d 24:00:00'`
ye=`date -d yesterday +'%Y-%m-%d 24:00:00'`
{ # Create the table first. Strictly this is optional, since sqoop can map the MySQL table into
# Hive on its own, but pre-creating it is better when you want to pick your own column types.
beeline -u jdbc:hive2://${HIVE_SERVER} -n ${HADOOP_USER_NAME} -e "
drop table if exists online_ods.online_all_tab_user_order;
create table online_ods.online_all_tab_user_order(
order_id string,
brand_name string,
channel_name string,
county_name string,
create_date string,
customer_type_name string,
customer_type_name_lv1 string,
des_branch_name string,
name_city string,
name_province string,
price string,
product_mode_name string,
product_name string,
product_spec_name string,
product_type_name string,
quantity string,
report_date string,
salesmoney string,
shop_id string,
shop_name string,
shopper_addr string,
shopper_name string,
shopper_phone string,
subcompany_name string,
user_id string,
coupons1 string,
coupons2 string,
coupons3 string,
confirm_date string,
work_create_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\n' STORED AS TEXTFILE;"
} && { # The MySQL source is sharded across databases and tables, so the shards have to be read one by one in a loop.
for table in tab_user_order_{0..9}
do
{ # Notes: a "broken" import is usually caused by more than one space between -- options. If the
# table shows up under the warehouse directory but not in the Hue page, --hive-import is probably
# missing, and the \$CONDITIONS keyword must never be dropped. --target-dir is the temporary HDFS
# staging location where sqoop lands the data before loading it into Hive; if you point it at the
# warehouse path itself, with sharded source tables only the last shard's data will survive.
sudo -u hdfs sqoop import --hive-drop-import-delims --connect jdbc:mysql://${dbhost}:${dbport}/${dbname} --username ${dbuser} --password ${dbpw} --delete-target-dir --lines-terminated-by '\n' --fields-terminated-by '\001' --null-string '\\N' --null-non-string '\\N' --hive-import --num-mappers 1 --query "SELECT * from $table where \$CONDITIONS " --target-dir /tmp/hive-root/ --hive-table online_ods.online_all_tab_user_order
time=`date "+%Y-%m-%d %H:%M:%S"`
echo $table $time "is done"
echo "--------------------------finish----------------------------------"
}
done
}
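For reference, the argument handling above means the full-extract script can be invoked in any of these three ways (the script filename is illustrative):
./ods_full_tab_user_order.sh                        # no args: start and end both default to today
./ods_full_tab_user_order.sh 2020-06-01             # one arg: start = end = 2020-06-01
./ods_full_tab_user_order.sh 2020-06-01 2020-06-07  # two args: explicit start and end dates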
# Incremental extract
#!/bin/bash
# ********************************************************************************
# Program name:  online_tab_user_order
# Description:   incremental sqoop extract of the MySQL online_tab_user_order tables into Hive
# Input parameters:
#
# Input resources:
# Output resources:
#
# Intermediate resources:
# Author:        csq
# Created:
# Version notes:
# Modified by:
# Modified on:
# Reason:
# Version notes:
#
# ********************************************************************************
# ********************************************************************************
# Incremental sqoop extract of the MySQL online_tab_user_order tables into Hive
VC_DBLIST='10.68.21.92,3306,hue,"xxx",user_online'
VC_DBLIST1='10.68.21.92,3306,hue,"xxxx",user_online_other'
export HADOOP_USER_NAME=   # set to your Hadoop cluster account
HIVE_SERVER='10.68.25.198:10000'
dblist=${VC_DBLIST}
dbhost=`echo "${dblist}" |awk -F, '{print $1}'`
dbport=`echo "${dblist}" |awk -F, '{print $2}'`
dbuser=`echo "${dblist}" |awk -F, '{print $3}'`
dbpw=`echo "${dblist}" |awk -F, '{print $4}'`
dbname=`echo "${dblist}" |awk -F, '{print $5}'`
if [ $# -eq 0 ];
then
p_in_time_str=`date -d today +'%Y-%m-%d 00:00:00'`
p_in_time_end=$p_in_time_str
elif [ $# -eq 1 ];
then
p_in_time_str=$1
p_in_time_end=$1
elif [ $# -eq 2 ];
then
p_in_time_str=$1
p_in_time_end=$2
else
p_in_time_str=$1
p_in_time_end=$2
fi
vc_stat_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_start=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_ed=`date -d "$p_in_time_end +1 day" +'%Y-%m-%d 00:00:00'`
vi_load_st=`date -d "$p_in_time_str" +'%Y%m%d'`
vi_load_ed=`date -d "$p_in_time_end" +'%Y%m%d'`
vc_load_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vi_stat_st=`date -d "$p_in_time_str 1 day ago" +'%Y-%m-%d 00:00:00'`
vi_stat_ed=`date -d "$p_in_time_end" +'%Y-%m-%d 00:00:00'`
vi_stat=`date -d "$p_in_time_str 1 day ago" +'%Y%m%d'`
vi_part_drop=`date -d "$p_in_time_str 1080 days ago" +'%Y%m%d'`
echo $p_in_time_str","$p_in_time_end","$vc_stat_st","$vc_stat_ed","$vi_stat_st","$vi_stat_ed
sqoop_time=`date -d today +'%Y-%m-%d'`
qt=`date -d "2 days ago" +'%Y-%m-%d 24:00:00'`
ye=`date -d yesterday +'%Y-%m-%d 24:00:00'`
{
beeline -u jdbc:hive2://${HIVE_SERVER} -n ${HADOOP_USER_NAME} -e "
drop table if exists online_ods.online_tab_user_order; -- drop first: this staging table holds only each day's increment
create table online_ods.online_tab_user_order(
order_id string,
brand_name string,
channel_name string,
county_name string,
create_date string,
customer_type_name string,
customer_type_name_lv1 string,
des_branch_name string,
name_city string,
name_province string,
price string,
product_mode_name string,
product_name string,
product_spec_name string,
product_type_name string,
quantity string,
report_date string,
salesmoney string,
shop_id string,
shop_name string,
shopper_addr string,
shopper_name string,
shopper_phone string,
subcompany_name string,
user_id string,
coupons1 string,
coupons2 string,
coupons3 string,
confirm_date string,
work_create_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\n' STORED AS TEXTFILE;"
} && {
for table in tab_user_order_{0..9}
do
{
sudo -u hdfs sqoop import --hive-drop-import-delims --connect jdbc:mysql://${dbhost}:${dbport}/${dbname} --username ${dbuser} --password ${dbpw} --delete-target-dir --lines-terminated-by '\n' --fields-terminated-by '\001' --null-string '\\N' --null-non-string '\\N' --hive-import --num-mappers 1 --query "SELECT * from $table where work_create_date> '${qt}' and work_create_date <='${ye}' and \$CONDITIONS " --target-dir /tmp/hive-root/ --hive-table online_ods.online_tab_user_order
time=`date "+%Y-%m-%d %H:%M:%S"`
echo $table $time "is done"
echo "--------------------------finish----------------------------------"
}
done
} && {
beeline -u jdbc:hive2://${HIVE_SERVER} -n ${HADOOP_USER_NAME} -e "
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=false;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=10000;
INSERT into TABLE online_ods.online_all_tab_user_order
SELECT
order_id ,
brand_name ,
channel_name ,
county_name ,
create_date ,
customer_type_name ,
customer_type_name_lv1 ,
des_branch_name ,
name_city ,
name_province ,
price ,
product_mode_name ,
product_name ,
product_spec_name ,
product_type_name ,
quantity ,
report_date ,
salesmoney ,
shop_id ,
shop_name ,
shopper_addr ,
shopper_name ,
shopper_phone ,
subcompany_name ,
user_id ,
coupons1 ,
coupons2 ,
coupons3 ,
confirm_date ,
work_create_date
FROM online_ods.online_tab_user_order;"
}
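After the append it is worth a quick sanity check that the day's increment actually landed in the full table; a minimal sketch, reusing the script's own beeline connection:
beeline -u jdbc:hive2://${HIVE_SERVER} -n ${HADOOP_USER_NAME} -e "
select count(*) from online_ods.online_tab_user_order;   -- rows in today's increment
select count(*), max(work_create_date) from online_ods.online_all_tab_user_order;  -- full table after the append"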
DWD layer:
#!/bin/bash
# ********************************************************************************
# Program name:  intention users - promotion-campaign user count
# Description:   in the Tlink user-asset management application, the cumulative number of intention users proactively added by sales reps to date (source: user data center)
# Input parameters:
#
# Input resources:
# Output resources:
#
# Intermediate resources:
# Author:        csq
# Created:
# Version notes:
# Modified by:
# Modified on:
# Reason:
# Version notes:
#
# ********************************************************************************
# ********************************************************************************
VC_DBLIST='10.68.25.201,3306,admin,"xxx",user_online'
VC_DBLIST1='10.68.25.201,3306,admin,"xxx",user_online_other'
HIVE_SERVER='10.68.25.198:10000'
export HADOOP_USER_NAME=chensiqing
dblist=${VC_DBLIST}
dbhost=`echo "${dblist}" |awk -F, '{print $1}'`
dbport=`echo "${dblist}" |awk -F, '{print $2}'`
dbuser=`echo "${dblist}" |awk -F, '{print $3}'`
dbpw=`echo "${dblist}" |awk -F, '{print $4}'`
dbname=`echo "${dblist}" |awk -F, '{print $5}'`
if [ $# -eq 0 ];
then
p_in_time_str=`date -d today +'%Y-%m-%d 00:00:00'`
p_in_time_end=$p_in_time_str
elif [ $# -eq 1 ];
then
p_in_time_str=$1
p_in_time_end=$1
elif [ $# -eq 2 ];
then
p_in_time_str=$1
p_in_time_end=$2
else
p_in_time_str=$1
p_in_time_end=$2
fi
vc_stat_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_start=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_ed=`date -d "$p_in_time_end +1 day" +'%Y-%m-%d 00:00:00'`
vi_load_st=`date -d "$p_in_time_str" +'%Y%m%d'`
vi_load_ed=`date -d "$p_in_time_end" +'%Y%m%d'`
vc_load_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vi_stat_st=`date -d "$p_in_time_str 1 day ago" +'%Y-%m-%d 00:00:00'`
vi_stat_ed=`date -d "$p_in_time_end" +'%Y-%m-%d 00:00:00'`
vi_stat=`date -d "$p_in_time_str 1 day ago" +'%Y%m%d'`
vi_part_drop=`date -d "$p_in_time_str 1080 days ago" +'%Y%m%d'`
echo $p_in_time_str","$p_in_time_end","$vc_stat_st","$vc_stat_ed","$vi_stat_st","$vi_stat_ed
createDate=`date -d today +'%Y-%m-%d'`
qt=`date -d "2 days ago" +'%Y-%m-%d 24:00:00'`
ye=`date -d yesterday +'%Y-%m-%d 24:00:00'`
{
beeline -u jdbc:hive2://${HIVE_SERVER} -n ${HADOOP_USER_NAME} -e "
drop table if exists online_dw.actionUserResult_tab;
create TABLE online_dw.actionUserResult_tab(
subcompany_name string,
number int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
INSERT into TABLE online_dw.actionUserResult_tab
select w.sn,count(w.intention_id) from
(
SELECT distinct s.subcompany_name sn,s.user_id ,s.user_name,ia.intention_id
FROM online_ods.online_all_crm_user_shop s
left JOIN
(
select i.user_id,i.intention_id FROM online_ods.online_all_tab_user_intention i
where i.intention_source ='2' and i.user_stage in ('0','2')
) ia on ia.user_id=s.user_id
WHERE s.subcompany_name LIKE '%分公司%'
) w
group by w.sn;"
time=`date "+%Y-%m-%d %H:%M:%S"`
echo $time "table online_dw.actionUserResult_tab drop create insert is done"
echo "--------------------------finish----------------------------------"
}
DM layer: combine all the result tables into the final report
#!/bin/bash
# ********************************************************************************
# Program name:  repeat-purchase users - cumulative purchases
# Description:   cumulative number of users with two or more orders to date
# Input parameters:
#
# Input resources:
# Output resources:
#
# Intermediate resources:
# Author:        csq
# Created:
# Version notes:
# Modified by:
# Modified on:
# Reason:
# Version notes:
#
# ********************************************************************************
# ********************************************************************************
VC_DBLIST='10.68.25.201,3306,admin,"XXXX",user_online'
VC_DBLIST1='10.68.25.201,3306,admin,"XXXXX",user_online_other'
HIVE_SERVER='10.68.25.198:10000'
export HADOOP_USER_NAME=   # set to your Hadoop cluster account
dblist=${VC_DBLIST}
dbhost=`echo "${dblist}" |awk -F, '{print $1}'`
dbport=`echo "${dblist}" |awk -F, '{print $2}'`
dbuser=`echo "${dblist}" |awk -F, '{print $3}'`
dbpw=`echo "${dblist}" |awk -F, '{print $4}'`
dbname=`echo "${dblist}" |awk -F, '{print $5}'`
if [ $# -eq 0 ];
then
p_in_time_str=`date -d today +'%Y-%m-%d 00:00:00'`
p_in_time_end=$p_in_time_str
elif [ $# -eq 1 ];
then
p_in_time_str=$1
p_in_time_end=$1
elif [ $# -eq 2 ];
then
p_in_time_str=$1
p_in_time_end=$2
else
p_in_time_str=$1
p_in_time_end=$2
fi
vc_stat_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_start=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_ed=`date -d "$p_in_time_end +1 day" +'%Y-%m-%d 00:00:00'`
vi_load_st=`date -d "$p_in_time_str" +'%Y%m%d'`
vi_load_ed=`date -d "$p_in_time_end" +'%Y%m%d'`
vc_load_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vi_stat_st=`date -d "$p_in_time_str 1 day ago" +'%Y-%m-%d 00:00:00'`
vi_stat_ed=`date -d "$p_in_time_end" +'%Y-%m-%d 00:00:00'`
vi_stat=`date -d "$p_in_time_str 1 day ago" +'%Y%m%d'`
vi_part_drop=`date -d "$p_in_time_str 1080 days ago" +'%Y%m%d'`
echo $p_in_time_str","$p_in_time_end","$vc_stat_st","$vc_stat_ed","$vi_stat_st","$vi_stat_ed
createDate=`date -d today +'%Y-%m-%d'`
qt=`date -d "2 days ago" +'%Y-%m-%d 24:00:00'`
ye=`date -d yesterday +'%Y-%m-%d 24:00:00'`
{
beeline -u jdbc:hive2://${HIVE_SERVER} -n ${HADOOP_USER_NAME} -e "
drop table if exists online_dm.Report;
create TABLE online_dm.Report(
subcompany_name string,
sum_add_user int,
sum_week_add_user int,
sum_action_add_user int,
sum_week_action_add_user int,
sum_ordered_add_user int,
sum_week_ordered_add_user int,
sum_transform_add_user int,
sum_week_transform_add_user int,
sum_old_order_add_user int,
sum_week_old_order_user int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
INSERT into TABLE online_dm.Report
SELECT a.subcompany_name,a.number,b.number,c.number,d.number,e.number,f.number,g.number,h.number,i.number,j.number
from
online_dw.adduserresult_tab a
left join
online_dw.addweekuserresult_tab b on a.subcompany_name=b.subcompany_name
left join
online_dw.actionuserresult_tab c on b.subcompany_name=c.subcompany_name
left join
online_dw.actionweekuserresult_tab d on c.subcompany_name=d.subcompany_name
left join
online_dw.orderedadduserresult_tab e on d.subcompany_name=e.subcompany_name
left join
online_dw.orderedaddweekuserresult_tab f on e.subcompany_name=f.subcompany_name
left join
online_dw.transformadduserresult_tab g on f.subcompany_name=g.subcompany_name
left join
online_dw.transformweekadduserresult_tab h on g.subcompany_name=h.subcompany_name
left join
online_dw.oldorderedthantwo i on h.subcompany_name=i.subcompany_name
left join
online_dw.oldweekorderedthantwo j on i.subcompany_name=j.subcompany_name ;"
time=`date "+%Y-%m-%d %H:%M:%S"`
echo $time "online_dm.Report drop create insert is done"
echo "--------------------------finish----------------------------------"
}
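One thing to watch in the join chain above: each left join keys on the previous table's subcompany_name (b on a, c on b, d on c, and so on), so a subcompany missing from any intermediate table nulls the join key for every table after it. A safer variant keys every join on the driver table a; a sketch of the reworked FROM clause:
FROM online_dw.adduserresult_tab a
left join online_dw.addweekuserresult_tab b on a.subcompany_name=b.subcompany_name
left join online_dw.actionuserresult_tab c on a.subcompany_name=c.subcompany_name
left join online_dw.actionweekuserresult_tab d on a.subcompany_name=d.subcompany_name
-- ...and likewise for e through j, always joining on a.subcompany_name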
Export to MySQL:
#!/bin/bash
# ********************************************************************************
# Program name:  export the report to MySQL
# Description:   sqoop the DM-layer report table from Hive out to MySQL
# Input parameters:
#
# Input resources:
# Output resources:
#
# Intermediate resources:
# Author:        csq
# Created:
# Version notes:
# Modified by:
# Modified on:
# Reason:
# Version notes:
#
# ********************************************************************************
# ********************************************************************************
VC_DBLIST='10.68.25.201,3306,admin,"xxxx",user_online'
VC_DBLIST1='10.68.25.201,3306,admin,"xxxxx",user_online_other'
HIVE_SERVER='10.68.25.198:10000'
export HADOOP_USER_NAME=   # set to your Hadoop cluster account
dblist=${VC_DBLIST}
dbhost=`echo "${dblist}" |awk -F, '{print $1}'`
dbport=`echo "${dblist}" |awk -F, '{print $2}'`
dbuser=`echo "${dblist}" |awk -F, '{print $3}'`
dbpw=`echo "${dblist}" |awk -F, '{print $4}'`
dbname=`echo "${dblist}" |awk -F, '{print $5}'`
if [ $# -eq 0 ];
then
p_in_time_str=`date -d today +'%Y-%m-%d 00:00:00'`
p_in_time_end=$p_in_time_str
elif [ $# -eq 1 ];
then
p_in_time_str=$1
p_in_time_end=$1
elif [ $# -eq 2 ];
then
p_in_time_str=$1
p_in_time_end=$2
else
p_in_time_str=$1
p_in_time_end=$2
fi
vc_stat_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_start=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vc_stat_ed=`date -d "$p_in_time_end +1 day" +'%Y-%m-%d 00:00:00'`
vi_load_st=`date -d "$p_in_time_str" +'%Y%m%d'`
vi_load_ed=`date -d "$p_in_time_end" +'%Y%m%d'`
vc_load_st=`date -d "$p_in_time_str" +'%Y-%m-%d 00:00:00'`
vi_stat_st=`date -d "$p_in_time_str 1 day ago" +'%Y-%m-%d 00:00:00'`
vi_stat_ed=`date -d "$p_in_time_end" +'%Y-%m-%d 00:00:00'`
vi_stat=`date -d "$p_in_time_str 1 day ago" +'%Y%m%d'`
vi_part_drop=`date -d "$p_in_time_str 1080 days ago" +'%Y%m%d'`
echo $p_in_time_str","$p_in_time_end","$vc_stat_st","$vc_stat_ed","$vi_stat_st","$vi_stat_ed
sqoop_time=`date -d today +'%Y-%m-%d'`
qt=`date -d "2 days ago" +'%Y-%m-%d 24:00:00'`
ye=`date -d yesterday +'%Y-%m-%d 24:00:00'`
{ # Sqoop the generated report back out to MySQL. Add ?useUnicode=true&characterEncoding=utf-8 to
# the JDBC URL to avoid garbled Chinese, and create the target table in MySQL ahead of time. Also
# truncate the MySQL table before each export: sqoop export appends, it cannot overwrite.
sudo -u hdfs sqoop export --connect "jdbc:mysql://${dbhost}:${dbport}/${dbname}?useUnicode=true&characterEncoding=utf-8" --username ${dbuser} --password ${dbpw} --num-mappers 1 --export-dir /user/hive/warehouse/online_dm.db/report --table report --input-fields-terminated-by '\001' --input-null-string '\\N' --input-null-non-string '\\N'
time=`date "+%Y-%m-%d %H:%M:%S"`
echo " report at " $time "is done"
echo "--------------------------finish----------------------------------"
}
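As the comment above says, the MySQL report table has to exist before the export, and sqoop export only appends, so it needs a truncate before each run. A sketch of the MySQL-side preparation, with column names and types assumed from the Hive DDL (run it before the sqoop export above):
mysql -h${dbhost} -P${dbport} -u${dbuser} -p"${dbpw}" ${dbname} -e "
CREATE TABLE IF NOT EXISTS report (
  subcompany_name VARCHAR(100),
  sum_add_user INT, sum_week_add_user INT,
  sum_action_add_user INT, sum_week_action_add_user INT,
  sum_ordered_add_user INT, sum_week_ordered_add_user INT,
  sum_transform_add_user INT, sum_week_transform_add_user INT,
  sum_old_order_add_user INT, sum_week_old_order_user INT
) DEFAULT CHARSET=utf8;
TRUNCATE TABLE report;"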