大資料下的實時熱點功能實現討論(實時流的TopN)
我司內部有個基於jstorm的實時流程式設計框架,文件裡有提到實時Topn,但是還沒有實現。。。。這是一個挺常見挺重要的功能,但仔細想想實現起來確實有難度。實時流的TopN其實離大家很近,比如下圖百度和微博的實時熱搜榜,還有各種資訊類的實時熱點,他們具體實現方式不清楚,甚至有可能是半小時離線跑出來的。今天不管他們怎麼實現的,我們討論下實時該怎麼實現(基於storm)。
加入我們現在有一些微博搜尋記錄,求某個時間視窗(以5分鐘為例)Topn其實就是對微博搜尋記錄做word count,然後排序擷取前N個。離線情況下可以這麼簡單的解決了,但在實時流資料下,你每個時刻都會有新資料流進來,當前時刻你拿到資料裡的topn在下一時刻就不一定對了。
一個時間視窗的TopN結果必須是建立在該時間視窗的全量資料上的才能保證100%的正確性,然而在實時流情況下,由於各種不確定性的因素,你很難在一個時間視窗內拿到上個時間視窗的資料。以5分鐘視窗為例,你想這個5分鐘內就求出上個5分鐘的熱點Topn,但是很可能正常情況下5分鐘只能拿到99%的資料,10分鐘拿到99.9%的資料,甚至30分鐘才能幾乎拿到100%的資料。如果資料來源異常,很可能幾個小時你都不大可能拿到某個5分鐘視窗內的全部資料。百度熱搜和微博熱搜都沒有說這些結果是哪個時間段的,就是想給自己留條後路,萬一內部出故障了,好幾個小時資料不更新,使用者也看不出,哈哈。
在現實情況下,這個5分鐘的熱點資料可能下個5分鐘就得展示給使用者,當然異常情況除外,那我們就可以退一步,稍微犧牲一點資料準確性來給出一個堪用的結果。這裡有個非常簡單可行的方案,實時流計算只做word count,然後把計算結果儲存起來後有個旁路程式掃結果資料,排序後擷取TopN,我估計好多人就是怎麼做的,架構如下。
這個解決方案優點很明顯,因為前面storm就是一個word count,簡單可靠。但是如果面對大量的TopN呼叫,旁路Topn程式就會成為效能瓶頸,因為要涉及到大量的資料查詢和排序,而且多一個系統,維護成本也變高了。如果我們想通過純storm的方式去解決Topn應該怎麼做?
在實時熱點topn計算過程中,整個計算包含word count和TopN兩部分。另外有幾個需要特別注意的地方,首先Topn是建立在全量資料上的,最後肯定只能由一個節點輸出TopN值,所以需要在前面的計算節點儘可能的減少對後面傳送的資料量。其次,像這種業務資料統計基本上都會出現資料熱點的問題。
Word count
先來看下word count的部分如何做,如何解決資料熱點,如何減少對後面的壓力,我直接上topo圖。
其實在bolt中加cache就可以大大減少發出去的訊息量,這裡我還有個step2的bolt,是因為我們在實踐中發現,如果多個bolt對hbase同可以key寫入,雖然可以通過hbase的Increment來保證資料的一直性,但在其過程中要對行加鎖,高併發的情況下寫入效能會受影響。所以可以先資料流隨機shuffle到step1,然後對流量做洩壓,然後按key fieldsGrouping到Step2,由step2中的bolt對hbase的資料做get add put的三步操作,沒有hbase的加鎖操作,grouping後也沒併發寫一個rowkey的問題。
TopN
如果我們已經有count好的<詞,詞頻>資料,其實我們並不需要全域性排序後擷取前n個來實現topn,其實用最小堆就可以大幅度減少TopN的時間複雜度。假設資料量為M,排序的時間複雜度可能到O(MlogM),用最小堆求TopN時間複雜度為O(MlogN),實際情況下N遠小於M,這個優化還是非常大的。在實時流TopN中我們也可以用最小堆做效能優化,topo圖如下。
在spout方法資料的時候做fieldsGrouping,然後step1中的每個bolt就會維護一部分資料的TopN最小堆,緩衝一段時間後把minHeap裡的資料全量發給finalTopN,finalTopn拿到資料後和自己的minHeap已合併就可以拿到正真的topn了。
最終實現
要實現實時熱點功能,其實講上面兩個 word count和topn的topo合併起來後就好了,最終的topo如下。
spout收到訊息後隨機shuffle,step1中的bolt講一部分統計結果寫入cache,待cache失效的時候按key fieldsGrouping到step2,這樣可以減少對發出的訊息量。 step2中的bolt不僅有個cache還有個minheap,cache中存的是每個key的wordcount,minheap其實是維護的改bolt拿到部分資料的topn。step2中cache資料會失效,失效的時候需要資料更新到hbase中,同時也更新minheap。step2中的minheap超時後,全量資料丟到finalTopn中,再由finalTopn彙總。
在最後一步finalTopn中,同一個key可能由step2多次傳下來,所以finalTopn更新其minheap的時候不能只是簡單的和根節點做對比,heap中有的話要更新其值。
其它問題
1. 各過程中cache和minheap的失效機制什麼設?
cache失效機制我有遇到過相關的例子,一般是資料量或是cache大小觸發的,其實這個是做成引數配置的,在不同的業務環境下可能有不同的適合配置。minheap只能以超時時間為觸發條件,超時事件設多少得看具體情況了。感覺都是超參,需要調。
2. 如何保證資料不丟?
storm的acker機制就可以保證資料at least once,就是保證資料不丟,但不保證資料不重複,如果真的需要exactly once,還是放棄jstorm吧,可以用flink試試。但我覺得其實在這種非資料強一致性的情況下,ack機制都不需要開,比較storm的ack還是要消耗一定效能的(有看過別人的資料,開啟acker要消耗10%以上效能,參考下)。 在我上圖topo設計下,如果叢集中有一臺機器宕了,cache裡的資料就全丟了,其實可能損失也不小,但是機器宕機畢竟還是個小概率事件。
3. 最終資料如何儘可能準確?
對一個時間視窗的維護時間越長,越可能拿到全量資料,結果就越準確。這個肯定的最後那個finalTopn的bolt來做了,資料量越大,finalTopnbolt 的挑戰也就越大,我覺得可以起多個bolt,按時間視窗Grouping,然後還得對minheap所有資料做持久化,還得支援持久化後的更新,可以對minheap序列化後放到hbase裡。