1. 程式人生 > 其它 >Flink+Hologres億級使用者實時UV精確去重最佳實踐

Flink+Hologres億級使用者實時UV精確去重最佳實踐

簡介: Flink+Hologres億級使用者實時UV精確去重最佳實踐

UV、PV計算,因為業務需求不同,通常會分為兩種場景:

  • 離線計算場景:以T+1為主,計算曆史資料
  • 實時計算場景:實時計算日常新增的資料,對使用者標籤去重

針對離線計算場景,Hologres基於RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支援超高基數UV計算(基於RoaringBitmap實現)

對於實時計算場景,可以使用Flink+Hologres方式,並基於RoaringBitmap,實時對使用者標籤去重。這樣的方式,可以較細粒度的實時得到使用者UV、PV資料,同時便於根據需求調整最小統計視窗(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且通過簡單的聚合,也可以得到較大時間單位的統計結果。

主體思想

  1. Flink將流式資料轉化為表與維表進行JOIN操作,再轉化為流式資料。此舉可以利用Hologres維表的insertIfNotExists特性結合自增欄位實現高效的uid對映。
  2. Flink把關聯的結果資料按照時間視窗進行處理,根據查詢維度使用RoaringBitmap進行聚合,並將查詢維度以及聚合的uid存放在聚合結果表,其中聚合出的uid結果放入Hologres的RoaringBitmap型別的欄位中。
  3. 查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,並對其中關鍵的RoaringBitmap欄位做or運算後並統計基數,即可得出對應使用者數。
  4. 處理流程如下圖所示

方案最佳實踐

1.建立相關基礎表

1)建立表uid_mapping為uid對映表,用於對映uid到32位int型別。

  • RoaringBitmap型別要求使用者ID必須是32位int型別且越稠密越好(即使用者ID最好連續)。常見的業務系統或者埋點中的使用者ID很多是字串型別或Long型別,因此需要使用uid_mapping型別構建一張對映表。對映表利用Hologres的SERIAL型別(自增的32位int)來實現使用者對映的自動管理和穩定對映。
  • 由於是實時資料, 設定該表為行存表,以提高Flink維表實時JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--將uid設為clustering_key和distribution_key便於快速查詢其對應的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;

2)建立表dws_app為基礎聚合表,用於存放在基礎維度上聚合後的結果。

  • 使用RoaringBitmap前需要建立RoaringBitmap extention,同時也需要Hologres例項為0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
  • 為了更好效能,建議根據基礎聚合表資料量合理的設定Shard數,但建議基礎聚合表的Shard數設定不超過計算資源的Core數。推薦使用以下方式通過Table Group來設定Shard數
--新建shard數為16的Table Group,
--因為測試資料量百萬級,其中後端計算資源為100core,設定shard數為16
BEGIN;
CREATE TABLE tg16 (a int);                             --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16'); 
COMMIT;
  • 相比離線結果表,此結果表增加了時間戳欄位,用於實現以Flink視窗週期為單位的統計。結果表DDL如下:
BEGIN;
create table dws_app(
  country text,
  prov text,
  city text, 
  ymd text NOT NULL,  --日期欄位
  timetz TIMESTAMPTZ,  --統計時間戳,可以實現以Flink視窗週期為單位的統計
  uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
  primary key(country, prov, city, ymd, timetz)--查詢維度和時間作為主鍵,防止重複插入資料
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期欄位設為clustering_key和event_time_column,便於過濾
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等價於將表放在shard數為16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by欄位設為distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;

2.Flink實時讀取資料並更新dws_app基礎聚合表

完整示例原始碼請見alibabacloud-hologres-connectors examples

1)Flink 流式讀取資料來源(DataStream),並轉化為源表(Table)

//此處使用csv檔案作為資料來源,也可以是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 與維表join需要新增proctime欄位,詳見https://help.aliyun.com/document_detail/62506.html
Table odsTable =
    tableEnv.fromDataStream(
    odsStream,
    $("uid"),
    $("country"),
    $("prov"),
    $("city"),
    $("ymd"),
    $("proctime").proctime());
// 註冊到catalog環境
tableEnv.createTemporaryView("odsTable", odsTable);

2)將源表與Hologres維表(uid_mapping)進行關聯

其中維表使用insertIfNotExists引數,即查詢不到資料時自行插入,uid_int32欄位便可以利用Hologres的serial型別自增建立。

// 建立Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入
String createUidMappingTable =
    String.format(
    "create table uid_mapping_dim("
    + "  uid string,"
    + "  uid_int32 INT"
    + ") with ("
    + "  'connector'='hologres',"
    + "  'dbname' = '%s'," //Hologres DB名
    + "  'tablename' = '%s',"//Hologres 表名
    + "  'username' = '%s'," //當前賬號access id
    + "  'password' = '%s'," //當前賬號access key
    + "  'endpoint' = '%s'," //Hologres endpoint
    + "  'insertifnotexists'='true'"
    + ")",
    database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表與維表join
String odsJoinDim =
    "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
    + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
    + "  ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);

3)將關聯結果轉化為DataStream,通過Flink時間視窗處理,結合RoaringBitmap進行聚合

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
    source
    // 篩選需要統計的維度(country, prov, city, ymd)
    .keyBy(0, 1, 2, 3)
    // 滾動時間視窗;此處由於使用讀取csv模擬輸入流,採用ProcessingTime,實際使用中可使用EventTime
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    // 觸發器,可以在視窗未結束時獲取聚合結果
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
    .aggregate(
    // 聚合函式,根據key By篩選的維度,進行聚合
    new AggregateFunction<
        Tuple5<String, String, String, String, Integer>,
        RoaringBitmap,
        RoaringBitmap>() {
            @Override
            public RoaringBitmap createAccumulator() {
                return new RoaringBitmap();
            }
            @Override
            public RoaringBitmap add(
                Tuple5<String, String, String, String, Integer> in,
                RoaringBitmap acc) {
                // 將32位的uid新增到RoaringBitmap進行去重
                acc.add(in.f4);
                return acc;
            }
            @Override
            public RoaringBitmap getResult(RoaringBitmap acc) {
                return acc;
            }
            @Override
            public RoaringBitmap merge(
                RoaringBitmap acc1, RoaringBitmap acc2) {
                return RoaringBitmap.or(acc1, acc2);
            }
     },
    //視窗函式,輸出聚合結果
    new WindowFunction<
        RoaringBitmap,
        Tuple6<String, String, String, String, Timestamp, byte[]>,
        Tuple,
        TimeWindow>() {
            @Override
            public void apply(
                Tuple keys,
                TimeWindow timeWindow,
                Iterable<RoaringBitmap> iterable,
                Collector<
                Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                throws Exception {
                RoaringBitmap result = iterable.iterator().next();
                // 優化RoaringBitmap
                result.runOptimize();
                // 將RoaringBitmap轉化為位元組陣列以存入Holo中
                byte[] byteArray = new byte[result.serializedSizeInBytes()];
                result.serialize(ByteBuffer.wrap(byteArray));
                // 其中 Tuple6.f4(Timestamp) 欄位表示以視窗長度為週期進行統計,以秒為單位
                out.collect(
                    new Tuple6<>(
                        keys.getField(0),
                        keys.getField(1),
                        keys.getField(2),
                        keys.getField(3),
                        new Timestamp(
                            timeWindow.getEnd() / 1000 * 1000),
                        byteArray));
        }
    });

4)寫入結果表

需要注意的是,Hologres中RoaringBitmap型別在Flink中對應Byte陣列型別

// 計算結果轉換為表
Table resTable =
    tableEnv.fromDataStream(
        processedSource,
        $("country"),
        $("prov"),
        $("city"),
        $("ymd"),
        $("timest"),
        $("uid32_bitmap"));
// 建立Hologres結果表, 其中Hologres的RoaringBitmap型別通過Byte陣列存入
String createHologresTable =
    String.format(
        "create table sink("
        + "  country string,"
        + "  prov string,"
        + "  city string,"
        + "  ymd string,"
        + "  timetz timestamp,"
        + "  uid32_bitmap BYTES"
        + ") with ("
        + "  'connector'='hologres',"
        + "  'dbname' = '%s',"
        + "  'tablename' = '%s',"
        + "  'username' = '%s',"
        + "  'password' = '%s',"
        + "  'endpoint' = '%s',"
        + "  'connectionSize' = '%s',"
        + "  'mutatetype' = 'insertOrReplace'"
        + ")",
    database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 寫入計算結果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);

3.資料查詢

查詢時,從基礎聚合表(dws_app)中按照查詢維度做聚合計算,查詢bitmap基數,得出group by條件下的使用者數

  • 查詢某天內各個城市的uv
--執行下面RB_AGG運算查詢,可執行引數先關閉三階段聚合開關(預設關閉),效能更好
set hg_experimental_enable_force_three_stage_agg=off  

SELECT  country
        ,prov
        ,city
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd = '20210329'
GROUP BY country
         ,prov
         ,city
;

  • 查詢某段時間內各個省份的uv
--執行下面RB_AGG運算查詢,可執行引數先關閉三階段聚合開關(預設關閉),效能更好
set hg_experimental_enable_force_three_stage_agg=off 

SELECT  country
        ,prov
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country
         ,prov
;


原文連結
本文為阿里雲原創內容,未經允許不得轉載。