數倉常用表設計與實現
一:di表、ds表
di表多用於事實表,例如:從資料庫抽取的交易記錄表,購買詳單表等
實現邏輯較為簡單,一般直接ETL即可。
ds表一般就是簡單匯聚,例如:使用者每天使用的pkg的數量表,一般直接group by 就行。
二:dd表,全量表
dd表多用於記錄每日的全量狀態,例如:使用者賬號密碼表、使用者通訊錄表
實現邏輯一般為:取今天上報的全量的資料覆蓋昨天表中的資料,完成更新
SQL例子:
-- Rebuild the full snapshot for 2022-05-03 from today's reported rows (a) and
-- yesterday's snapshot (b). Today's value takes precedence; yesterday's value is
-- kept only where today has no row (or a NULL column) for that user.
-- OVERWRITE is used so that re-running the job does not append duplicate rows.
insert overwrite table dwd.dwd_user_info_dd partition(dt='20220503')
select coalesce(a.user_id,b.user_id) as user_id,
       coalesce(a.username,b.username) as username,
       coalesce(a.passsword,b.passsword) as passsword
from
(select user_id,username,passsword from ods.ods_user_info_di where dt='20220503')a full join
(select user_id,username,passsword from dwd.dwd_user_info_dd where dt='20220502')b on a.user_id = b.user_id;
三:trace_dd表,軌跡表
trace_dd表多用於記錄活躍軌跡:使用者活躍軌跡表(一張表記錄用戶從歷史到現在所有的活躍軌跡)
實現邏輯一般為:一個欄位記錄01串,表示使用者是否活躍,一般附加兩個欄位,初始活躍日期和最近活躍日期
SQL例子:
-- Extend every user's activity trace by one day (2022-05-04).
--   a: users active today (deduplicated so the join cannot multiply rows)
--   b: yesterday's trace snapshot (a single partition, not the whole table)
-- init_date   : kept from b for known users, today's date for first-time users
-- active_date : today's date when active today, otherwise the previous active_date
-- active_trace: append '1' (active) or '0' (inactive); first-time users start at '1'
insert overwrite table dws.dws_user_active_trace_dd partition(dt='20220504')
select coalesce(a.user_id,b.user_id) as user_id,
       if(b.user_id is not null,b.init_date,'20220504') as init_date,
       if(a.user_id is not null,'20220504',b.active_date) as active_date,
       if(a.user_id is not null and b.user_id is not null,concat(b.active_trace,'1'),
          if(a.user_id is not null and b.user_id is null,'1',concat(b.active_trace,'0'))) as active_trace
from
(select user_id from dwd.dwd_user_active_di where dt='20220504' group by user_id)a full join
(select user_id,init_date,active_date,active_trace from dws.dws_user_active_trace_dd where dt='20220503')b on a.user_id = b.user_id;
四:周表、雙週表、月表
周表,雙週表,月表:一般有最近一週兩週月表、累計一週兩週月表兩種形式。
實現邏輯一般為:周表一般為直接計算7天的資料,雙週表一般為單週表+計算7天的資料,月表一般為三週表+最近7天的資料(具體實現為一個公共函式,對外提供計算口徑及方法)。下方範例的實作則採用逐層複用的方式:周表=兩張三天表+剩餘的天表,雙週表=兩張周表,月表=兩張雙週表+剩餘的天表。
例子:使用者pkg的使用時長表
/**
 * Computes which pre-aggregated partitions (3-day blocks, weeks, two-week blocks)
 * plus which single days make up a reporting window ending on a given date.
 *
 * Every method takes a date string in `yyyyMMdd` form and returns a `|`-separated
 * string. The calendar-aligned variants start with `true` or `false`; `false` means
 * "nothing to compute on this date" and is followed by empty fields.
 *
 * NOTE(review): the 3-day alignment uses the epoch-millisecond value of local midnight
 * divided into days, so which dates end a 3-day block depends on the JVM default time
 * zone. Confirm the zone the scheduler runs in before relying on the block boundaries.
 */
object parseDate {
  import java.text.SimpleDateFormat
  import java.util.Calendar
  import scala.collection.mutable.ArrayBuffer

  /** A fresh formatter per call: SimpleDateFormat is not thread-safe. */
  private def newFormat(): SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")

  /** Calendar positioned at the parsed `dealdate`. */
  private def calendarAt(dealdate: String, fmt: SimpleDateFormat): Calendar = {
    val cal = Calendar.getInstance
    cal.setTime(fmt.parse(dealdate))
    cal
  }

  /** True when the calendar's day is the last day of a 3-day block. */
  private def endsThreeDayBlock(cal: Calendar): Boolean =
    cal.getTimeInMillis / 1000 / 60 / 60 / 24 % 3 == 0

  /** Moves the calendar `days` days into the past. */
  private def back(cal: Calendar, days: Int): Unit = cal.add(Calendar.DAY_OF_YEAR, -days)

  /**
   * Starting at the calendar's current day, describes two back-to-back ranges of
   * `span` days each. Returns (earlierStart, earlierEnd, laterStart, laterEnd) and
   * leaves the calendar on `earlierStart`.
   */
  private def adjacentRanges(cal: Calendar, fmt: SimpleDateFormat, span: Int): (String, String, String, String) = {
    val laterEnd = fmt.format(cal.getTime)
    back(cal, span - 1)
    val laterStart = fmt.format(cal.getTime)
    back(cal, 1)
    val earlierEnd = fmt.format(cal.getTime)
    back(cal, span - 1)
    val earlierStart = fmt.format(cal.getTime)
    (earlierStart, earlierEnd, laterStart, laterEnd)
  }

  /**
   * Splits the 7 days ending on `dealdate` into 3-day blocks plus single days.
   * The result starts with `|` so callers can prepend their own status field:
   *  - `|1|blockStart|blockEnd|d0|d1|d2|d3` : one 3-day block and four single days
   *  - `|2|laterStart|laterEnd|earlierStart|earlierEnd|d0` : two blocks and one single day
   *
   * The window is always exactly 7 days; remaining days are counted rather than
   * searched for by weekday, so any input date is handled.
   */
  private def sevenDayWindow(dealdate: String): String = {
    val fmt = newFormat()
    val cal = calendarAt(dealdate, fmt)
    val singles = ArrayBuffer.empty[String]
    // Days after the most recent block end are not covered by a 3-day block yet (0..2 days).
    while (!endsThreeDayBlock(cal)) {
      singles += fmt.format(cal.getTime)
      back(cal, 1)
    }
    if (singles.size == 2) {
      // 2 single days + one block = 5 days; the two days before the block complete the window.
      val blockEnd = fmt.format(cal.getTime)
      back(cal, 2)
      val blockStart = fmt.format(cal.getTime)
      for (_ <- 1 to 2) {
        back(cal, 1)
        singles += fmt.format(cal.getTime)
      }
      s"|1|$blockStart|$blockEnd|${singles.mkString("|")}"
    } else {
      val (earlierStart, earlierEnd, laterStart, laterEnd) = adjacentRanges(cal, fmt, 3)
      // With no leading single day, the seventh day is the one right before the earlier block.
      val single = singles.headOption.getOrElse {
        back(cal, 1)
        fmt.format(cal.getTime)
      }
      s"|2|$laterStart|$laterEnd|$earlierStart|$earlierEnd|$single"
    }
  }

  /**
   * 3-day block ending on `dealdate`.
   * @return `true|start|end` when `dealdate` ends a block, otherwise `false||`
   */
  def deal3days(dealdate: String): String = {
    val fmt = newFormat()
    val cal = calendarAt(dealdate, fmt)
    if (endsThreeDayBlock(cal)) {
      back(cal, 2)
      "true" + "|" + fmt.format(cal.getTime) + "|" + dealdate
    } else {
      "false" + "||"
    }
  }

  /**
   * Calendar week (Monday..Sunday) ending on `dealdate`; only produced on Sundays.
   * The second field is the number of 3-day blocks (1 or 2) in the breakdown.
   * @return `true` followed by the breakdown, or `false||||||` on other weekdays
   */
  def dealweek(dealdate: String): String = {
    val cal = calendarAt(dealdate, newFormat())
    if (cal.get(Calendar.DAY_OF_WEEK) == 1) "true" + sevenDayWindow(dealdate)
    else "false" + "||||||"
  }

  /**
   * Rolling 7 days ending on `dealdate`, for any date.
   * The second field is the number of 3-day blocks (1 or 2) in the breakdown.
   *
   * NOTE(review): the first field is `dealdate` immediately followed by `true`
   * (e.g. `20220105true`), unlike the other methods. Kept as-is because callers
   * may parse it; confirm whether the prefix is intentional.
   */
  def dealastweek(dealdate: String): String = dealdate + "true" + sevenDayWindow(dealdate)

  /**
   * Two calendar weeks ending on `dealdate`; only produced on the 2nd/4th Sunday
   * of a month (DAY_OF_WEEK_IN_MONTH is even).
   * @return `true|week1Start|week1End|week2Start|week2End`, or `false||||`
   */
  def dealtwoweek(dealdate: String): String = {
    val fmt = newFormat()
    val cal = calendarAt(dealdate, fmt)
    if (cal.get(Calendar.DAY_OF_WEEK_IN_MONTH) % 2 == 0 && cal.get(Calendar.DAY_OF_WEEK) == 1) {
      val (earlierStart, earlierEnd, laterStart, laterEnd) = adjacentRanges(cal, fmt, 7)
      "true" + "|" + earlierStart + "|" + earlierEnd + "|" + laterStart + "|" + laterEnd
    } else {
      "false" + "||||"
    }
  }

  /**
   * Rolling 14 days ending on `dealdate`, as two 7-day ranges.
   * @return `true|range1Start|range1End|range2Start|range2End`
   */
  def dealastwoweek(dealdate: String): String = {
    val fmt = newFormat()
    val (earlierStart, earlierEnd, laterStart, laterEnd) = adjacentRanges(calendarAt(dealdate, fmt), fmt, 7)
    "true" + "|" + earlierStart + "|" + earlierEnd + "|" + laterStart + "|" + laterEnd
  }

  /**
   * Calendar month ending on `dealdate`; only produced on the last day of a month.
   * Breakdown: single days after the anchor day, two 14-day ranges ending on the
   * anchor, then single days back to the 1st. The second field is the number of
   * single days; three single-day fields are always emitted (unused ones empty).
   * @return `true|n|range1Start|range1End|range2Start|range2End|d0|d1|d2`, or `false||||||||`
   */
  def dealmonth(dealdate: String): String = {
    val fmt = newFormat()
    val cal = calendarAt(dealdate, fmt)
    val nextDay = calendarAt(dealdate, fmt)
    nextDay.add(Calendar.DAY_OF_YEAR, 1)
    if (nextDay.get(Calendar.DAY_OF_MONTH) == 1) {
      val singles = ArrayBuffer.empty[String]
      // The anchor is the first day, walking back from month end, that is a Sunday
      // or has an even DAY_OF_WEEK_IN_MONTH (i.e. day 28 at the latest).
      // NOTE(review): this does not require the anchor to be a date on which
      // dealtwoweek produces a range; confirm those two-week partitions exist.
      while (cal.get(Calendar.DAY_OF_WEEK_IN_MONTH) % 2 != 0 && cal.get(Calendar.DAY_OF_WEEK) != 1) {
        singles += fmt.format(cal.getTime)
        back(cal, 1)
      }
      val (earlierStart, earlierEnd, laterStart, laterEnd) = adjacentRanges(cal, fmt, 14)
      while (cal.get(Calendar.DAY_OF_MONTH) != 1) {
        back(cal, 1)
        singles += fmt.format(cal.getTime)
      }
      "true" + "|" + singles.size + "|" + earlierStart + "|" + earlierEnd + "|" + laterStart + "|" + laterEnd +
        "|" + singles.padTo(3, "").mkString("|")
    } else {
      "false" + "||||||||"
    }
  }

  /**
   * Rolling 30 days ending on `dealdate`: two 14-day ranges plus the two days before them.
   * @return `true|2|range1Start|range1End|range2Start|range2End|d0|d1`
   */
  def dealastmonth(dealdate: String): String = {
    val fmt = newFormat()
    val cal = calendarAt(dealdate, fmt)
    val (earlierStart, earlierEnd, laterStart, laterEnd) = adjacentRanges(cal, fmt, 14)
    val singles = ArrayBuffer.empty[String]
    while (singles.size < 2) {
      back(cal, 1)
      singles += fmt.format(cal.getTime)
    }
    "true" + "|" + singles.size + "|" + earlierStart + "|" + earlierEnd + "|" + laterStart + "|" + laterEnd +
      "|" + singles(0) + "|" + singles(1)
  }

  /** Demo driver: prints the breakdown selected by `dealtype` for a few sample dates. */
  def main(args: Array[String]): Unit = {
    val dealtype = 7
    // val dealdatelist = Array("20220109","20220116","20220123","20220130")
    val dealdatelist = Array("20220227", "20220228", "20220330", "20220331")
    for (dealdate <- dealdatelist) {
      dealtype match {
        case 1 => println(deal3days(dealdate))
        case 2 => println(dealweek(dealdate))
        case 3 => println(dealastweek(dealdate))
        case 4 => println(dealtwoweek(dealdate))
        case 5 => println(dealastwoweek(dealdate))
        case 6 => println(dealmonth(dealdate))
        case 7 => println(dealastmonth(dealdate))
      }
    }
  }
}
SQL例子:
天表:(每天執行)
-- Daily table: total usage duration per user for one day of detail records.
insert overwrite table dws.dws_user_usedur_ds partition(dt='20220109')
select user_id,sum(usedur) as usedur from dwd.dwd_user_usedur_di where dt='20220109' group by user_id;
三天表:(每隔三天執行)
-- Trigger condition (a Scala expression, presumably evaluated by the scheduling code
-- rather than by SQL): run only when
--   System.currentTimeMillis/1000/60/60/24%3 ==0
-- Each statement rolls three consecutive daily partitions up into one 3-day partition.
insert overwrite table dws.dws_user_usedur_4ds partition(dt='20220103_20220105')
select user_id,sum(usedur) as usedur from dws.dws_user_usedur_ds where dt>='20220103' and dt<='20220105' group by user_id;
insert overwrite table dws.dws_user_usedur_4ds partition(dt='20220106_20220108')
select user_id,sum(usedur) as usedur from dws.dws_user_usedur_ds where dt>='20220106' and dt<='20220108' group by user_id;
周表:(每週日執行,依賴三天表天表):
-- Calendar week 2022-01-03..2022-01-09 = two 3-day partitions + one daily partition.
-- The remaining day is read from the daily aggregate (one row per user) instead of
-- the raw detail table, matching the stated dependency on the day table.
insert overwrite table dws.dws_user_usedur_ws partition(dt='20220103_20220109')
select user_id,sum(usedur) as usedur from (
select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220103_20220105'
union all
select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220106_20220108'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220109'
)t group by user_id;
雙週表:(每雙週日執行,依賴周表):
-- Two calendar weeks 2022-01-03..2022-01-16 = two weekly partitions.
-- Source tables are schema-qualified like every other statement in this document.
insert overwrite table dws.dws_user_usedur_2ws partition(dt='20220103_20220116')
select user_id,sum(usedur) as usedur from (
select user_id,usedur from dws.dws_user_usedur_ws where dt='20220103_20220109'
union all
select user_id,usedur from dws.dws_user_usedur_ws where dt='20220110_20220116'
)t group by user_id;
月表:(每月底執行,依賴雙週表天表):
-- Calendar month 2022-01-01..2022-01-31 = two two-week partitions (01-03..01-30)
-- + the three days they do not cover: 01-01, 01-02 and 01-31.
-- (01-03 is already inside 20220103_20220116 and must not be added again.)
insert overwrite table dws.dws_user_usedur_ms partition(dt='20220101_20220131')
select user_id,sum(usedur) as usedur from (
select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220103_20220116'
union all
select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220117_20220130'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220101'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220102'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220131'
)t group by user_id;
周表:(每天執行,最近七天資料,依賴三天表天表):
-- Rolling 7 days 2022-01-02..2022-01-08 = two 3-day partitions + one daily partition.
-- The single day is read from the daily aggregate, matching the stated dependency
-- on the day table.
insert overwrite table dws.dws_user_usedur_ws partition(dt='20220102_20220108')
select user_id,sum(usedur) as usedur from (
select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220103_20220105'
union all
select user_id,usedur from dws.dws_user_usedur_4ds where dt='20220106_20220108'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220102'
)t group by user_id;
雙週表:(每天執行,最近十四天資料,依賴周表):
-- Rolling 14 days 2022-01-04..2022-01-17 = two rolling 7-day partitions.
insert overwrite table dws.dws_user_usedur_2ws partition(dt='20220104_20220117')
select user_id,sum(usedur) as usedur from (
select user_id,usedur from dws.dws_user_usedur_ws where dt='20220104_20220110'
union all
select user_id,usedur from dws.dws_user_usedur_ws where dt='20220111_20220117'
)t group by user_id;
月表:(每天執行,最近三十天資料,依賴雙週表天表):
-- Rolling 30 days 2022-01-03..2022-02-01 = two 14-day partitions (01-03..01-30)
-- + the two remaining days, 01-31 and 02-01.
insert overwrite table dws.dws_user_usedur_ms partition(dt='20220103_20220201')
select user_id,sum(usedur) as usedur from (
select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220103_20220116'
union all
select user_id,usedur from dws.dws_user_usedur_2ws where dt='20220117_20220130'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220131'
union all
select user_id,usedur from dws.dws_user_usedur_ds where dt='20220201'
)t group by user_id;
五:複雜型別操作:相加表
一般的表會有複雜型別的資料,例如 map、list;部分表需要將 map 合併,或將 list 相加減。
實現邏輯一般為:建立UDAF
package com.transsion.bigdata.aggregate

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructField, StructType,ArrayType}
import scala.collection.mutable

/**
 * Aggregate function that merges `array<string>` values into one list.
 *
 * For each incoming list, elements already in the buffer that also occur in the
 * incoming list are removed, then the whole incoming list is appended. Empty
 * strings are dropped from the result. Null inputs are ignored.
 */
class AggList extends UserDefinedAggregateFunction {

  /** Input: a single `array<string>` column. */
  override def inputSchema: StructType = StructType(StructField("input", ArrayType(StringType)) :: Nil)

  /** Aggregation buffer: a single `array<string>` slot. */
  override def bufferSchema: StructType = StructType(StructField("buffer", ArrayType(StringType)) :: Nil)

  /** Result type of the aggregate. */
  override def dataType: DataType = ArrayType(StringType)

  /** Declares that the same input always yields the same output. */
  override def deterministic: Boolean = true

  /** Starts every group with an empty list. */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new java.util.ArrayList[String]()
  }

  /** Folds one input row into the buffer. */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    agg(buffer, input)
  }

  /** Folds another partial buffer into this one. */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    agg(buffer1, buffer2)
  }

  /** Final result: the accumulated list. */
  override def evaluate(buffer: Row) = buffer.getList[String](0)

  /**
   * Merges column 0 of `buffer2` into column 0 of `buffer1`.
   *
   * @param buffer1 aggregation buffer that receives the merged list
   * @param buffer2 input row or partial buffer; a null column 0 contributes nothing
   */
  def agg(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val merged: java.util.List[String] = new java.util.ArrayList[String](buffer1.getList[String](0))
    if (buffer2.get(0) != null) {
      val incoming = buffer2.getList[String](0)
      // Drop elements that are about to be re-added so they are not duplicated.
      merged.removeAll(incoming)
      merged.addAll(incoming)
    }
    // List.remove(Object) removes only the first match; remove every empty string.
    merged.removeAll(java.util.Collections.singleton(""))
    buffer1.update(0, merged)
  }
}
package com.transsion.bigdata.aggregate

import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.MapType
import scala.collection.mutable

/**
 * Aggregate function that merges `map<string,string>` values by key, adding the
 * values numerically.
 *
 * Values are parsed as doubles, summed per key, and written back as strings.
 * Entries whose key is the empty string are dropped. Null inputs are ignored.
 */
class AggMap extends UserDefinedAggregateFunction {

  /** Input: a single `map<string,string>` column. */
  override def inputSchema: StructType =
    StructType(StructField("input", MapType(StringType, StringType)) :: Nil)

  /** Aggregation buffer: a single `map<string,string>` slot. */
  override def bufferSchema: StructType =
    StructType(StructField("buffer", MapType(StringType, StringType)) :: Nil)

  /** Result type of the aggregate. */
  override def dataType: DataType = MapType(StringType, StringType)

  /** Declares that the same input always yields the same output. */
  override def deterministic: Boolean = true

  /** Starts every group with an empty map. */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, mutable.Map())
  }

  /** Folds one input row into the buffer. */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    evaluate(buffer, input)
  }

  /** Folds another partial buffer into this one. */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    evaluate(buffer1, buffer2)
  }

  /** Final result: the accumulated map. */
  override def evaluate(buffer: Row) = buffer.getMap[String,String](0)

  /**
   * Merges column 0 of `buffer2` into column 0 of `buffer1`, summing values per key.
   *
   * @param buffer1 aggregation buffer that receives the merged map
   * @param buffer2 input row or partial buffer; a null column 0 leaves `buffer1` untouched
   */
  def evaluate(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val current = buffer1.getMap[String,String](0)
    val incoming = buffer2.getMap[String,String](0)
    if (incoming != null) {
      val allKeys = incoming.keySet.toList ++ current.keySet.toList
      val summed = allKeys.distinct
        .filterNot(_.isEmpty)
        .map { key =>
          // Every key comes from at least one of the two maps, so `present` is never empty;
          // buffer value first, then input value, as the addition order.
          val present = List(current.get(key), incoming.get(key)).flatten.map(_.toDouble)
          key -> present.reduce(_ + _).toString
        }
        .toMap
      buffer1.update(0, summed)
    }
  }
}