1. 程式人生 > 其它 >數倉建模—OneID

數倉建模—OneID

今天是我在上海租房的小區被封的第三天,由於我的大意,沒有屯吃的,外賣今天完全點不到了,中午的時候我找到了一包快過期的肉鬆餅,才補充了1000焦耳的能量。但是中午去做核酸的時候,我感覺走路有點不穩,我看到大白的棉籤深入我的嘴裡,我竟然以為是吃的,差點咬住了,還好我有僅存的一點意識。下午我收到女朋友給我點的外賣——麵包(我不知道她是怎麼點到的外賣,我很感動),很精緻的麵包,擱平時我基本不喜歡吃麵包,但是已經到了這個份上,我大口吃起來,竟然覺得這是世界上最好吃的食物了。明天早晨5:50的鬧鐘,去叮咚和美團買菜,看能不能搶幾桶泡麵吧。願神保佑,我暗暗下著決心並祈禱著,胸前畫著十字。。。

資料倉庫系列文章(持續更新)

  1. 數倉架構發展史
  2. 數倉建模方法論
  3. 數倉建模分層理論
  4. 數倉建模—寬表的設計
  5. 數倉建模—指標體系
  6. 資料倉庫之拉鍊表
  7. 數倉—資料整合
  8. 數倉—資料集市
  9. 數倉—商業智慧系統
  10. 數倉—埋點設計與管理
  11. 數倉—ID Mapping
  12. 數倉—OneID
  13. 數倉—AARRR海盜模型
  14. 數倉—匯流排矩陣
  15. 數倉—資料安全
  16. 數倉—資料質量
  17. 數倉—數倉建模和業務建模

關注公眾號:大資料技術派,回覆: 資料,領取1024G資料。

OneID

前面我們學習了ID Mapping,包括ID Mapping 的背景介紹和業務場景,以及如何使用Spark 實現ID Mapping,這個過程中涉及到了很多東西,當然我們都通過文章的形式介紹給大家了,所以你再學習今天這一節之前,可以先看一下前面的文章

  1. Spark實戰—GraphX程式設計指南
  2. 數倉建模—ID Mapping

在上一節我們介紹ID Mapping 的時候我們就說過ID Mapping 是為了打通使用者各個維度的資料,從而消除資料孤島、避免資料歧義,從而更好的刻畫使用者,所以說ID Mapping是手段不是目的,目的是為了打通資料體系,ID Mapping最終的產出就是我們今天的主角OneID,也就是說資料收集過來之後通過ID Mapping 打通,從而產生OneID,這一步之後我們的整個資料體系就將使用OneID作為使用者的ID,這樣我們整個資料體系就得以打通

OneData

開始之前我們先看一下阿里的OneData 資料體系,從而更好認識一下OneID,前面我們說過ID Mapping 只是手段不是目的,目的是為了打通資料體系,ID Mapping最終的產出就是OneID

其實OneID在我們整個資料服務體系中,也只是起點不是終點或者說是手段,我們最終的目的是為了建設統一的資料資產體系。

沒有建設統一的資料資產體系之前,我們的資料體系建設存在下面諸多問題

  1. 資料孤島:各產品、業務的資料相互隔離,難以通過共性ID打通
  2. 重複建設:重複的開發、計算、儲存,帶來高昂的資料成本
  3. 資料歧義:指標定義口徑不一致,造成計算偏差,應用困難

在阿里巴巴 OneData 體系中,OneID 指統一資料萃取,是一套解決資料孤島問題的思想和方法。資料孤島是企業發展到一定階段後普遍遇到的問題。各個部門、業務、產品,各自定義和儲存其資料,使得這些資料間難以關聯,變成孤島一般的存在。

OneID的做法是通過統一的實體識別和連線,打破資料孤島,實現資料通融。簡單來說,使用者、裝置等業務實體,在對應的業務資料中,會被對映為唯一識別(UID)上,其各個維度的資料通過這個UID進行關聯。

各個部門、業務、產品對業務實體的UID的定義和實現不一樣,使得資料間無法直接關聯,成為了資料孤島。基於手機號、身份證、郵箱、裝置ID等資訊,結合業務規則、機器學習、圖演算法等演算法,進行 ID-Mapping,將各種 UID 都對映到統一ID上。通過這個統一ID,便可關聯起各個資料孤島的資料,實現資料通融,以確保業務分析、使用者畫像等資料應用的準確和全面。

OneModel 統一資料構建和管理

將指標定位細化為:

1. 原子指標
2. 時間週期
3. 修飾詞(統計粒度、業務限定, etc)

通過這些定義,設計出各類派生指標 基於資料分層,設計出維度表、明細事實表、彙總事實表,其實我們看到OneModel 其實沒有什麼新的內容,其實就是我們數倉建模的那一套東西

OneService 統一資料服務

OneService 基於複用而不是複製資料的思想,指得是我們的統一的資料服務,因為我們一直再提倡複用,包括我們數倉的建設,但是我們的資料服務這一塊卻是空白,所以OneService核心是服務的複用,能力包括:

  • 利用主題邏輯表遮蔽複雜物理表的主題式資料服務
  • 一般查詢+ OLAP 分析+線上服務的統一且多樣化資料服務
  • 遮蔽多種異構資料來源的跨源資料服務

OneID 統一資料萃取

基於統一的實體識別、連線和標籤生產,實現資料通融,包括:

  • ID自動化識別與連線
  • 行為元素和行為規則
  • 標籤生產

OneID基於超強ID識別技術連結資料,高效生產標籤;業務驅動技術價值化,消除資料孤島,提升資料質量,提升資料價值。

而ID的打通,必須有ID-ID之間的兩兩對映打通關係通過ID對映關係表,才能將多種ID之間的關聯打通,完全孤立的兩種ID是無法打通的

打通整個ID體系,看似簡單,實則計算複雜,計算量非常大。假如某種物件有數億個個體,每個個體又有數十種不同的ID標識,任意兩種ID之間都有可能打通關係,想要完成這類物件的所有個體ID打通需要數億次計算,一般的機器甚至大資料叢集都無法完成。

大資料領域中的ID-Mapping技術就是用機器學習演算法類來取代野蠻計算,解決物件資料打通的問題。基於輸入的ID關係對,利用機器學習演算法做穩定性和收斂性計算,輸出關係穩定的ID關係對,並生成一個UID作為唯一識別該物件的標識碼。

OneID實現過程中存在的問題

前面我們知道我們的ID Mapping 是通過圖計算實現,核心就是連通圖,其實實現OneID我們在打通ID 之後,我們就可以為一個個連通圖生成一個ID, 因為一個連通圖 就代表一個使用者,這樣我們生成的ID就是使用者的OneID,這裡的使用者指的是自然人,而不是某一個平臺上的使用者。

OneID 的生成問題

首先我們需要一個ID 生成演算法,因為我們需要為大量使用者生成ID,我們的ID 要求是唯一的,所以在演算法設計的時候就需要考慮到這一點,我們並不推薦使用UUID,原因是UUID了可能會出現重複,而且UUID 沒有含義,所以我們不推薦使用UUID,我們這裡使用的是MD5 演算法,所以我們的MD5 演算法的引數是我們的圖的標示ID。

OneID 的更新問題

這裡的更新問題主要就是我們的資料每天都在更新,也就是說我們的圖關係在更新,也就是說我們要不要給這個自然人重新生成OneID ,因為他的圖關係可能發生了變化。

其實這裡我們不能為該自然人生成新的OneID ,否則我們數倉裡的歷史資料可能無法關聯使用,所以我們的策略就是如果該自然人已經有OneID了,則不需要重新生成,其實這裡我們就是判斷該圖中的所有的頂點是否存在OneID,我們後面在程式碼中體現著一點。

OneID 的選擇問題

這個和上面的更新問題有點像,上面更新問題我們可以保證一個自然人的OneID不發生變化,但是選擇問題會導致發生變化,但是這個問題是圖計算中無法避免的,我們舉個例子,假設我們有使用者的兩個ID(A_ID,C_ID),但是這兩個ID 在當前是沒有辦法打通的,所以我們就會為這個兩個ID 生成兩個OneID,也就是(A_OneID,B_OneID),所以這個時候我們知道因為ID Mapping 不上,所以我們認為這兩個ID 是兩個人。

後面我們有了另外一個ID(B_ID),這個ID可以分別和其他的兩個ID 打通,也就是B_ID<——>A_ID , B_ID<——>C_ID 這樣我們就打通這個三個ID,這個時候我們知道

這個使用者存在三個ID,並且這個時候已經存在了兩個OneID,所以這個時候我們需要在這兩個OneID中選擇一個作為使用者的OneID,簡單粗暴點就可以選擇最小的或者是最大的。

我們選擇了之後,要將另外一個OneID對應的資料,對應到選擇的OneID 下,否則沒有被選擇的OneID的歷史資料就無法追溯了

OneID 程式碼實現

這個程式碼相比ID Mapping主要是多了OneID 的生成邏輯和更新邏輯 ,需要注意的是關於頂點集合的構造我們不是直接使用字串的hashcode ,這是因為hashcode 很容易重複

object OneID  {
    val spark = SparkSession
      .builder()
      .appName("OneID")
      .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    val bizdate=args(0)
    val c = Calendar.getInstance
    val format = new SimpleDateFormat("yyyyMMdd")
    c.setTime(format.parse(bizdate))

    c.add(Calendar.DATE, -1)
    val bizlastdate = format.format(c.getTime)

    println(s" 時間引數  ${bizdate}    ${bizlastdate}")
    // dwd_patient_identity_info_df 就是我們使用者的各個ID ,也就是我們的資料來源
    // 獲取欄位,這樣我們就可以擴充套件新的ID 欄位,但是不用更新程式碼
    val columns = spark.sql(
      s"""
         |select
         |   *
         |from
         |   lezk_dw.dwd_patient_identity_info_df
         |where
         |   ds='${bizdate}'
         |limit
         |   1
         |""".stripMargin)
      .schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList

    // 獲取資料
    val dataFrame = spark.sql(
      s"""
        |select
        |   ${columns.mkString(",")}
        |from
        |   lezk_dw.dwd_patient_identity_info_df
        |where
        |   ds='${bizdate}'
        |""".stripMargin
    )

    // 資料準備
    val data = dataFrame.rdd.map(row => {
      val list = new ListBuffer[String]()
      for (column <- columns) {
        val value = row.getAs[String](column)
        list.append(value)
      }
      list.toList
    })
    import spark.implicits._
    // 頂點集合
    val veritx= data.flatMap(list => {
      for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))
        yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i))

    }).distinct

    val veritxDF=veritx.toDF("id_hashcode","id")
    veritxDF.createOrReplaceTempView("veritx")

    // 生成邊的集合
    val edges = data.flatMap(list => {
      for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))
           ; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))
      yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "")
    }).distinct


    // 開始使用點集合與邊集合進行圖計算訓練
    val graph = Graph(veritx, edges)
    val connectedGraph=graph.connectedComponents()

    // 連通節點
    val  vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode")
    vertices.createOrReplaceTempView("to_graph")

    // 載入昨日的oneid 資料 (oneid,id,id_hashcode) 
    val ye_oneid = spark.sql(
      s"""
        |select
        |   oneid,id,id_hashcode
        |from
        |   lezk_dw.dwd_patient_oneid_info_df
        |where
        |   ds='${bizlastdate}'
        |""".stripMargin
    )
    ye_oneid.createOrReplaceTempView("ye_oneid")

    // 關聯獲取 已經存在的 oneid,這裡的min 函式就是我們說的oneid 的選擇問題
    val exists_oneid=spark.sql(
      """
        |select
        |   a.guid_hashcode,min(b.oneid) as oneid
        |from
        |   to_graph a
        |inner join
        |   ye_oneid b
        |on
        |   a.id_hashcode=b.id_hashcode
        |group by
        |   a.guid_hashcode
        |""".stripMargin
    )
    exists_oneid.createOrReplaceTempView("exists_oneid")
    // 不存在則生成 存在則取已有的 這裡nvl 就是oneid  的更新邏輯,存在則獲取 不存在則生成
    val today_oneid=spark.sql(
      s"""
        |insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}')
        |select
        |   nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode
        |from
        |   to_graph a
        |left join
        |   exists_oneid b
        |on
        |   a.guid_hashcode=b.guid_hashcode
        |left join
        |   veritx c
        |on
        |   a.id_hashcode=c.id_hashcode
        |left join
        |   veritx d
        |on
        |   a.guid_hashcode=d.id_hashcode
        |""".stripMargin
    )
    sc.stop
  }

}

這個程式碼中我們使用了SparkSQL,其實你如果更加擅長RDD的API,也可以使用RDD 優化,需要注意的是網上的很多程式碼中使用了廣播變數,將vertices 變數廣播了出去,其實這個時候存在一個風險那就是如果你的vertices 變數非常大,你廣播的時候存在OOM 的風險,但是如果你使用了SparkSQL的話,Spark 就會根據實際的情況,幫你自動優化。

優化點 增量優化

我們看到我們每次都是全量的圖,其實我們可以將我們的OneID 表載入進來,然後將我們的增量資料和已有的圖資料進行合併,然後再去生成圖

val veritx = ye_veritx.union(to_veritx)
val edges = ye_edges.union(to_edges)

val graph = Graph(veritx, edges)

總結

  1. ID MappingOneID 的提前,OneIDID Mapping 的結果,所以要想做OneID必須先做ID Mapping;
  2. OneID 是為了打通整個資料體系的資料,所以OneID 需要以服務的方式對外提供服務,在數倉裡面就是作為基礎表使用,對外的話我們就需要提供介面對外提供服務