Flink+Hologres億級使用者實時UV精確去重最佳實踐
阿新 • • 發佈:2021-06-28
簡介: Flink+Hologres億級使用者實時UV精確去重最佳實踐
UV、PV計算,因為業務需求不同,通常會分為兩種場景:
- 離線計算場景:以T+1為主,計算曆史資料
- 實時計算場景:實時計算日常新增的資料,對使用者標籤去重
針對離線計算場景,Hologres基於RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支援超高基數UV計算(基於RoaringBitmap實現)
對於實時計算場景,可以使用Flink+Hologres方式,並基於RoaringBitmap,實時對使用者標籤去重。這樣的方式,可以較細粒度的實時得到使用者UV、PV資料,同時便於根據需求調整最小統計視窗(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且通過簡單的聚合,也可以得到較大時間單位的統計結果。
主體思想
- Flink將流式資料轉化為表與維表進行JOIN操作,再轉化為流式資料。此舉可以利用Hologres維表的insertIfNotExists特性結合自增欄位實現高效的uid對映。
- Flink把關聯的結果資料按照時間視窗進行處理,根據查詢維度使用RoaringBitmap進行聚合,並將查詢維度以及聚合的uid存放在聚合結果表,其中聚合出的uid結果放入Hologres的RoaringBitmap型別的欄位中。
- 查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,並對其中關鍵的RoaringBitmap欄位做or運算後並統計基數,即可得出對應使用者數。
- 處理流程如下圖所示
方案最佳實踐
1.建立相關基礎表
1)建立表uid_mapping為uid對映表,用於對映uid到32位int型別。
- RoaringBitmap型別要求使用者ID必須是32位int型別且越稠密越好(即使用者ID最好連續)。常見的業務系統或者埋點中的使用者ID很多是字串型別或Long型別,因此需要使用uid_mapping型別構建一張對映表。對映表利用Hologres的SERIAL型別(自增的32位int)來實現使用者對映的自動管理和穩定對映。
- 由於是實時資料, 設定該表為行存表,以提高Flink維表實時JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--將uid設為clustering_key和distribution_key便於快速查詢其對應的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
2)建立表dws_app為基礎聚合表,用於存放在基礎維度上聚合後的結果。
- 使用RoaringBitmap前需要建立RoaringBitmap extention,同時也需要Hologres例項為0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
- 為了更好效能,建議根據基礎聚合表資料量合理的設定Shard數,但建議基礎聚合表的Shard數設定不超過計算資源的Core數。推薦使用以下方式通過Table Group來設定Shard數
--新建shard數為16的Table Group,
--因為測試資料量百萬級,其中後端計算資源為100core,設定shard數為16
BEGIN;
CREATE TABLE tg16 (a int); --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16');
COMMIT;
- 相比離線結果表,此結果表增加了時間戳欄位,用於實現以Flink視窗週期為單位的統計。結果表DDL如下:
BEGIN;
create table dws_app(
country text,
prov text,
city text,
ymd text NOT NULL, --日期欄位
timetz TIMESTAMPTZ, --統計時間戳,可以實現以Flink視窗週期為單位的統計
uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
primary key(country, prov, city, ymd, timetz)--查詢維度和時間作為主鍵,防止重複插入資料
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期欄位設為clustering_key和event_time_column,便於過濾
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等價於將表放在shard數為16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by欄位設為distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
2.Flink實時讀取資料並更新dws_app基礎聚合表
完整示例原始碼請見alibabacloud-hologres-connectors examples
1)Flink 流式讀取資料來源(DataStream),並轉化為源表(Table)
//此處使用csv檔案作為資料來源,也可以是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 與維表join需要新增proctime欄位,詳見https://help.aliyun.com/document_detail/62506.html
Table odsTable =
tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime());
// 註冊到catalog環境
tableEnv.createTemporaryView("odsTable", odsTable);
2)將源表與Hologres維表(uid_mapping)進行關聯
其中維表使用insertIfNotExists引數,即查詢不到資料時自行插入,uid_int32欄位便可以利用Hologres的serial型別自增建立。
// 建立Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入
String createUidMappingTable =
String.format(
"create table uid_mapping_dim("
+ " uid string,"
+ " uid_int32 INT"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s'," //Hologres DB名
+ " 'tablename' = '%s',"//Hologres 表名
+ " 'username' = '%s'," //當前賬號access id
+ " 'password' = '%s'," //當前賬號access key
+ " 'endpoint' = '%s'," //Hologres endpoint
+ " 'insertifnotexists'='true'"
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表與維表join
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
3)將關聯結果轉化為DataStream,通過Flink時間視窗處理,結合RoaringBitmap進行聚合
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
source
// 篩選需要統計的維度(country, prov, city, ymd)
.keyBy(0, 1, 2, 3)
// 滾動時間視窗;此處由於使用讀取csv模擬輸入流,採用ProcessingTime,實際使用中可使用EventTime
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// 觸發器,可以在視窗未結束時獲取聚合結果
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
.aggregate(
// 聚合函式,根據key By篩選的維度,進行聚合
new AggregateFunction<
Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {