shell 時間戳_pyspark之填充缺失的時間資料
技術標籤:shell 時間戳
這裡的場景是,原始資料有兩個表示時間的欄位:日期和小時,以及對應時間的資料值
(比如某個網站的訪問量,在凌晨一般沒有,白天有)。只有資料值不為0的時候才會記錄,
因此資料值為0的時間段是沒有的。但我們可能需要這些資料,因此就要用到填充功能。
下面會舉一個例子來說明。
首先匯入需要用到的包,這裡的pyspark版本是2.2.0,python版本是2.7。
import sys reload(sys) sys.setdefaultencoding('utf-8') from pyspark.sql import SparkSession, SQLContext import pyspark.sql.functions as F from pyspark.sql.types import * from pyspark.sql.window import Window
建立一個spark會話(如果使用的是shell,不需要此步驟):
spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
建立一個dataframe,有5列6行資料:
df = spark.createDataFrame( [(1, "a", 10, "2019-09-20", "1"), (2, "a", 20, "2019-09-20", "3"), (3, "a", 5, "2019-09-21", "5"), (4, "b", 8, "2019-09-20", "7"), (5, "b", 9, "2019-09-21", "9"), (6, "b", 16, "2019-09-21", "11")], ["id", "category", "num", "d", "h"])
原始資料表示時間的兩列是日期(d)和小時(h),將其轉換為時間戳,在此之前,先將h列轉換成整數型別:
df = df.withColumn("h", df["h"].cast(IntegerType()))
df = df.withColumn("time", F.from_unixtime((F.unix_timestamp(F.col("d"), "yyyy-MM-dd")+F.col("h")*3600)).cast("timestamp"))
看一看資料:
df.take(6)
輸出:
[Row(id=1, category=u'a', num=10, d=u'2019-09-20', h=1, time=datetime.datetime(2019, 9, 20, 9, 0)),
Row(id=2, category=u'a', num=20, d=u'2019-09-20', h=3, time=datetime.datetime(2019, 9, 20, 11, 0)),
Row(id=3, category=u'a', num=5, d=u'2019-09-21', h=5, time=datetime.datetime(2019, 9, 21, 13, 0)),
Row(id=4, category=u'b', num=8, d=u'2019-09-20', h=7, time=datetime.datetime(2019, 9, 20, 15, 0)),
Row(id=5, category=u'b', num=9, d=u'2019-09-21', h=9, time=datetime.datetime(2019, 9, 21, 17, 0)),
Row(id=6, category=u'b', num=16, d=u'2019-09-21', h=11, time=datetime.datetime(2019, 9, 21, 19, 0))]
可以發現,time列顯示的時間並不是我們希望得到的時間,不過沒關係,這個不影響我們資料的填充。
下面介紹兩種方法來填充資料。
第一種方法,根據資料中的最小時間和最大時間,生成這個時間段的所有時間資料,再和原始表做left outer join。
先獲取最小時間和最大時間
# 得到資料中的最小時間和最大時間,這裡得到的minp和maxp是(1568941200, 1569063600),可以用python程式碼轉換一下
minp, maxp = df.select(F.min("time").cast("long"), F.max("time").cast("long")).first()
# print(datetime.datetime.utcfromtimestamp(1568941200))
# 2019-09-20 01:00:00
# 結果和原始時間一樣!神奇不!
根據最小時間和最大時間,以小時為單位,生成這個時間段的所有時間資料:
# 時間間隔,這裡是以小時為單位,所以是60*60,即3600秒
step = 60 * 60
reference = spark.range((minp / step) * step, ((maxp / step) + 1) * step, step)
.select(F.col("id").cast("timestamp").alias("time"))
reference.take(3)
輸出:
[Row(time=datetime.datetime(2019, 9, 20, 9, 0)),
Row(time=datetime.datetime(2019, 9, 20, 10, 0)),
Row(time=datetime.datetime(2019, 9, 20, 11, 0))]
這裡有兩個category,a和b,假如我們希望對於每個category,都有完整的時間資料,要怎麼做呢?那就要用到笛卡爾積了:
# 我們希望對於每個category,都有每個時間段的資料,因此需要將時間與category做笛卡爾積
cate = dftest.select('category').distinct()
reference2 = cate.crossJoin(reference) # 笛卡爾積
笛卡爾積的結果就是所有我們需要的時間段資料,再將其與原始表做left outer join,就能得到我們想要的結果
df1 = reference2.join(df, ["category", "time"], "leftouter")
此時df1的前幾行是這樣的:
[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=1, num=10, d=u'2019-09-20', h=1),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=None, num=None, d=None, h=None),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=2, num=20, d=u'2019-09-20', h=3),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=None, num=None, d=None, h=None),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=None, num=None, d=None, h=None)]
發現填充的資料的id,num,d,h都是空的,那麼就需要補充這些資料的值了:
# 補id、num、d、h
df1 = df1.withColumn("d", F.to_date(F.col("time")).cast(StringType()))
df1 = df1.withColumn("h", F.hour(F.col("time")).cast(IntegerType()))
df1 = df1.fillna(0, subset=['num'])
df1 = df1.fillna(0, subset=['id'])
再來看看df1的前5行:
[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=1, num=10, d=u'2019-09-20', h=1),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=0, num=0, d=u'2019-09-20', h=2),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=2, num=20, d=u'2019-09-20', h=3),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=0, num=0, d=u'2019-09-20', h=4),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=0, num=0, d=u'2019-09-20', h=5)]
可以發現,轉換為d和h後,時間就變成我們想要的了,time和d、h看起來不是一個時間。。。
事實上,寫入hive表後,time也會變成我們想要的時間,這裡顯示的時間不準確,可能是有別的未可知原因。
用df1.count()檢視,有70列資料。可以想一下,為什麼是70行。
方法一需要生成所有category的所有時間段的資料,再和原始表join,在資料量很大的時候,效率比較低。
方法二則是生成原始表所沒有的資料,再和原始表做union。
方法二的主要思想是,通過每個category下time的排序,找出相鄰兩個time之間的缺失time,然後生成缺失time的資料。
首先針對每個category下的資料,得到該行資料對應的time的上一個已有的time:
# 同一個category的上一個時間
tempDf = df.withColumn('pre_time', F.lag(df['time']).over(Window.partitionBy("category").orderBy("time")))
得到此時間與上一個時間的時間差:
# 時間差
tempDf = tempDf.withColumn('diff', F.unix_timestamp(F.col("time"), "yyyy-MM-dd HH:mm:ss")
-F.unix_timestamp(F.col("pre_time"), "yyyy-MM-dd HH:mm:ss"))
這裡的時間差是以秒為單位的,當時間差為3600秒時,說明兩個時間之間沒有缺失的時間,大於3600秒時才有。
因此,要針對這部分資料找出缺失的時間:
fill_dates = F.udf(lambda x,z:[x-y for y in range(3600, z, 3600)], ArrayType(IntegerType()))
tempDf = tempDf.filter(F.col("diff") > 3600)
.withColumn("next_dates", fill_dates(F.unix_timestamp(F.col("time")), F.col("diff")))
這裡的fill_dates是一個udf函式,輸入當前時間x,以及時間差z,以3600秒為步長,
得到當前時間與上一個時間之間缺失的那些時間,即這裡的next_dates,它是一個list。
可以用explode函式將這個list拆分,得到多行資料:
tempDf = tempDf.withColumn("time", F.explode(F.col("next_dates")))
再做一些格式轉換,以及d和h的生成,num和id的補充:
tempDf = tempDf.withColumn("time", F.col("time").cast(TimestampType()))
.withColumn("d", F.to_date(F.col("time")).cast(StringType()))
.withColumn("h", F.hour(F.col("time")).cast(IntegerType()))
.withColumn("num", F.lit("0")).withColumn("id", F.lit("0"))
看兩行資料:
[Row(id=u'0', category=u'a', num=u'0', d=u'2019-09-20', h=2, time=datetime.datetime(2019, 9, 20, 10, 0),
pre_time=datetime.datetime(2019, 9, 20, 9, 0), diff=7200, next_dates=[1568944800]),
Row(id=u'0', category=u'a', num=u'0', d=u'2019-09-21', h=4, time=datetime.datetime(2019, 9, 21, 12, 0),
pre_time=datetime.datetime(2019, 9, 20, 11, 0), diff=93600, next_dates=[1569038400, 1569034800, 1569031200,
1569027600, 1569024000, 1569020400, 1569016800, 1569013200, 1569009600, 1569006000, 1569002400, 1568998800,
1568995200, 1568991600, 1568988000, 1568984400, 1568980800, 1568977200, 1568973600, 1568970000, 1568966400,
1568962800, 1568959200, 1568955600, 1568952000])]
next_dates是時間戳格式。
再將這個表和原始表union一下就好了,注意要drop不需要的列:
tempDf = tempDf.drop(*['next_dates', 'diff', 'pre_time'])
df2 = df.union(tempDf)
df2.orderBy('category', 'time').select('category', 'time','id','num','d','h').take(5)
輸出:
[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=u'1', num=u'10', d=u'2019-09-20', h=1),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=u'0', num=u'0', d=u'2019-09-20', h=2),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=u'2', num=u'20', d=u'2019-09-20', h=3),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=u'0', num=u'0', d=u'2019-09-20', h=4),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=u'0', num=u'0', d=u'2019-09-20', h=5)]
df2.count()後發現只有58行。
原來這裡會針對每個category有一個最大時間和最小時間,所以得到的結果數是比方法一少的。疏忽了!
如果想得到和方法一一樣的結果,可以這麼寫:
minp, maxp = df.select(F.min("time").cast("long"), F.max("time").cast("long")).first()
newRow = spark.createDataFrame([(minp,),(maxp,)], ["time"])
newRow = newRow.withColumn('time', F.col("time").cast("timestamp"))
cate = df.select('category').distinct()
newRow = cate.crossJoin(newRow) # 笛卡爾積
newRow.take(10)
先針對每個category,生成最小時間和最大時間的資料。這裡有兩個category,所以會有2*2=4行資料
輸出:
[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0)),
Row(category=u'b', time=datetime.datetime(2019, 9, 20, 9, 0)),
Row(category=u'a', time=datetime.datetime(2019, 9, 21, 19, 0)),
Row(category=u'b', time=datetime.datetime(2019, 9, 21, 19, 0))]
然後將生成的資料和原始表left join,得到其他欄位(id, num, d, h)的值,這是為了保證對於df中已有的資料,
newRow的相應行是一樣的,後續union的時候可以去掉重複資料:
newRow = newRow.join(df, ['category', 'time'], "left")
newdf = df.select('category', 'time', 'id', 'num', 'd', 'h').union(newRow.select('category', 'time', 'id', 'num', 'd', 'h'))
newdf = newdf.distinct()
newdf = newdf.fillna(0, subset=['num'])
newdf = newdf.fillna(0, subset=['id'])
newdf = newdf.withColumn("d", F.to_date(F.col("time")).cast(StringType()))
.withColumn("h", F.hour(F.col("time")).cast(IntegerType()))
newdf.take(10)
輸出:
[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=2, num=20, d=u'2019-09-20', h=3),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=1, num=10, d=u'2019-09-20', h=1),
Row(category=u'b', time=datetime.datetime(2019, 9, 20, 15, 0), id=4, num=8, d=u'2019-09-20', h=7),
Row(category=u'a', time=datetime.datetime(2019, 9, 21, 13, 0), id=3, num=5, d=u'2019-09-21', h=5),
Row(category=u'b', time=datetime.datetime(2019, 9, 21, 17, 0), id=5, num=9, d=u'2019-09-21', h=9),
Row(category=u'b', time=datetime.datetime(2019, 9, 21, 19, 0), id=6, num=16, d=u'2019-09-21', h=11),
Row(category=u'a', time=datetime.datetime(2019, 9, 21, 19, 0), id=0, num=0, d=u'2019-09-21', h=11),
Row(category=u'b', time=datetime.datetime(2019, 9, 20, 9, 0), id=0, num=0, d=u'2019-09-20', h=1)]
這樣,每個category下都有了最小時間和最大時間的資料了。
再用和之前一樣的方法:
fill_dates = F.udf(lambda x,z:[x-y for y in range(3600, z, 3600)], ArrayType(IntegerType()))
# 同一個category的上一個時間
tempDf = newdf.withColumn('pre_time', F.lag(newdf['time']).over(Window.partitionBy("category").orderBy("time")))
#時間差
tempDf = tempDf.withColumn('diff', F.unix_timestamp(F.col("time"), "yyyy-MM-dd HH:mm:ss")
-F.unix_timestamp(F.col("pre_time"), "yyyy-MM-dd HH:mm:ss"))
tempDf = tempDf.filter(F.col("diff") > 3600)
.withColumn("next_dates", fill_dates(F.unix_timestamp(F.col("time")), F.col("diff")))
.withColumn("time", F.explode(F.col("next_dates")))
.withColumn("time", F.col("time").cast(TimestampType()))
.withColumn("d", F.to_date(F.col("time")).cast(StringType()))
.withColumn("h", F.hour(F.col("time")).cast(IntegerType()))
.withColumn("num", F.lit("0")).withColumn("id", F.lit("0"))
tempDf = tempDf.drop(*['next_dates', 'diff', 'pre_time'])
df3 = newdf.select('category', 'time', 'id', 'num', 'd', 'h').union(tempDf.select('category', 'time', 'id', 'num', 'd', 'h'))
此時df3.count()就是70行啦!
如果我們想要計算每個category的每一個時間點的前後1小時這個時間段(一共3個小時)的平均num,就可以這麼做:
# 計算前後各1小時的平均num值,必須嚴格前後1小時
windowSpec = Window.partitionBy("category").orderBy("d", "h").rowsBetween(-1, 1)
df3 = df3.withColumn("movavg_sameday", F.avg("num").over(windowSpec))
.withColumn("movavg_sameday_data", F.collect_list("num").over(windowSpec))
df3.take(5)
注意這裡要partitionBy,也就是分割槽計算,不然會出現兩個category的時間混在一起被計算。
輸出:
[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=u'1', num=u'10', d=u'2019-09-20', h=1,
movavg_sameday=5.0, movavg_sameday_data=[u'10', u'0']),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=u'0', num=u'0', d=u'2019-09-20', h=2,
movavg_sameday=10.0, movavg_sameday_data=[u'10', u'0', u'20']),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=u'2', num=u'20', d=u'2019-09-20', h=3,
movavg_sameday=6.666666666666667, movavg_sameday_data=[u'0', u'20', u'0']),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=u'0', num=u'0', d=u'2019-09-20', h=4,
movavg_sameday=6.666666666666667, movavg_sameday_data=[u'20', u'0', u'0']),
Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=u'0', num=u'0', d=u'2019-09-20', h=5,
movavg_sameday=0.0, movavg_sameday_data=[u'0', u'0', u'0'])]
Window是一個很有用的函式,可以用於取想要的視窗資料。
上述程式碼中的rowsBetween是指從當前行算起(當前行是第0行),某兩行之間的視窗,比如這裡是-1和1,
也就是當前行的前一行和後一行之間的這三行。
還有一個方法是rangeBetween(x,y),是指當前行的某個欄位,比如這裡的num,取這個欄位某個區間的那些資料,
即num值處於[num+x, num+y]這個區間的那些行。
參考資料:
https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark
歡迎關注我的公眾號~