1. 程式人生 > 實用技巧 >圖計算實現ID_Mapping、Oneid打通資料孤島

圖計算實現ID_Mapping、Oneid打通資料孤島

圖計算實現ID_Mapping、Oneid打通資料孤島

ID_Mapping與Oneid的作用

大神告訴我們Oneid能用來做什麼

輸入資料來源格式樣例

樣例資料圖1

整理後資料圖2

實現原理

聯通圖

生成最大聯通圖

留下耀總的資料給大家練習了

當日程式碼生成


import java.util.UUID
import cn.scfl.ebt.util.UtilTool
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
import org.spark_project.jetty.util.StringUtil

/**
  * @Author: baierfa
  * @version: v1.0
  * @description: id_mapping 單天實現暫時不加入多天滾動計算 多天計算需要看另一檔案YeAndTodayGraphx
  * @Date: 2020-07-05 10:24
  */
object TodayGraphx {
  def main(args: Array[String]): Unit = {

    //宣告環境變數
    val spark = SparkSession
      .builder
      .appName(s"${this.getClass.getName}")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    val todayPath = "D:\\TESTPATH\\inputpath\\today\\dt=202-07-13"
    val outPutPath="D:\\TESTPATH\\outtpath\\today\\dt=202-07-13"
    val edgeoutPutPath="D:\\TESTPATH\\edgepath\\today\\dt=202-07-13"

   todayIdMapping(spark,sc,todayPath,outPutPath,edgeoutPutPath)
    spark.close()
  }
  
/**
 * 功能描述: <輸入今天資料路徑 按照檔案形式輸出到指定路徑中 並推出今日圖計算點與邊集合總個數>
 * 〈使用今日輸入資料轉換成唯一數字值 圖計算之後再將數值轉換回明文 生成唯一uuid〉
 * @Param: [spark, sc, todayPath, outPutPath, edgeoutPutPath]
 * @Return: void
 * @Author: baierfa
 * @Date: 2020-08-05 10:18
 */
  def todayIdMapping(spark:SparkSession,sc: SparkContext,todayPath: String,outPutPath:String ,edgeoutPutPath:String )={
    //  一、資料載入

    //    今天資料載入
    val todaydf = spark.read.textFile(todayPath)
    //  二、處理資料為生成圖做準備
    //    生成今日點集合
    val to_veritx = todaydf.rdd.flatMap(line => {
    //  將資料來源進行分割
      val field = line.split("\t")
    //把資料轉換成(long,值)要想long值不重複 可以使用hashcode
    //本文用於生產環境 使用了md5加密 詳細檔案請看其他篇章
      for (ele <- field if StringUtil.isNotBlank(ele)&&(!"\\N".equals(ele))) yield (UtilTool.getMD5(ele), ele)
    })
    //    生成今日邊集合
    val to_edges = todaydf.rdd.flatMap(line => {
      //  將資料來源進行分割
      val field = line.split("\t")
      //將資料轉換 將值轉換成邊 用於連線 連線值這邊用""想更換看個人意願
      for (i <- 0 to field.length - 2 if StringUtil.isNotBlank(field(i))&&(!"\\N".equals(field(i)))
           ;j <- i + 1 to field.length - 1 if StringUtil.isNotBlank(field(j))&&(!"\\N".equals(field(j)))) 
           yield Edge(UtilTool.getMD5(field(i)), UtilTool.getMD5(field(j)), "")
    })
//    在資料不做多次etl資料操作下可以使用共同出現次數來判定是否歸併為同一個使用者
//    例如 合併起來使用者 mobile 與 device_id 同時出現兩次以上才被記入同一個
//      .map(edge => (edge, 1))
//      .reduceByKey(_ + _)
//      .filter(tp => tp._2 > 2)
//      .map(tp => tp._1)

    //   三、彙總各個節點使用圖計算生成圖
    // 單將資料重新賦值適用於以後多資料來源合併
    val veritx = to_veritx
    val edges = to_edges
    //   開始使用點集合與邊集合進行圖計算訓練
    val graph = Graph(veritx, edges)
    //   四、生成最大連通圖
    val graph2 = graph.connectedComponents()
    val vertices = graph2.vertices
    //   五、將最小圖計算值替換成uuid
    val uidRdd = vertices.map(tp => (tp._2, tp._1))
      .groupByKey()
      .map(tp => (StringUtil.replace(UUID.randomUUID().toString, "-", ""), tp._2))
    //   對點與邊進行統計作為記錄輸出 可以用於後期統計檢查生成報表警報資料是否異常
    val uu = veritx.map(lin=>("vertices",1)).union(edges.map(lin=>("edges",1))).reduceByKey(_ + _)
      .map(tp=>tp._1+"\t"+tp._2)
    //    將現有的資料轉換成銘文識別後展示
    //    將各個點的資料彙總到driver端
    val idmpMap = veritx.collectAsMap()
    //    按照map方式廣播出去做轉換
    val bc = sc.broadcast(idmpMap)
    //  將資料的id轉換成明文
    val ss = uidRdd.mapPartitions(itemap => {
      val vert_id_map = bc.value
      itemap.map(tp => {
        //從廣播變數中獲取id值的資訊並轉換
        val t2 = for (ele <- tp._2)  yield vert_id_map.get(ele).get
        //按照將要輸出的資料格式進行排版 (uuid   mobile1,mobile2,mobile3,device_id1,device_2)
        tp._1+"\t"+t2.mkString(",")
      })
    })
//  資料輸出
    ss.saveAsTextFile(outPutPath)
    uu.saveAsTextFile(edgeoutPutPath)
  }
}

引用jar包

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.4.0</spark.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        
        <dependency>
            <groupId>com.thoughtworks.paranamer</groupId>
            <artifactId>paranamer</artifactId>
            <version>2.8</version>
        </dependency>

    </dependencies>

啟動命令
spark-submit \
--class IdMapping \
--master yarn \
--deploy-mode cluster \
--num-executors 40 \
--driver-memory 4g \
--executor-memory 6g \
--executor-cores 3 \
--conf spark.default.parallelism=400 \
--conf spark.shuffle.memoryFraction=0.3 \
ID_Mapping_Spark.jar \
hdfs://user/hive/oneid_data/data_origindata_di/dt=2020-07-13 \
hdfs://user/hive/oneid_data/data_sink_id_mapping/dt=2020-07-14 \
hdfs://user/hive/oneid_data/data_sink_edge_vertex/dt=2020-07-14

辛苦碼字如有轉載請標明出處謝謝!——拜耳法

都看到這裡了非常感謝!
本片章暫未完結 有疑問請+vx :baierfa

PS:我要在下一章在我心中不完美的你打一個淋漓盡致的標籤

將大神掛在那片白雲下:oneid與使用者標籤之間的相互打通 實現使用者標籤