1. 程式人生 > 其它 >數倉常用表設計與實現

數倉常用表設計與實現

一:di表、ds表

           di表(每日增量表,每個dt分區存放當天的增量資料)多用於事實表,例如:從資料庫抽取的交易記錄表、購買詳單表等。

           實現邏輯較為簡單,一般直接ETL即可。

            ds表(每日匯總表)一般就是對當天資料的簡單匯聚,例如:使用者每天使用的pkg的數量表,一般直接group by 就行。

二:dd表,全量表

            dd表多用於記錄每日的全量狀態,例如:使用者賬號密碼表、使用者通訊錄表。

            實現邏輯一般為:將今天上報的增量資料(di表)與昨天的全量表按主鍵做full join,同一主鍵優先取今天的值、今天沒有上報的欄位沿用昨天的值,寫入今天的分區,完成更新。

            SQL例子:

            insert into table dwd.dwd_user_info_dd(dt='20220503')
            select coalesce(b.user_id,a.user_id) as user_id,coalesce(b.username,a.username) as username,coalesce(b.passsword,a.passsword) as passsword from
            (select user_id,username,passsword from ods.ods_user_info_di where dt='20220503')a full join
            (select user_id,username,passsword from dwd.dwd_user_info_dd where dt='20220502')b on a.user_id = b.user_id;

三:trace_dd表,軌跡表

            trace_dd表多用於記錄活躍軌跡:使用者活躍軌跡表(一張表記錄用戶從歷史到現在所有的活躍軌跡)

            實現邏輯一般為:用一個欄位記錄01串,每天在串尾追加一位,1表示當天活躍、0表示當天不活躍;一般另外附加兩個欄位:初始活躍日期和最近活躍日期。

           SQL例子:

           insert overwrite table dws.dws_user_active_trace_dd(dt='20220504')
           select coalesce(a.user_id,b.user_id) as user_id,if(b.user_id is not null,init_date,'20220504') as init_date,
                      if(a.user_id is not null,'20220504',init_date) as active_date,
                      if(a.user_id is not null and b.user_id is not null,concat(active_trace,'1'),
                      if(a.user_id is not null and b.user_id is null,'1',concat(active_trace,'0'))) as active_trace from
           (select user_id from dwd.dwd_user_active_di)a full join
           (select user_id,init_date,active_date,active_trace from dws.dws_user_active_trace_dd)b on a.user_id = b.user_id;

四:周表、雙週表、月表

            周表、雙週表、月表一般有兩種形式:按自然週期累計的表(自然週、雙週、自然月,在週期結束時計算一次),以及每天計算最近7天、14天、30天資料的滾動表。

            實現邏輯一般為:天表直接按天匯總;三天表以「日期距1970-01-01的天數能被3整除」的那天作為三天的最後一天,每三天匯總一次;周表由一到兩張三天表加上剩餘的單日天表組成;雙週表由兩張周表組成;月表由兩張雙週表加上剩餘的單日天表組成。每種表依賴哪些分區由下面的parseDate物件計算(公共函式,對外提供計算口徑及方法)。

            例子:使用者pkg的使用時長表

            

/**
 * Partition-date helpers for the layered period tables (3-day, week, two-week, month).
 *
 * Every method takes a partition date formatted as `yyyyMMdd` and returns one
 * `|`-separated string. Its first field is "true" when a window is produced for that
 * date, or "false" (remaining fields empty) when the date does not close a window.
 * The remaining fields are the partition dates (single days, or start/end pairs of
 * 3-day / week / 14-day blocks) the window is assembled from.
 *
 * 3-day blocks end on dates whose "epoch day" `Date.getTime / 86400000` is divisible by 3.
 * Dates are parsed at local midnight in the JVM default time zone, so the block alignment
 * depends on that zone. NOTE(review): the SQL examples (e.g. 20220106_20220108) line up with
 * a UTC+8 default zone — keep the zone fixed on every machine that runs this.
 */
object parseDate {

  /**
   * Aligned 3-day window ending on `dealdate`.
   *
   * @param dealdate partition date, `yyyyMMdd`
   * @return "true|<dealdate-2>|<dealdate>" when `dealdate` ends an aligned 3-day block,
   *         otherwise "false||"
   */
  def deal3days(dealdate:String): String = {
    import java.text.SimpleDateFormat
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    if(formatdate.getTime/1000/60/60/24%3 ==0){
      import java.util.Calendar
      val calendar = Calendar.getInstance
      calendar.setTime(formatdate)
      calendar.add(Calendar.DAY_OF_YEAR,-2)
      "true"+"|"+simpleDateFormat.format(calendar.getTime)+"|"+dealdate
    }
    else {
      "false"+"||"
    }
  }

  /**
   * Calendar week (Monday..Sunday) ending on `dealdate`; produced only when `dealdate` is a Sunday.
   *
   * The second field tells whether the week is built from one or two 3-day blocks:
   *  - "true|1|<3d start>|<3d end>|<day>|<day>|<day>|<day>" : one block + 4 single days
   *  - "true|2|<3d start>|<3d end>|<3d start>|<3d end>|<day>" : two blocks + 1 single day
   *  - "false||||||" when `dealdate` is not a Sunday
   *
   * @param dealdate partition date, `yyyyMMdd`
   */
  def dealweek(dealdate:String): String = {
    import java.text.SimpleDateFormat
    import java.util.Calendar
    val calendar = Calendar.getInstance
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    calendar.setTime(formatdate)

    if(calendar.get(Calendar.DAY_OF_WEEK) == Calendar.SUNDAY){
      val datethree = Array("","","","")
      var datethreeindex = 0
      // Collect the trailing days after the most recent aligned 3-day block (0 to 2 days).
      while(calendar.getTimeInMillis/1000/60/60/24%3 !=0){
        datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
        datethreeindex = datethreeindex+1
        calendar.add(Calendar.DAY_OF_YEAR,-1)
      }
      val first3daysend = simpleDateFormat.format(calendar.getTime) // end of the most recent 3-day block
      calendar.add(Calendar.DAY_OF_YEAR,-2)
      val first3daystart = simpleDateFormat.format(calendar.getTime) // start of the most recent 3-day block
      calendar.add(Calendar.DAY_OF_YEAR,-1)

      if(datethreeindex == 2){
        // 2 trailing days + 1 block = 5 days: collect the single days back to Monday.
        while(calendar.get(Calendar.DAY_OF_WEEK) != Calendar.SUNDAY){
          datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
          datethreeindex = datethreeindex+1
          calendar.add(Calendar.DAY_OF_YEAR,-1)
        }
        "true"+"|"+1+"|"+first3daystart+"|"+first3daysend+"|"+datethree(0)+"|"+datethree(1)+"|"+datethree(2)+"|"+datethree(3)
      }
      else{
        val second3daysend = simpleDateFormat.format(calendar.getTime) // end of the earlier 3-day block
        calendar.add(Calendar.DAY_OF_YEAR,-2)
        val second3daystart = simpleDateFormat.format(calendar.getTime) // start of the earlier 3-day block
        // Steps back before recording, so the previous week's Sunday is also written into
        // `datethree`; only datethree(0) is emitted (dealdate itself, or the Monday), so
        // that extra entry is never used.
        while(calendar.get(Calendar.DAY_OF_WEEK) != Calendar.SUNDAY){
          calendar.add(Calendar.DAY_OF_YEAR,-1)
          datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
          datethreeindex = datethreeindex+1
        }
        "true"+"|"+2+"|"+first3daystart+"|"+first3daysend+"|"+second3daystart+"|"+second3daysend+"|"+datethree(0)
      }
    }
    else {
      "false"+"||||||"
    }
  }

  /**
   * Rolling 7-day window `[dealdate-6, dealdate]`, produced for every date.
   *
   * Same layout as `dealweek`:
   *  - "true|1|<3d start>|<3d end>|<day>|<day>|<day>|<day>" : one block + 4 single days
   *  - "true|2|<3d start>|<3d end>|<3d start>|<3d end>|<day>" : two blocks + 1 single day
   *
   * @param dealdate partition date, `yyyyMMdd`
   */
  def dealastweek(dealdate:String): String = {
    import java.text.SimpleDateFormat
    import java.util.Calendar
    val calendar = Calendar.getInstance
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    calendar.setTime(formatdate)

    val datethree = Array("","","","")
    var datethreeindex = 0
    // Collect the trailing days after the most recent aligned 3-day block (0 to 2 days).
    while(calendar.getTimeInMillis/1000/60/60/24%3 !=0){
      datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
      datethreeindex = datethreeindex+1
      calendar.add(Calendar.DAY_OF_YEAR,-1)
    }
    val first3daysend = simpleDateFormat.format(calendar.getTime) // end of the most recent 3-day block
    calendar.add(Calendar.DAY_OF_YEAR,-2)
    val first3daystart = simpleDateFormat.format(calendar.getTime) // start of the most recent 3-day block
    calendar.add(Calendar.DAY_OF_YEAR,-1)

    if(datethreeindex == 2){
      // 2 trailing days + 1 block cover 5 days: take exactly the 2 days before the block.
      // A rolling window must not walk back to a week boundary (that also overflowed `datethree`).
      while(datethreeindex < datethree.length){
        datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
        datethreeindex = datethreeindex+1
        calendar.add(Calendar.DAY_OF_YEAR,-1)
      }
      "true"+"|"+1+"|"+first3daystart+"|"+first3daysend+"|"+datethree(0)+"|"+datethree(1)+"|"+datethree(2)+"|"+datethree(3)
    }
    else{
      val second3daysend = simpleDateFormat.format(calendar.getTime) // end of the earlier 3-day block
      calendar.add(Calendar.DAY_OF_YEAR,-2)
      val second3daystart = simpleDateFormat.format(calendar.getTime) // start of the earlier 3-day block
      // 0 or 1 trailing day + 2 blocks: with no trailing day the 7th day is the one before the
      // earlier block; with one trailing day datethree(0) already holds dealdate.
      if(datethreeindex == 0){
        calendar.add(Calendar.DAY_OF_YEAR,-1)
        datethree.update(0,simpleDateFormat.format(calendar.getTime))
      }
      "true"+"|"+2+"|"+first3daystart+"|"+first3daysend+"|"+second3daystart+"|"+second3daysend+"|"+datethree(0)
    }
  }

  /**
   * Two consecutive calendar weeks ending on `dealdate`; produced only when `dealdate` is a
   * Sunday whose DAY_OF_WEEK_IN_MONTH is even (the 2nd or 4th Sunday of its month).
   *
   * NOTE(review): runs can be 2 or 3 weeks apart (4th Sunday -> next month's 2nd Sunday), so
   * some weeks fall into no two-week window — confirm the intended schedule.
   *
   * @param dealdate partition date, `yyyyMMdd`
   * @return "true|<week1 start>|<week1 end>|<week2 start>|<week2 end>" covering
   *         [dealdate-13, dealdate], otherwise "false||||"
   */
  def dealtwoweek(dealdate:String): String = {
    import java.text.SimpleDateFormat
    import java.util.Calendar
    val calendar = Calendar.getInstance
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    calendar.setTime(formatdate)

    if(calendar.get(Calendar.DAY_OF_WEEK_IN_MONTH)%2 == 0 && calendar.get(Calendar.DAY_OF_WEEK)==Calendar.SUNDAY){
      val second3daysend = simpleDateFormat.format(calendar.getTime) // end of the later week
      calendar.add(Calendar.DAY_OF_YEAR,-6)
      val second3daystart = simpleDateFormat.format(calendar.getTime) // start of the later week

      calendar.add(Calendar.DAY_OF_YEAR,-1)
      val first3daysend = simpleDateFormat.format(calendar.getTime) // end of the earlier week
      calendar.add(Calendar.DAY_OF_YEAR,-6)
      val first3daystart = simpleDateFormat.format(calendar.getTime) // start of the earlier week

      "true"+"|"+first3daystart+"|"+first3daysend+"|"+second3daystart+"|"+second3daysend
    }
    else{
      "false"+"||||"
    }
  }

  /**
   * Rolling 14-day window `[dealdate-13, dealdate]`, produced for every date.
   *
   * @param dealdate partition date, `yyyyMMdd`
   * @return "true|<dealdate-13>|<dealdate-7>|<dealdate-6>|<dealdate>"
   */
  def dealastwoweek(dealdate:String): String = {
    import java.text.SimpleDateFormat
    import java.util.Calendar
    val calendar = Calendar.getInstance
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    calendar.setTime(formatdate)

    val second3daysend = simpleDateFormat.format(calendar.getTime) // end of the later 7-day block
    calendar.add(Calendar.DAY_OF_YEAR,-6)
    val second3daystart = simpleDateFormat.format(calendar.getTime) // start of the later 7-day block

    calendar.add(Calendar.DAY_OF_YEAR,-1)
    val first3daysend = simpleDateFormat.format(calendar.getTime) // end of the earlier 7-day block
    calendar.add(Calendar.DAY_OF_YEAR,-6)
    val first3daystart = simpleDateFormat.format(calendar.getTime) // start of the earlier 7-day block

    "true"+"|"+first3daystart+"|"+first3daysend+"|"+second3daystart+"|"+second3daysend
  }

  /**
   * Calendar month ending on `dealdate`; produced only when `dealdate` is the last day of its month.
   *
   * The month is split into two 14-day blocks plus 0-3 single days:
   * "true|<n>|<block1 start>|<block1 end>|<block2 start>|<block2 end>|<day>|<day>|<day>"
   * where n is the number of single days; otherwise "false||||||||".
   *
   * @param dealdate partition date, `yyyyMMdd`
   */
  def dealmonth(dealdate:String): String = {
    import java.text.SimpleDateFormat
    import java.util.Calendar
    val calendar = Calendar.getInstance
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    calendar.setTime(formatdate)

    // dealdate is the last day of the month iff the next day is the 1st.
    val calendarcopy = Calendar.getInstance
    calendarcopy.setTime(formatdate)
    calendarcopy.add(Calendar.DAY_OF_YEAR,1)

    if(calendarcopy.get(Calendar.DAY_OF_MONTH)==1){
      val datethree = Array("","","")
      var datethreeindex = 0
      // Walk back while the day has an odd DAY_OF_WEEK_IN_MONTH and is not a Sunday, i.e. stop
      // at a Sunday among days 29-31 or at day 28 (days 22-28 have DAY_OF_WEEK_IN_MONTH 4).
      // NOTE(review): because of `&&` the blocks need not end on a two-week-table boundary —
      // confirm this alignment is intended.
      while(calendar.get(Calendar.DAY_OF_WEEK_IN_MONTH)%2 != 0 && calendar.get(Calendar.DAY_OF_WEEK)!=Calendar.SUNDAY){
        datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
        datethreeindex = datethreeindex+1
        calendar.add(Calendar.DAY_OF_YEAR,-1)
      }
      val second3daysend = simpleDateFormat.format(calendar.getTime) // end of the later 14-day block
      calendar.add(Calendar.DAY_OF_YEAR,-13)
      val second3daystart = simpleDateFormat.format(calendar.getTime) // start of the later 14-day block

      calendar.add(Calendar.DAY_OF_YEAR,-1)
      val first3daysend = simpleDateFormat.format(calendar.getTime) // end of the earlier 14-day block
      calendar.add(Calendar.DAY_OF_YEAR,-13)
      val first3daystart = simpleDateFormat.format(calendar.getTime) // start of the earlier 14-day block

      // Remaining single days from the 1st of the month up to the earlier block.
      while(calendar.get(Calendar.DAY_OF_MONTH)!=1 ){
        calendar.add(Calendar.DAY_OF_YEAR,-1)
        datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
        datethreeindex = datethreeindex+1
      }
      "true"+"|"+datethreeindex+"|"+first3daystart+"|"+first3daysend+"|"+second3daystart+"|"+second3daysend+"|"+datethree(0)+"|"+datethree(1)+"|"+datethree(2)
    }
    else{
      "false"+"||||||||"
    }

  }

  /**
   * Rolling 30-day window `[dealdate-29, dealdate]`, produced for every date.
   *
   * @param dealdate partition date, `yyyyMMdd`
   * @return "true|2|<dealdate-27>|<dealdate-14>|<dealdate-13>|<dealdate>|<dealdate-28>|<dealdate-29>"
   */
  def dealastmonth(dealdate:String): String = {
    import java.text.SimpleDateFormat
    import java.util.Calendar
    val calendar = Calendar.getInstance
    val simpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val formatdate = simpleDateFormat.parse(dealdate)
    calendar.setTime(formatdate)

    val datethree = Array("","")
    var datethreeindex = 0
    val second3daysend = simpleDateFormat.format(calendar.getTime) // end of the later 14-day block
    calendar.add(Calendar.DAY_OF_YEAR,-13)
    val second3daystart = simpleDateFormat.format(calendar.getTime) // start of the later 14-day block

    calendar.add(Calendar.DAY_OF_YEAR,-1)
    val first3daysend = simpleDateFormat.format(calendar.getTime) // end of the earlier 14-day block
    calendar.add(Calendar.DAY_OF_YEAR,-13)
    val first3daystart = simpleDateFormat.format(calendar.getTime) // start of the earlier 14-day block

    // The two oldest days of the 30-day window.
    while(datethreeindex <2){
      calendar.add(Calendar.DAY_OF_YEAR,-1)
      datethree.update(datethreeindex,simpleDateFormat.format(calendar.getTime))
      datethreeindex = datethreeindex+1
    }
    "true"+"|"+datethreeindex+"|"+first3daystart+"|"+first3daysend+"|"+second3daystart+"|"+second3daysend+"|"+datethree(0)+"|"+datethree(1)
  }

  /**
   * Manual check: prints the window string for each date.
   *
   * @param args optional. args(0) selects the helper (1 = deal3days, 2 = dealweek,
   *             3 = dealastweek, 4 = dealtwoweek, 5 = dealastwoweek, 6 = dealmonth,
   *             7 = dealastmonth; default 7); args(1..) are `yyyyMMdd` dates
   *             (default: the sample dates below)
   */
  def main(args: Array[String]):Unit= {
    val dealtype = if (args.nonEmpty) args(0).toInt else 7
//    val dealdatelist = Array("20220109","20220116","20220123","20220130")
    val dealdatelist = if (args.length > 1) args.drop(1) else Array("20220227","20220228","20220330","20220331")

    for(dealdate <- dealdatelist) {
      dealtype match{
        case 1 => println(deal3days(dealdate))
        case 2 => println(dealweek(dealdate))
        case 3 => println(dealastweek(dealdate))
        case 4 => println(dealtwoweek(dealdate))
        case 5 => println(dealastwoweek(dealdate))
        case 6 => println(dealmonth(dealdate))
        case 7 => println(dealastmonth(dealdate))
        case other => println("unsupported dealtype: "+other)
      }
    }
  }
}

 

            SQL例子:

                        天表:(每天執行)
                        insert overwrite table dws.dws_user_usedur_ds(dt='20220109')
                        select user_id,sum(usedur) as usedur from dwd.dwd_user_usedur_di where dt='20220109' group by user_id;

                        三天表:(每隔三天執行)
                        System.currentTimeMillis/1000/60/60/24%3 ==0
                        insert overwrite table dws.dws_user_usedur_4ds(dt='20220103_20220105')
                        select user_id,sum(usedur) as usedur from dws.dws_user_usedur_ds where dt>='20220103' and dt<='20220105' group by user_id;
                        insert overwrite table dws.dws_user_usedur_4ds(dt='20220106_20220108')
                        select user_id,sum(usedur) as usedur from dws.dws_user_usedur_ds where dt>='20220106' and dt<='20220108' group by user_id;

                        周表:(每週日執行,依賴三天表和天表):
                        insert overwrite table dws.dws_user_usedur_ws(dt='20220103_20220109')
                        select user_id,sum(usedur) as usedur from (
                                select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220103_20220105'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220106_20220108'
                                union all
                                select user_id,usedur from dwd.dwd_user_usedur_di where dt='20220109'
                        )t group by user_id;
                        雙週表:(每雙週日執行,依賴周表):
                        insert overwrite table dws.dws_user_usedur_2ws(dt='20220103_20220116')
                        select user_id,sum(usedur) as usedur from (
                                select user_id,usedur from dws_user_usedur_ws where dt='20220103_20220109'
                                union all
                                select user_id,usedur from dws_user_usedur_ws where dt='20220110_20220116'
                        )t group by user_id;
                        月表:(每月底執行,依賴雙週表和天表):
                        insert overwrite table dws.dws_user_usedur_ms(dt='20220101_20220131')
                        select user_id,sum(usedur) as usedur from (
                                select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220103_20220116'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220117_20220130'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_ds where dt='20220101'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_ds where dt='20220103'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_ds where dt='20220131'
                        )t group by user_id;

 

                        周表:(每天執行,最近七天資料,依賴三天表和天表):
                        insert overwrite table dws.dws_user_usedur_ws(dt='20220102_20220108')
                        select user_id,sum(usedur) as usedur from (
                                select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220103_20220105'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220106_20220108'
                                union all
                                select user_id,usedur from dwd.dwd_user_usedur_di where dt='20220102'
                        )t group by user_id;
                        雙週表:(每天執行,最近十四天資料,依賴周表):
                        insert overwrite table dws.dws_user_usedur_2ws(dt='20220104_20220117')
                        select user_id,sum(usedur) as usedur from (
                                select user_id,usedur from dws.dws_user_usedur_ws where dt='20220104_20220110'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_ws where dt='20220111_20220117'
                        )t group by user_id;
                        月表:(每天執行,最近三十天資料,依賴雙週表和天表):
                        insert overwrite table dws.dws_user_usedur_ms(dt='20220103_20220201')
                        select user_id,sum(usedur) as usedur from (
                                select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220103_20220116'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220117_20220130'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_ds where dt='20220131'
                                union all
                                select user_id,usedur from dws.dws_user_usedur_ds where dt='20220201'
                        )t group by user_id;

五:複雜型別操作:相加表

            一般的表會有複雜型別的資料,例如map、list;部分表需要把多行資料中的map按key合併(相同key的值相加),或把多行資料中的list合併(已存在的元素先移除再追加到末尾)。

            實現邏輯一般為:建立UDAF

package com.transsion.bigdata.aggregate

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructField, StructType,ArrayType}

import scala.collection.mutable

/**
 * UDAF that merges `array<string>` values across rows.
 *
 * For every incoming array, elements already present in the buffer are removed first and
 * then the whole incoming array is appended, so a value seen again moves to the end of the
 * result. All empty-string elements are dropped; null input arrays are ignored.
 */
class AggList extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("input",ArrayType(StringType)):: Nil)

  // Buffer schema: the list accumulated so far.
  override def bufferSchema: StructType = StructType(StructField("buffer",ArrayType(StringType)):: Nil)

  // Result type of the aggregate.
  override def dataType: DataType = ArrayType(StringType)

  // Declared deterministic (same input -> same output).
  // NOTE(review): the element order of the result depends on the order in which rows and
  // partial buffers are merged.
  override def deterministic: Boolean = true

  // Start from an empty list.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new java.util.ArrayList[String]()
  }

  // Fold one input row into the buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    agg(buffer,input)
  }

  // Fold another partial buffer into this one (same rule as update).
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    agg(buffer1,buffer2)
  }

  // Final result: the accumulated list.
  override def evaluate(buffer: Row) = buffer.getList[String](0)

  /**
   * Merges the array in column 0 of `buffer2` into the list stored in `buffer1`.
   *
   * @param buffer1 buffer to update in place
   * @param buffer2 input row or partial buffer; a null array leaves the content unchanged
   *                (empty strings are still stripped)
   */
  def agg(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val bufferdata: java.util.List[String] = new java.util.ArrayList[String](buffer1.getList[String](0))
    if(!buffer2.isNullAt(0)){
      val incoming = buffer2.getList[String](0)
      // A hash set makes removeAll linear instead of scanning `incoming` for every element.
      bufferdata.removeAll(new java.util.HashSet[String](incoming))
      bufferdata.addAll(incoming)
    }
    // Drop every empty string, not just the first one.
    bufferdata.removeAll(java.util.Collections.singleton(""))

    buffer1.update(0,bufferdata)
  }
}

  

package com.transsion.bigdata.aggregate

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.MapType

import scala.collection.mutable

/**
 * UDAF that merges `map<string,string>` values across rows by summing the values of equal keys.
 *
 * Values are parsed with `toDouble` (a non-numeric value throws NumberFormatException) and
 * written back with `Double.toString`, so results look like "3.0" even for integer inputs.
 * The empty-string key is dropped; null input maps and null values are ignored.
 */
class AggMap extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(StructField("input",MapType(StringType,StringType)):: Nil)

  // Buffer schema: the map accumulated so far.
  override def bufferSchema: StructType = StructType(StructField("buffer",MapType(StringType,StringType)):: Nil)

  // Result type of the aggregate.
  override def dataType: DataType = MapType(StringType,StringType)

  // Declared deterministic (same input -> same output).
  // NOTE(review): floating-point sums may differ slightly with a different merge order.
  override def deterministic: Boolean = true

  // Start from an empty map.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = mutable.Map()
  }

  // Fold one input row into the buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    evaluate(buffer,input)
  }

  // Fold another partial buffer into this one (same rule as update).
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    evaluate(buffer1,buffer2)
  }

  // Final result: the accumulated map.
  override def evaluate(buffer: Row) = buffer.getMap[String,String](0)

  /**
   * Merges the map in column 0 of `buffer2` into the map stored in `buffer1`, summing the
   * numeric values of keys present in both.
   *
   * @param buffer1 buffer to update in place
   * @param buffer2 input row or partial buffer; a null map leaves `buffer1` untouched
   */
  def evaluate(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val bufferdata = buffer1.getMap[String,String](0)
    val inputdata = buffer2.getMap[String,String](0)
    if(inputdata != null){
      val keys = (bufferdata.keySet ++ inputdata.keySet).filter(_ != "")
      val finalMap: Map[String,String] = keys.iterator.flatMap { key =>
        // Null values (MapType allows them by default) are treated as absent instead of throwing.
        val parts = Seq(bufferdata.get(key), inputdata.get(key)).flatten.filter(_ != null)
        if (parts.isEmpty) None
        else Some(key -> parts.map(_.toDouble).reduce(_ + _).toString)
      }.toMap
      buffer1.update(0,finalMap)
    }
  }
}