基於視窗的實時統計
視窗統計
實時資料是無邊界的,即不斷地有資料輸入,但我們的統計一般是有時間範圍的,離線統計以年月日為統計週期,最小能到小時週期,如果是分鐘甚至秒級別計算,則可認為是實時計算,我們把實時資料流按時間段分割成一個個視窗,則可基於視窗進行資料統計。
我司開源Pike支援三種視窗,結合各種UDAF,通過SQL就能能實現各種聚合統計:
- 跳動視窗
- 滑動視窗
- 累計視窗
跳動視窗
跳動視窗是最直觀,最簡單的介面,如下圖,t1->t2為一個視窗,t2->t3為一個視窗,各個視窗之間沒有重疊,實時統計結果也是基於各個視窗內的資料,結果輸出頻率等於統計週期。
跳動視窗比較常用,一般適用於統計無交叉分鐘級別實時流量。
pike example:
//以5m為輸出頻率,統計客戶端每5分鐘各視訊播放uv,並按uv排序,獲取uv最高的top 100視訊 withperiod 5m select top 100 output(dt(outputctx())) as dt, output('pc客戶端') as plt, mjoin(dim_channelid,'dim_sync_channel','Series_TitleChinese') as title, count(distinct userid, ipvalue) as UV from dol_client group by title order by uv desc
滑動視窗
滑動視窗相對跳動視窗稍複雜,主要在於相鄰視窗間有重疊,如下圖,t1->t3為視窗w1,t2->t4為視窗w2,w2與w1重疊t2->t3,此時w2相對w1滑動視窗為t1->t2,w1統計視窗為t1->t3, 統計視窗必須為滑動視窗的整數倍,即(t3-t1)%(t2-t1)=0。若統計視窗等於滑動視窗,則滑動視窗轉化為跳動視窗,因此,可認為跳動視窗是滑動視窗的特例。
滑動視窗主要適用於統計最近m時間內資料,輸出結果間隔為n, n <= m 且 m % n = 0。
pike example:
//以5分鐘為輸出頻率,同時統計最近5分鐘,最近10分鐘,最近4小時網頁端點直播uv和vv withperiod 5m select output(dt(outputctx())) as dt, 'ikan' as plt, case when Dim_LiveOndemand_C=102 then '點播' else '直播' end as type, count(distinct userid,ipvalue) as uv_5m, count(distinct userid,ipvalue,channelid) as vv_5m, move('10m',count(distinct userid,ipvalue)) as uv_10m, move('10m',count(distinct userid,ipvalue,channelid)) as vv_10m, move('4h',linearcount(10000000,userid,ipvalue)) as uv_4h, move('4h',linearcountEx(100,userid,ipvalue,channelid)) as vv_4h, from dol_ikan group by plt,type
累計視窗
累計視窗則是累計一個時間段內資料不斷輸出,例如w1為從t1開始累計到t2的資料,w2為從t1開始累計到t3的資料,w3為從t1開始累計到t4的資料,w1、w2、w3共享初始狀態;在一個完整的累計週期內,完整累計週期必須為輸出頻率的整數倍,t1為初始狀態,t2輸出w1統計結果,t3輸出w2統計結果,t4輸出w3統計結果,t2-t1=t3-t2=t4-t3,(t3-t1)%(t2-t1)=0,(t4-t1)%(t2-t1)=0;下一個完整累計週期則清零為初始化狀態重新開始統計,例如w6,w3都是一個完整累計視窗,且w6,w3無交集,w6,w3之間如同跳動視窗。
累計視窗主要適用於獲取從整點或整天開始,累計到當前時間的統計資料,一般完整累計視窗與離線週期對應,但卻需要獲取當前時刻的實時統計資料,例如實時獲取當天累計vv、當前小時累計uv。
pike examlpe:
//以5分鐘為輸出頻率,實時統計移動端各視訊當前小時uv、vv以及當天累計uv、vv
withperiod 5m
select output(dt(outputctx())) as dt,
channelid,
count(1) as LogCount,
count(distinct userid, ipvalue) as UV,
count(distinct if(StrIsNullOrEmpty(vvid), userid + channelid, vvid)) as VV,
accumulate('1h', count(1)) as LogCount_ThisHour,
accumulate('1h', linearcount(10000000, userid, ipvalue)) as UV_ThisHour
accumulate('1h', linearcountEx(100, if(StrIsNullOrEmpty(vvid), userid + channelid, vvid))) as VV_ThisHour,
accumulate('1d', count(1)) as LogCount_ThisDay,
accumulate('1d', hyperloglogcount(5, userid, ipvalue)) as UV_ThisDay,
accumulate('1d', loglogadaptivecount(5, if(StrIsNullOrEmpty(vvid), userid + channelid, vvid))) as VV_ThisDay
from dol_smart
group by channelid