資料倉庫(七)——DWD 層
DWD層內容
1)對使用者行為資料解析。
2)對業務資料採用維度模型重新建模。
第一章 使用者行為日誌
1.1 日誌解析思路
1)日誌結構回顧
(1)頁面埋點日誌
(2)啟動日誌
2)日誌解析思路
1.2 get_json_object函式使用
1)資料
[{"name":"大郎","sex":"男","age":"25"},{"name":"西門慶","sex":"男","age":"47"}]
2)取出第一個json物件
hive (gmall)>
select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西門慶","sex":"男","age":"47"}]','$[0]');
結果是:{"name":"大郎","sex":"男","age":"25"}
3)取出第一個json的age欄位的值
hive (gmall)>
SELECT get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西門慶","sex":"男","age":"47"}]',"$[0].age");
結果是:25
1.3 啟動日誌表
啟動日誌解析思路:啟動日誌表中每行資料對應一個啟動記錄,一個啟動記錄應該包含日誌中的公共資訊和啟動資訊。先將所有包含start欄位的日誌過濾出來,然後使用get_json_object函式解析每個欄位。
1)建表語句
DROP TABLE IF EXISTS dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '裝置id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`entry` STRING COMMENT 'icon手機圖示 notice 通知 install 安裝後啟動',
`loading_time` BIGINT COMMENT '啟動載入時間',
`open_ad_id` STRING COMMENT '廣告頁ID ',
`open_ad_ms` BIGINT COMMENT '廣告總共播放時間',
`open_ad_skip_ms` BIGINT COMMENT '使用者跳過廣告時點',
`ts` BIGINT COMMENT '時間'
) COMMENT '啟動日誌表'
PARTITIONED BY (`dt` STRING) -- 按照時間建立分割槽
STORED AS PARQUET -- 採用parquet列式儲存
LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上儲存位置
TBLPROPERTIES('parquet.compression'='lzo') -- 採用LZO壓縮
;
2)資料匯入
insert overwrite table dwd_start_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.start') is not null;
3)檢視資料
select * from dwd_start_log where dt='2020-06-14' limit 2;
1.4 頁面日誌表
頁面日誌解析思路:頁面日誌表中每行資料對應一個頁面訪問記錄,一個頁面訪問記錄應該包含日誌中的公共資訊和頁面資訊。先將所有包含page欄位的日誌過濾出來,然後使用get_json_object函式解析每個欄位。
1)建表語句
DROP TABLE IF EXISTS dwd_page_log;
CREATE EXTERNAL TABLE dwd_page_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '裝置id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`during_time` BIGINT COMMENT '持續時間毫秒',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面ID ',
`source_type` STRING COMMENT '來源型別',
`ts` bigint
) COMMENT '頁面日誌表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_page_log'
TBLPROPERTIES('parquet.compression'='lzo');
2)資料匯入
insert overwrite table dwd_page_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.page') is not null;
3)檢視資料
select * from dwd_page_log where dt='2020-06-14' limit 2;
1.5 動作日誌表
動作日誌解析思路:動作日誌表中每行資料對應使用者的一個動作記錄,一個動作記錄應當包含公共資訊、頁面資訊以及動作資訊。先將包含action欄位的日誌過濾出來,然後通過UDTF函式,將action陣列“炸開”(類似於explode函式的效果),然後使用get_json_object函式解析每個欄位。
1)建表語句
DROP TABLE IF EXISTS dwd_action_log;
CREATE EXTERNAL TABLE dwd_action_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '裝置id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`during_time` BIGINT COMMENT '持續時間毫秒',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面id ',
`source_type` STRING COMMENT '來源型別',
`action_id` STRING COMMENT '動作id',
`item` STRING COMMENT '目標id ',
`item_type` STRING COMMENT '目標型別',
`ts` BIGINT COMMENT '時間'
) COMMENT '動作日誌表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_action_log'
TBLPROPERTIES('parquet.compression'='lzo');
2)建立UDTF函式——設計思路
3)建立UDTF函式——編寫程式碼
(1)建立一個maven工程:hivefunction
(2)建立包名:com.atguigu.hive.udtf
(3)引入如下依賴
<dependencies>
<!--新增hive依賴-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
(4)編碼
package com.atguigu.gmall.hive.udtf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
public class ExplodeJSONArray extends GenericUDTF {
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
// 1 引數合法性檢查
if (argOIs.length != 1) {
throw new UDFArgumentException("explode_json_array 只需要一個引數");
}
// 2 第一個引數必須為string
//判斷引數是否為基礎資料型別
if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("explode_json_array 只接受基礎型別引數");
}
//將引數物件檢查器強轉為基礎型別物件檢查器
PrimitiveObjectInspector argumentOI = (PrimitiveObjectInspector) argOIs[0];
//判斷引數是否為String型別
if (argumentOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
throw new UDFArgumentException("explode_json_array 只接受string型別的引數");
}
// 3 定義返回值名稱和型別
List<String> fieldNames = new ArrayList<String>();
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("items");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
public void process(Object[] objects) throws HiveException {
// 1 獲取傳入的資料
String jsonArray = objects[0].toString();
// 2 將string轉換為json陣列
JSONArray actions = new JSONArray(jsonArray);
// 3 迴圈一次,取出陣列中的一個json,並寫出
for (int i = 0; i < actions.length(); i++) {
String[] result = new String[1];
result[0] = actions.getString(i);
forward(result);
}
}
public void close() throws HiveException {
}
}
4)建立函式
(1)打包
(2)將hivefunction-1.0-SNAPSHOT.jar上傳到hadoop102的/opt/module,然後再將該jar包上傳到HDFS的/user/hive/jars路徑下
[atguigu@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
[atguigu@hadoop102 module]$ hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars
(3)建立永久函式與開發好的java class關聯
create function explode_json_array as 'com.atguigu.gmall.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
(4)注意:如果修改了自定義函式重新生成jar包怎麼處理?只需要替換HDFS路徑上的舊jar包,然後重啟Hive客戶端即可。
5)資料匯入
insert overwrite table dwd_action_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='2020-06-14'
and get_json_object(line,'$.actions') is not null;
3)檢視資料
select * from dwd_action_log where dt='2020-06-14' limit 2;
1.6 曝光日誌表
曝光日誌解析思路:曝光日誌表中每行資料對應一個曝光記錄,一個曝光記錄應當包含公共資訊、頁面資訊以及曝光資訊。先將包含display欄位的日誌過濾出來,然後通過UDTF函式,將display陣列“炸開”(類似於explode函式的效果),然後使用get_json_object函式解析每個欄位。
1)建表語句
DROP TABLE IF EXISTS dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '裝置id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`during_time` BIGINT COMMENT 'app版本號',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面ID ',
`source_type` STRING COMMENT '來源型別',
`ts` BIGINT COMMENT 'app版本號',
`display_type` STRING COMMENT '曝光型別',
`item` STRING COMMENT '曝光物件id ',
`item_type` STRING COMMENT 'app版本號',
`order` BIGINT COMMENT '曝光順序',
`pos_id` BIGINT COMMENT '曝光位置'
) COMMENT '曝光日誌表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_display_log'
TBLPROPERTIES('parquet.compression'='lzo');
2)資料匯入
insert overwrite table dwd_display_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.display_type'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order'),
get_json_object(display,'$.pos_id')
from ods_log lateral view explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='2020-06-14'
and get_json_object(line,'$.displays') is not null;
3)檢視資料
select * from dwd_display_log where dt='2020-06-14' limit 2;
1.7 錯誤日誌表
錯誤日誌解析思路:錯誤日誌表中每行資料對應一個錯誤記錄,為方便定位錯誤,一個錯誤記錄應當包含與之對應的公共資訊、頁面資訊、曝光資訊、動作資訊、啟動資訊以及錯誤資訊。先將包含err欄位的日誌過濾出來,然後使用get_json_object函式解析所有欄位。
1)建表語句
DROP TABLE IF EXISTS dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '裝置id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面ID ',
`source_type` STRING COMMENT '來源型別',
`entry` STRING COMMENT ' icon手機圖示 notice 通知 install 安裝後啟動',
`loading_time` STRING COMMENT '啟動載入時間',
`open_ad_id` STRING COMMENT '廣告頁ID ',
`open_ad_ms` STRING COMMENT '廣告總共播放時間',
`open_ad_skip_ms` STRING COMMENT '使用者跳過廣告時點',
`actions` STRING COMMENT '動作',
`displays` STRING COMMENT '曝光',
`ts` STRING COMMENT '時間',
`error_code` STRING COMMENT '錯誤碼',
`msg` STRING COMMENT '錯誤資訊'
) COMMENT '錯誤日誌表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_error_log'
TBLPROPERTIES('parquet.compression'='lzo');
說明:此處為對動作陣列和曝光陣列做處理,如需分析錯誤與單個動作或曝光的關聯,可先使用explode_json_array函式將陣列“炸開”,再使用get_json_object函式獲取具體欄位。
4)資料匯入
insert overwrite table dwd_error_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.err') is not null;
5)檢視資料
select * from dwd_error_log where dt='2020-06-14' limit 2;
1.8 DWD層使用者行為資料載入指令碼
1)編寫指令碼
(1)在hadoop102的/home/atguigu/bin目錄下建立指令碼
[atguigu@hadoop102 bin]$ vim ods_to_dwd_log.sh
在指令碼中編寫如下內容
#!/bin/bash
APP=gmall
# 如果是輸入的日期按照取輸入日期;如果沒輸入日期取當前時間的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
dwd_start_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.start') is not null;"
dwd_page_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;"
dwd_action_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,