1. 程式人生 > >基於視窗的實時統計

基於視窗的實時統計

視窗統計

實時資料是無邊界的,即不斷地有資料輸入,但我們的統計一般是有時間範圍的,離線統計以年月日為統計週期,最小能到小時週期,如果是分鐘甚至秒級別計算,則可認為是實時計算,我們把實時資料流按時間段分割成一個個視窗,則可基於視窗進行資料統計。

我司開源Pike支援三種視窗,結合各種UDAF,通過SQL就能能實現各種聚合統計:

  • 跳動視窗
  • 滑動視窗
  • 累計視窗

跳動視窗

跳動視窗是最直觀,最簡單的介面,如下圖,t1->t2為一個視窗,t2->t3為一個視窗,各個視窗之間沒有重疊,實時統計結果也是基於各個視窗內的資料,結果輸出頻率等於統計週期。

跳動視窗比較常用,一般適用於統計無交叉分鐘級別實時流量。

pike example:

 //以5m為輸出頻率,統計客戶端每5分鐘各視訊播放uv,並按uv排序,獲取uv最高的top 100視訊
 withperiod 5m 
 select  top 100 output(dt(outputctx())) as dt,
 output('pc客戶端') as plt,
 mjoin(dim_channelid,'dim_sync_channel','Series_TitleChinese') as title,
 count(distinct userid, ipvalue) as UV
 from dol_client
 group by title
 order by uv desc

滑動視窗

滑動視窗相對跳動視窗稍複雜,主要在於相鄰視窗間有重疊,如下圖,t1->t3為視窗w1,t2->t4為視窗w2,w2與w1重疊t2->t3,此時w2相對w1滑動視窗為t1->t2,w1統計視窗為t1->t3, 統計視窗必須為滑動視窗的整數倍,即(t3-t1)%(t2-t1)=0。若統計視窗等於滑動視窗,則滑動視窗轉化為跳動視窗,因此,可認為跳動視窗是滑動視窗的特例。

滑動視窗主要適用於統計最近m時間內資料,輸出結果間隔為n, n <= m 且 m % n = 0。

pike example:

//以5分鐘為輸出頻率,同時統計最近5分鐘,最近10分鐘,最近4小時網頁端點直播uv和vv
withperiod 5m
select output(dt(outputctx())) as dt,
'ikan' as plt,
case when Dim_LiveOndemand_C=102 then '點播' else '直播' end as type,
count(distinct userid,ipvalue) as uv_5m,
count(distinct userid,ipvalue,channelid) as vv_5m,
move('10m',count(distinct userid,ipvalue)) as uv_10m,
move('10m',count(distinct userid,ipvalue,channelid)) as vv_10m,
move('4h',linearcount(10000000,userid,ipvalue)) as uv_4h,
move('4h',linearcountEx(100,userid,ipvalue,channelid)) as vv_4h,
from dol_ikan
group by plt,type

累計視窗

累計視窗則是累計一個時間段內資料不斷輸出,例如w1為從t1開始累計到t2的資料,w2為從t1開始累計到t3的資料,w3為從t1開始累計到t4的資料,w1、w2、w3共享初始狀態;在一個完整的累計週期內,完整累計週期必須為輸出頻率的整數倍,t1為初始狀態,t2輸出w1統計結果,t3輸出w2統計結果,t4輸出w3統計結果,t2-t1=t3-t2=t4-t3,(t3-t1)%(t2-t1)=0,(t4-t1)%(t2-t1)=0;下一個完整累計週期則清零為初始化狀態重新開始統計,例如w6,w3都是一個完整累計視窗,且w6,w3無交集,w6,w3之間如同跳動視窗。

累計視窗主要適用於獲取從整點或整天開始,累計到當前時間的統計資料,一般完整累計視窗與離線週期對應,但卻需要獲取當前時刻的實時統計資料,例如實時獲取當天累計vv、當前小時累計uv。

pike examlpe:

//以5分鐘為輸出頻率,實時統計移動端各視訊當前小時uv、vv以及當天累計uv、vv
withperiod 5m 
select  output(dt(outputctx())) as dt,
channelid,
count(1) as LogCount,
count(distinct userid, ipvalue) as UV,
count(distinct if(StrIsNullOrEmpty(vvid), userid + channelid, vvid)) as VV,
accumulate('1h', count(1)) as LogCount_ThisHour,
accumulate('1h', linearcount(10000000, userid, ipvalue)) as UV_ThisHour
accumulate('1h', linearcountEx(100, if(StrIsNullOrEmpty(vvid), userid + channelid, vvid))) as VV_ThisHour,
accumulate('1d', count(1)) as LogCount_ThisDay,
accumulate('1d', hyperloglogcount(5, userid, ipvalue)) as UV_ThisDay,
accumulate('1d', loglogadaptivecount(5, if(StrIsNullOrEmpty(vvid), userid + channelid, vvid))) as VV_ThisDay
from dol_smart
group by channelid