1. 程式人生 > 其它 >7.Flink實時專案之獨立訪客開發

7.Flink實時專案之獨立訪客開發

在上6節當中,我們已經完成了從ods層到dwd層的轉換,包括日誌資料和業務資料,下面我們開始做dwm層的任務。 DWM 層主要服務 DWS,因為部分需求直接從 DWD 層到DWS 層中間會有一定的計算量,而且這部分計算的結果很有可能被多個 DWS 層主題複用,所以部分 DWD 會形成一層 DWM

1.架構說明

在上6節當中,我們已經完成了從ods層到dwd層的轉換,包括日誌資料和業務資料,下面我們開始做dwm層的任務。

DWM 層主要服務 DWS,因為部分需求直接從 DWD 層到DWS 層中間會有一定的計算量,而且這部分計算的結果很有可能被多個 DWS 層主題複用,所以部分 DWD 會形成一層 DWM,我們這裡主要涉及業務:

  • 訪問UV計算

  • 跳出明細計算

  • 訂單寬表

  • 支付寬表

因為實時計算與離線不同,實時計算的開發和運維成本都是非常高的,要結合實際情況考慮是否有必要象離線數倉一樣,建一個大而全的中間層。如果沒有必要大而全,這時候就需要大體規劃一下要實時計算出的指標需求了。把這些指標以主題寬表的形式輸出就是我們的 DWS 層。

統計主題 需求指標 輸出方式 計算來源 來源層級
訪客 pv 視覺化大屏 page_log直接可求 dwd
uv 視覺化大屏 需要用page_log過濾去重 dwm
跳出率 視覺化大屏 需要用page_log行為判斷 dwm
進入頁面數 視覺化大屏 需要識別開始訪問標識 dwd
連續訪問時長 視覺化大屏 page_log直接可求 dwd
商品 點選 多維分析 page_log直接可求 dwd
收藏 多維分析 收藏表 dwd
加入購物車 多維分析 購物車表 dwd
下單 視覺化大屏 訂單寬表 dwm
支付 多維分析 支付寬表 dwm
退款 多維分析 退款表 dwd
評論 多維分析 評論表 dwd
地區 pv 多維分析 page_log直接可求 dwd
uv 多維分析 需要page_log過濾去重 dwm
下單 視覺化大屏 訂單寬表 dwm
關鍵詞 搜尋關鍵詞 視覺化大屏 page_log直接可求 dwd
點選商品關鍵詞 視覺化大屏 商品主題下單再次聚合 dws
下單商品關鍵詞 視覺化大屏 商品主題下單再次聚合 dws

2. 訪客UV計算

UV,全稱是 Unique Visitor,即獨立訪客,對於實時計算中,也可以稱為 DAU(Daily Active User),即每日活躍使用者,因為實時計算中的uv通常是指當日的訪客數。那麼如何從使用者行為日誌中識別出當日的訪客,那麼有兩點:

  • 其一,是識別出該訪客開啟的第一個頁面,表示這個訪客開始進入我們的應用

  • 其二,由於訪客可以在一天中多次進入應用,所以我們要在一天的範圍內進行去重

程式碼,新建任務UniqueVisitApp.java,我們要從kafka的ods層消費資料,主題為:dwd_page_log

package com.zhangbao.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
​
/**
 * @author: zhangbao
 * @date: 2021/9/12 19:51
 * @desc: uv 計算
 **/
public class UniqueVisitApp {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    //設定並行度
    env.setParallelism(4);
    //設定檢查點
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));
    //指定哪個使用者讀取hdfs檔案
    System.setProperty("HADOOP_USER_NAME","zhangbao");
​
    //從kafka讀取資料來源
    String sourceTopic = "dwd_page_log";
    String group = "unique_visit_app_group";
    FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
    DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
​
    //資料轉換
    SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(obj -> JSON.parseObject(obj));
​
    jsonObjDs.print("jsonObjDs >>>");
    try {
      env.execute("task uniqueVisitApp");
     } catch (Exception e) {
      e.printStackTrace();
     }
   }
}

測試從kafka消費資料

  • 啟動服務:zk,kf,logger.sh ,hadoop

  • 執行任務:BaseLogTask.java,UniqueVisitApp.java

  • 執行日誌生成伺服器

  • 檢視控制檯輸出

目前任務執行流程

UniqueVisitApp程式接收到的資料

{
 "common": {
  "ar": "440000",
  "uid": "48",
  "os": "Android 11.0",
  "ch": "xiaomi",
  "is_new": "0",
  "md": "Sumsung Galaxy S20",
  "mid": "mid_9",
  "vc": "v2.1.134",
  "ba": "Sumsung"
  },
 "page": {
  "page_id": "login",
  "during_time": 4621,
  "last_page_id": "good_detail"
  },
 "ts": 1631460110000
}

3. 核心過濾流程

從kafka的ods層取出資料之後,就該做具體的uv處理了。

1.首先用 keyby 按照 mid 進行分組,每組表示當前裝置的訪問情況

2.分組後使用 keystate 狀態,記錄使用者進入時間,實現 RichFilterFunction 完成過濾

3.重寫 open 方法用來初始化狀態

4.重寫 filter 方法進行過濾

  • 可以直接篩掉 last_page_id 不為空的欄位,因為只要有上一頁,說明這條不是這個使用者進入的首個頁面。

  • 狀態用來記錄使用者的進入時間,只要這個 lastVisitDate 是今天,就說明使用者今天已經訪問過了所以篩除掉。如果為空或者不是今天,說明今天還沒訪問過,則保留。

  • 因為狀態值主要用於篩選是否今天來過,所以這個記錄過了今天基本上沒有用了,這裡 enableTimeToLive 設定了 1 天的過期時間,避免狀態過大。

package com.zhangbao.gmall.realtime.app.dwm;
​
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
​
import java.text.SimpleDateFormat;
import java.util.Date;
​
/**
 * @author: zhangbao
 * @date: 2021/9/12 19:51
 * @desc: uv 計算
 **/
​
public class UniqueVisitApp {
  public static void main(String[] args) {
    //webui模式,需要新增pom依賴
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//     StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
    //設定並行度
    env.setParallelism(4);
    //設定檢查點
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));
    //指定哪個使用者讀取hdfs檔案
    System.setProperty("HADOOP_USER_NAME","zhangbao");
​
    //從kafka讀取資料來源
    String sourceTopic = "dwd_page_log";
    String group = "unique_visit_app_group";
    FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
    DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
​
    //資料轉換
    SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(obj -> JSON.parseObject(obj));
​
    //按照裝置id分組
    KeyedStream<JSONObject, String> keyByMid = jsonObjDs.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
​
    //過濾
    SingleOutputStreamOperator<JSONObject> filterDs = keyByMid.filter(new RichFilterFunction<JSONObject>() {
      ValueState<String> lastVisitDate = null;
      SimpleDateFormat sdf = null;
      @Override
      public void open(Configuration parameters) throws Exception {
        //初始化時間
        sdf = new SimpleDateFormat("yyyyMMdd");
        //初始化狀態
        ValueStateDescriptor<String> lastVisitDateDesc = new ValueStateDescriptor<>("lastVisitDate", String.class);
        //統計日活dau,狀態資料儲存一天,過一天即失效
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
        lastVisitDateDesc.enableTimeToLive(stateTtlConfig);
        this.lastVisitDate = getRuntimeContext().getState(lastVisitDateDesc);
​
       }
​
      @Override
      public boolean filter(JSONObject jsonObject) throws Exception {
        //上一個頁面如果有值,則不是首次訪問
        String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
        if(lastPageId != null && lastPageId.length()>0){
          return false;
         }
        //獲取使用者訪問日期
        Long ts = jsonObject.getLong("ts");
        String mid = jsonObject.getJSONObject("common").getString("mid");
        String lastDate = sdf.format(new Date(ts));
        //獲取狀態日期
        String lastDateState = lastVisitDate.value();
        if(lastDateState != null && lastDateState.length()>0 && lastDateState.equals(lastDate)){
          System.out.println(String.format("已訪問! mid:%s,lastDate:%s",mid,lastDate));
          return false;
         }else {
          lastVisitDate.update(lastDate);
          System.out.println(String.format("未訪問! mid:%s,lastDate:%s",mid,lastDate));
          return true;
         }
       }
     });
​
    filterDs.print("filterDs >>>");
​
    try {
      env.execute("task uniqueVisitApp");
     } catch (Exception e) {
      e.printStackTrace();
     }
   }
}

注:1.在測試時,發現uv沒有資料,所以把BaseLogTask任務的側輸出流改一下,如下圖所示:

2.webui模式新增pom依賴

<!-- flink webui -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web_2.12</artifactId>
  <version>1.12.0</version>
</dependency>

4. 測試

  • 啟動zk,kafka,logger.sh,hdfs,BaseLogTask,UniqueVisitApp

  • 執行流程

    • 模擬生成的日誌jar >> nginx >> 日誌採集服務 >> kafka(ods) >> baseLogApp(分流) >> kafka(dwd) >> UniqueVisitApp(獨立訪客) >> dwm_unique_visit

經測試,流程已通。