1. 程式人生 > 其它 >離線數倉(四)

離線數倉(四)

5章 數倉搭建-DIM

5.1 商品維度表(全量)

  1.建表語句

drop table if exists `dim_sku_info`;
create external table dim_sku_info (
    --來自ods_sku_info
    `id` string comment '商品id',
    `price` decimal(16,2) comment '商品價格',
    `sku_name` string comment '商品名稱',
    `sku_desc` string comment '商品描述',
    `weight` decimal
(16,2) comment '重量', `is_sale` boolean comment '是否在售', `spu_id` string comment 'spu編號', `category3_id` string comment '三級分類id', `tm_id` string comment '品牌id', `create_time` string comment '建立時間', --來自ods_spu_info `spu_name` string comment 'spu名稱', --來自ods_base_category3 `category3_name`
string comment '三級分類名稱', `category2_id` string comment '二級分類id', --來自ods_base_category2 `category2_name` string comment '二級分類名稱', `category1_id` string comment '一級分類id', --ods_base_category1 `category1_name` string comment '一級分類名稱', --來自ods_base_trademark `tm_name` string
comment '品牌名稱', --來自ods_sku_attr_value `sku_attr_values` ARRAY<STRUCT<attr_id:STRING,value_id:STRING,attr_name:STRING,value_name:STRING>> comment '平臺屬性', --來自ods_sku_sale_attr_value `sku_sale_attr_values` ARRAY<STRUCT<sale_attr_id:STRING,sale_attr_value_id:STRING,sale_attr_name:STRING,sale_attr_value_name:STRING>> comment '銷售屬性' ) comment '商品維度表' partitioned by (`dt` string) stored as parquet --指定檔案格式parquet location '/warehouse/gmall/dim/dim_sku_info/' --指定HDFS資料儲存路徑 tblproperties ("parquet.compression"="lzo"); --指定壓縮格式為lzo,但檔案字尾為parquet

  2.資料裝載

    1)Hive讀取索引檔案問題

      (1)兩種方式,分別查詢資料有多少行

select * from ods_log;
Time taken: 1.285 seconds, Fetched: 8838 row(s)
select count(*) from ods_log;

      (2)兩次查詢結果不一致

        原因是select * from ods_log不執行MR操作,直接採用的是ods_log建表語句中指定的DeprecatedLzoTextInputFormat,能夠識別lzo.index為索引檔案。

        select count(*) from ods_log執行MR操作,會先經過hive.input.format,其預設值為CombineHiveInputFormat,其會先將索引檔案當成小檔案合併,將其當做普通檔案處理。更嚴重的是,這會導致LZO檔案無法切片。

set hive.input.format;

        解決辦法:修改CombineHiveInputFormat為HiveInputFormat

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

      (3)再次查詢

select count(*) from ods_log;

    2)首日裝載

with sku_info as (
    select id sku_id,
           price,
           sku_name,
           sku_desc,
           weight,
           is_sale,
           spu_id,
           category3_id,
           tm_id,
           create_time
    from ods_sku_info
    where dt = '2021-06-08'
),
     spu_info as (
         select id spu_id, spu_name
         from ods_spu_info
         where dt = '2021-06-08'
     ),
     base_category3 as (
         select id category3_id, name category3_name, category2_id
         from ods_base_category3
         where dt = '2021-06-08'
     ),
     base_category2 as (
         select id category2_id, name category2_name, category1_id
         from ods_base_category2
         where dt = '2021-06-08'
     ),
     base_category1 as (
         select id category1_id, name category1_name
         from ods_base_category1
         where dt = '2021-06-08'
     ),
     base_trademark as (
         select id tm_id, tm_name
         from ods_base_trademark
         where dt = '2021-06-08'
     ),
     sku_attr_info as (
        select sku_id,collect_list(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,
            'value_name',value_name)) sku_attr_values
        from ods_sku_attr_value
        where dt = '2021-06-08'
        group by sku_id
     ),
     sku_sale_attr_info as (
         select sku_id,collect_list(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,
             'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sku_sale_attr_values
         from ods_sku_sale_attr_value
         where dt = '2021-06-08'
         group by sku_id
     )
insert overwrite table dim_sku_info partition (dt = '2021-06-08')
select sku_info.sku_id,
       price,
       sku_name,
       sku_desc,
       weight,
       is_sale,
       sku_info.spu_id,
       sku_info.category3_id,
       sku_info.tm_id,
       create_time,
       spu_name,
       category3_name,
       base_category3.category2_id,
       category2_name,
       base_category2.category1_id,
       category1_name,
       tm_name,
       sku_attr_values,
       sku_sale_attr_values
from sku_info left join spu_info on spu_info.spu_id = sku_info.spu_id
left join base_category3 on base_category3.category3_id = sku_info.category3_id
left join base_category2 on base_category2.category1_id = base_category3.category2_id
left join base_category1 on base_category1.category1_id = base_category2.category1_id
left join base_trademark on base_trademark.tm_id = sku_info.tm_id
left join sku_attr_info on sku_attr_info.sku_id = sku_info.sku_id
left join sku_sale_attr_info on sku_sale_attr_info.sku_id = sku_info.sku_id;

    3)每日裝載(和首日處理一樣,只需要修改分割槽欄位日期)

5.2優惠券維度表(全量)

  1.建表語句

drop table if exists dim_coupon_info;
create external table dim_coupon_info(
    --來自ods_coupon_info
    `id` string comment '購物券編號',
    `coupon_name` string comment '購物券名稱',
    `coupon_type` string comment '購物券型別 1 現金券 2 折扣券 3 滿減券 4 滿件打折券',
    `condition_amount` decimal(16,2) comment '滿額數',
    `condition_num` bigint comment '滿件數',
    `activity_id` string comment '活動編號',
    `benefit_amount` decimal(16,2) comment '減金額',
    `benefit_discount` decimal(16,2) comment '折扣',
    `create_time` string comment '建立時間',
    `range_type` string comment '範圍型別 1、商品 2、品類 3、品牌',
    `limit_num` bigint comment '最多領取次數',
    `taken_count` bigint comment '已領取次數',
    `start_time` string comment '可以領取的開始日期',
    `end_time` string comment '可以領取的結束日期',
    `operate_time` string comment '修改時間',
    `expire_time` string comment '過期時間'
) comment '優惠券維度表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dim/dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");

  2.資料裝載

    1)首日裝載

#設定動態分割槽
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table dim_coupon_info partition (dt)
select id,
       coupon_name,
       coupon_type,
       condition_amount,
       condition_num,
       activity_id,
       benefit_amount,
       benefit_discount,
       create_time,
       range_type,
       limit_num,
       taken_count,
       start_time,
       end_time,
       operate_time,
       expire_time,
       dt
from ods_coupon_info
where dt = '2021-06-08';

    2)每日裝載(和首日匯入相同,只需要修改分割槽欄位時間)

    3)靜態分割槽 和 動態分割槽 作用相同,都是為了向分割槽表匯入資料

      靜態分割槽: 分割槽欄位是固定的,在 partition (dt=分割槽欄位),只能將資料寫入到同一個分割槽!

      動態分割槽: 查詢的資料,需要插入到不同的分割槽,分割槽欄位的值由select語句的最後一列的值決定,partition (dt),既可以將資料寫入到同一個分割槽,也可以寫入到不同的分割槽

5.3活動維度表(全量)

  1.建表語句

drop table if exists dim_activity_rule_info;
create external table dim_activity_rule_info(
    --來自ods_activity_rule
    `activity_rule_id` string comment '活動規則ID',
    `activity_id` string comment '活動ID',
    `condition_amount` decimal(16,2) comment '滿減金額',
    `condition_num` bigint comment '滿減件數',
    `benefit_amount` decimal(16,2) comment '優惠金額',
    `benefit_discount` decimal(16,2) comment '優惠折扣',
    `benefit_level` string comment '優惠級別',
    --來自ods_activity_info
    `activity_name` string comment '活動名稱',
    `activity_type` string comment '活動型別',
    `start_time` string comment '開始時間',
    `end_time` string comment '結束時間',
    `create_time` string comment '建立時間'
) comment '活動資訊表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dim/dim_activity_rule_info/'
tblproperties ("parquet.compression"="lzo");

  2.資料裝載

    1)首日裝載

insert overwrite table dim_activity_rule_info partition (dt = '2021-06-08')
select
    t1.id activity_rule_id,
        t1.activity_id,
        t1.condition_amount,
        t1.condition_num,
        t1.benefit_amount,
        t1.benefit_discount,
        t1.benefit_level,
        t2.activity_name,
        t2.activity_type,
        t2.start_time,
        t2.end_time,
        t2.create_time
from
(
    select id,
           activity_id,
           activity_type,
           condition_amount,
           condition_num,
           benefit_amount,
           benefit_discount,
           benefit_level
    from ods_activity_rule
    where dt = '2021-06-08'
) t1
left join
(
    select id, activity_name, activity_type, start_time, end_time, create_time
    from ods_activity_info
    where dt = '2021-06-08'
) t2
on t1.activity_id = t2.id;

    2)每日轉載(同首日匯入,修改分割槽欄位值)

5.4地區維度表(特殊)

  1.建表語句

drop table if exists dim_base_province;
create external table dim_base_province (
    --來自ods_base_province
    `id` string comment 'id',
    `province_name` string comment '省市名稱',
    `area_code` string comment '地區編碼',
    `iso_code` string comment 'ISO-3166編碼,供視覺化使用',
    `iso_3166_2` string comment 'IOS-3166-2編碼,供視覺化使用',
    `region_id` string comment '地區id',
    --來自ods_base_region
    `region_name` string comment '地區名稱'
) comment '地區維度表'
stored as parquet
location '/warehouse/gmall/dim/dim_base_province/'
tblproperties("parquet.compression"="lzo");

  2.資料裝載

    地區維度表資料相對穩定,變化概率較低,故無需每日裝載

insert overwrite table dim_base_province
select pro.id,
       name,
       area_code,
       iso_code,
       iso_3166_2,
       region_id,
       region_name
from ods_base_province pro
join ods_base_region reg on pro.region_id = reg.id;

5.5 時間維度表(特殊)

  1.建表語句

drop table if exists dim_date_info;
create external table dim_date_info(
    `date_id` string comment '',
    `week_id` string comment '周ID',
    `week_day` string comment '周幾',
    `day` string comment '每月的第幾天',
    `month` string comment '第幾月',
    `quarter` string comment '第幾季度',
    `year` string comment '',
    `is_workday` string comment '是否是工作日',
    `holiday_id` string comment '節假日'
) comment '時間維度表'
stored as parquet
location '/warehouse/gmall/dim/dim_date_info/'
tblproperties("parquet.compression"="lzo");

  2.資料裝載

    通常情況下,時間維度表的資料並不是來自於業務系統,而是手動寫入,並且由於時間維度表資料的可預見性,無須每日匯入,一般可一次性匯入一年的資料

    1)建立臨時表

CREATE EXTERNAL TABLE dim_date_info_tmp(
    `date_id` STRING COMMENT '',
    `week_id` STRING COMMENT '周ID',
    `week_day` STRING COMMENT '周幾',
    `day` STRING COMMENT '每月的第幾天',
    `month` STRING COMMENT '第幾月',
    `quarter` STRING COMMENT '第幾季度',
    `year` STRING COMMENT '',
    `is_workday` STRING COMMENT '是否是工作日',
    `holiday_id` STRING COMMENT '節假日'
) COMMENT '時間維度表'
    row format delimited
    fields terminated by '\t'
LOCATION '/warehouse/gmall/dim/dim_date_info_tmp/';

    2)將資料檔案上傳到HFDS上臨時表指定路徑/warehouse/gmall/dim/dim_date_info_tmp/

    3)執行以下語句將其匯入時間維度表

insert overwrite table dim_date_info select * from dim_date_info_tmp;

    4)檢查資料是否匯入成功

select * from dim_date_info;

5.6 使用者維度表(拉鍊表)

5.6.1 拉鍊表概述

  1)什麼是拉鍊表

    拉鍊表,記錄每條資訊的生命週期,一旦一條記錄的生命週期結束,就重新開始一條新的記錄,並把當前日期放入生效開始日期

    若當前資訊至今有效,在生效結束日期中填入一個極大值(如:9999-99-99)

  2)為什麼要做拉鍊表

    拉鍊表適合於資料會發生變化,但是變化頻率並不是很高的維度,即緩慢變化維,比如:使用者資訊會發生變化,但是每天變化的比例不高,若資料量有一定規模,按照每日全量的方式儲存,效率很低,例如:一億使用者 * 365天,每天一份使用者資料資訊,這樣子每日全量效率很低

  3)如何使用拉鍊表

    通過生效開始日期<=某個日期且生效結束日期>=某個日期,能夠得到某個時間點的資料全量切片

    (1)拉鍊表資料

    (2)獲取2019-01-01的歷史切片:select * from user_info where start_date <= '2019-01-01' and end_date >= '2019-01-01';

    (3)獲取2019-01-02的歷史切片:select * from user_info where start_date <= '2019-01-02' and end_date >= '2019-01-02';

  4)拉鍊表形成過程

5.6.2 製作拉鍊表

  1.建表語句

drop table if exists dim_user_info;
create external table dim_user_info(
    --來自ods_user_info
    `id` string comment '使用者id',
    `login_name` string comment '使用者名稱稱',
    `nick_name` string comment '使用者暱稱',
    `name` string comment '使用者姓名',
    `phone_num` string comment '手機號碼',
    `email` string comment '郵箱',
    `user_level` string comment '使用者等級',
    `birthday` string comment '生日',
    `gender` string comment '性別',
    `create_time` string comment '建立時間',
    `operate_time` string comment '操作時間',
    --自己建立的欄位
    `start_date` string comment '開始日期',
    `end_date` string comment '結束日期'
) comment '使用者表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dim/dim_user_info/'
tblproperties("parquet.compression"="lzo");

  2.資料裝載

    1)首日裝載

      拉鍊表首日裝載,需要進行初始化操作,具體工作為將截止到初始化當日的全部歷史使用者一次性匯入到拉鍊表中,目前的ods_order_info表的第一個分割槽,即2021-06-08分割槽中就是全部的歷史使用者,故將該分割槽資料進行一定處理後匯入拉鍊表的9999-99-99分割槽即可

insert overwrite table dim_user_info partition (dt='9999-99-99')
select id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time,
dt,
'9999-99-99' end_date
from ods_user_info
where dt = '2021-06-08';

    2)每日更新

insert overwrite table dim_user_info partition(dt)
select
    --優先取ods表中的資料,如果ods表中的欄位為null,取dim表中的資料
    if(ods_data.id is not null, ods_data.id , dim_data.id) id,
    if(ods_data.id is not null, ods_data.login_name , dim_data.login_name) login_name,
    if(ods_data.id is not null, ods_data.nick_name , dim_data.nick_name) nick_name,
    if(ods_data.id is not null, ods_data.name , dim_data.name) name,
    if(ods_data.id is not null, ods_data.phone_num , dim_data.phone_num) phone_num,
    if(ods_data.id is not null, ods_data.email , dim_data.email) email,
    if(ods_data.id is not null, ods_data.user_level , dim_data.user_level) user_level,
    if(ods_data.id is not null, ods_data.birthday , dim_data.birthday) birthday,
    if(ods_data.id is not null, ods_data.gender , dim_data.gender) gender,
    if(ods_data.id is not null, ods_data.create_time , dim_data.create_time) create_time,
    if(ods_data.id is not null, ods_data.operate_time , dim_data.operate_time) operate_time,
    if(ods_data.id is not null, ods_data.dt , dim_data.start_date) start_date,
    if(ods_data.id is not null, '9999-99-99' , dim_data.end_date) end_date,
      '9999-99-99' dt
from
(
    select *
    from dim_user_info
    where dt = '9999-99-99'
) dim_data
full join
(
    select *
    from ods_user_info
    --存放的是 2021-06-09當天mysql中user_info表新增和變化的資料
    where dt='2021-06-09'
) ods_data on dim_data.id = ods_data.id
union all
select
    dim_data.id,
    dim_data.login_name,
    dim_data.nick_name,
    dim_data.name,
    dim_data.phone_num,
    dim_data.email,
    dim_data.user_level,
    dim_data.birthday,
    dim_data.gender,
    dim_data.create_time,
    dim_data.operate_time,
    dim_data.start_date,
    --修改end_date
    cast(date_sub(ods_data.dt,1) as string) end_date ,
    cast(date_sub(ods_data.dt,1) as string) dt
from
(
    select *
    from dim_user_info
    where dt = '9999-99-99'
) dim_data
join
(
    select *
    from ods_user_info
    --存放的是 2021-06-09當天mysql中user_info表新增和變化的資料
    where dt='2021-06-09'
) ods_data
on dim_data.id = ods_data.id;

5.7DIM層首日資料裝載指令碼

  1)編寫指令碼

    (1)在/home/atguigu/bin目錄下建立指令碼ods_to_dim_db_init.sh

vim ods_to_dim_db_init.sh

    (2)增加執行許可權

chmod +x ods_to_dim_db_init.sh

  2)指令碼使用

    (1)執行指令碼(該指令碼不包含時間維度表的裝載,時間維度表需手動裝載資料,參考5.5

ods_to_dim_db_init.sh all 2021-06-08

    (2)檢視資料是否匯入成功

5.8 DIM層每日資料裝載指令碼

  1)編寫指令碼

    (1)在/home/atguigu/bin目錄下建立指令碼ods_to_dim_db.sh

vim ods_to_dim_db.sh
#!/bin/bash

APP=gmall

if [ -n "$2" ]; then
    do_date=$2
else
    echo "請傳入日期引數"
    exit
fi

dim_user_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_user_info partition(dt = '9999-99-99')
select
    id,
    login_name,
    nick_name,
    md5(name),
    md5(phone_num),
    md5(email),
    user_level,
    birthday,
    gender,
    create_time,
    operate_time,
    '$do_date',
    '9999-99-99'
from ${APP}.ods_user_info
where dt = '$do_date';"

dim_sku_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with sku_info as (
    select id sku_id,
           price,
           sku_name,
           sku_desc,
           weight,
           is_sale,
           spu_id,
           category3_id,
           tm_id,
           create_time
    from ${APP}.ods_sku_info
    where dt = '$do_date'
),
     spu_info as (
         select id spu_id, spu_name
         from ${APP}.ods_spu_info
         where dt = '$do_date'
     ),
     base_category3 as (
         select id category3_id, name category3_name, category2_id
         from ${APP}.ods_base_category3
         where dt = '$do_date'
     ),
     base_category2 as (
         select id category2_id, name category2_name, category1_id
         from ${APP}.ods_base_category2
         where dt = '$do_date'
     ),
     base_category1 as (
         select id category1_id, name category1_name
         from ${APP}.ods_base_category1
         where dt = '$do_date'
     ),
     base_trademark as (
         select id tm_id, tm_name
         from ${APP}.ods_base_trademark
         where dt = '$do_date'
     ),
     sku_attr_info as (
        select sku_id,
            collect_list(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) sku_attr_values
        from ${APP}.ods_sku_attr_value
        where dt = '$do_date'
        group by sku_id
     ),
     sku_sale_attr_info as (
         select sku_id,
             collect_list(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,
            'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sku_sale_attr_values
         from ${APP}.ods_sku_sale_attr_value
         where dt = '$do_date'
         group by sku_id
     )
insert overwrite table ${APP}.dim_sku_info partition (dt = '$do_date')
select sku_info.sku_id,
       sku_info.price,
       sku_info.sku_name,
       sku_info.sku_desc,
       sku_info.weight,
       sku_info.is_sale,
       sku_info.spu_id,
       sku_info.category3_id,
       sku_info.tm_id,
       sku_info.create_time,
       spu_info.spu_name,
       base_category3.category3_name,
       base_category3.category2_id,
       base_category2.category2_name,
       base_category2.category1_id,
       base_category1.category1_name,
       base_trademark.tm_name,
       sku_attr_info.sku_attr_values,
       sku_sale_attr_info.sku_sale_attr_values
from sku_info left join spu_info on spu_info.spu_id = sku_info.spu_id
left join base_category3 on base_category3.category3_id = sku_info.category3_id
left join base_category2 on base_category2.category1_id = base_category3.category2_id
left join base_category1 on base_category1.category1_id = base_category2.category1_id
left join base_trademark on base_trademark.tm_id = sku_info.tm_id
left join sku_attr_info on sku_attr_info.sku_id = sku_info.sku_id
left join sku_sale_attr_info on sku_sale_attr_info.sku_id = sku_info.sku_id;
"

dim_base_province="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_base_province
select pro.id,
       pro.name,
       pro.area_code,
       pro.iso_code,
       pro.iso_3166_2,
       pro.region_id,
       reg.region_name
from ${APP}.ods_base_province pro
join ${APP}.ods_base_region reg 
on pro.region_id = reg.id;
"

dim_coupon_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dim_coupon_info partition (dt)
select id,
       coupon_name,
       coupon_type,
       condition_amount,
       condition_num,
       activity_id,
       benefit_amount,
       benefit_discount,
       create_time,
       range_type,
       limit_num,
       taken_count,
       start_time,
       end_time,
       operate_time,
       expire_time,
       dt
from ${APP}.ods_coupon_info
where dt = '$do_date';
"

dim_activity_rule_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_activity_rule_info partition (dt = '$do_date')
select
    t1.id activity_rule_id,
        t1.activity_id,
        t1.condition_amount,
        t1.condition_num,
        t1.benefit_amount,
        t1.benefit_discount,
        t1.benefit_level,
        t2.activity_name,
        t2.activity_type,
        t2.start_time,
        t2.end_time,
        t2.create_time
from
(
    select id,
           activity_id,
           condition_amount,
           condition_num,
           benefit_amount,
           benefit_discount,
           benefit_level
    from ${APP}.ods_activity_rule
    where dt = '$do_date'
) t1
left join
(
    select id, activity_name, activity_type, start_time, end_time, create_time,dt
    from ${APP}.ods_activity_info
    where dt = '$do_date'
) t2
on t1.activity_id = t2.id;"

case $1 in
"dim_user_info"){
    hive -e "$dim_user_info"
};;
"dim_sku_info"){
    hive -e "$dim_sku_info"
};;
"dim_base_province"){
    hive -e "$dim_base_province"
};;
"dim_coupon_info"){
    hive -e "$dim_coupon_info"
};;
"dim_activity_rule_info"){
    hive -e "$dim_activity_rule_info"
};;
"all"){
    hive -e "$dim_user_info$dim_sku_info$dim_coupon_info$dim_activity_rule_info$dim_base_province"
};;
esac

    (2)增加執行許可權

chmod +x ods_to_dim_db.sh

  2)指令碼使用

    (1)執行指令碼

ods_to_dim_db.sh all 2021-06-09

    (2)檢視資料是否匯入成功