1. 程式人生 > 其它 >美團點評基於Storm的實時資料處理實踐

美團點評基於Storm的實時資料處理實踐

背景

目前美團點評已累計了豐富的線上交易與使用者行為資料,為商家賦能需要我們有更強大的專業化資料加工能力,來幫助商家做出正確的決策從而提高使用者體驗。目前商家端產品在資料應用上主要基於離線資料加工,資料生產排程以“T+1”為主,伴隨著越來越深入的精細化運營,實時資料應用訴求逾加強烈。

本文將從目前主流實時資料處理引擎的特點和我們面臨的問題出發,簡單的介紹一下我們是如何搭建實時資料處理系統。

設計框架

目前比較流行的實時處理引擎有 Storm,Spark Streaming,Flink。每個引擎都有各自的特點和應用場景。 下表是對這三個引擎的簡單對比:

(表1)

考慮到每個引擎的特點、商家端應用的特點和系統的高可用性,我們最終選擇了 Storm 作為本系統的實時處理引擎。

面臨的問題

  1. 資料量的不穩定性,導致對機器需求的不確定性。使用者的行為資料會受到時間的影響,比如半夜時刻和用餐高峰時段每分鐘產生的資料量有兩個數量級的差異。
  2. 上游資料質量的不確定性。
  3. 資料計算時,資料的落地點應該放到哪裡來保證計算的高效性。
  4. 如何保證資料在多執行緒處理時資料計算的正確性。
  5. 計算好的資料以什麼樣的方式提供給應用方。

具體的實施方案

實時攝入資料完整性保障

資料完整性保證層:如何保證資料攝入到計算引擎的完整性呢?正如表1比較的那樣,Storm 框架的語義為 At Least Once,至少攝入一次。這個語義的存在正好保證了資料的完整性,所以只需要根據自己的需求編寫 Spout 即可。好訊息是我們的技術團隊已經開發好了一個滿足大多數需求的 Spout,可以直接拿來使用。特別需要注意的一點,在資料處理的過程中需要我們自己來剔除已經處理過的資料,因為 Storm 的語義會可能導致同一條資料攝入兩次。灰度釋出期間(一週)對資料完整性進行驗證,資料完整性為100%。

實時資料平滑處理

資料預測層:實時的資料預測可以幫助我們對到達的資料進行有效的平滑,從而可以減少在某一時刻對叢集的壓力。 在資料預測方面,我們採用了在數學上比較簡單的多元線性迴歸模型(如果此模型不滿足業務需求,可以選用一些更高級別的預測模型),預測下一分鐘可能到來的資料的量。在資料延遲可接受的範圍內,對資料進行平滑,並完成對資料的計算。通過對該方案的使用,減輕了對叢集約33%的壓力。具體步驟如下:

  • 步驟一:將多個業務的實時資料進行抽象化,轉換為(Y_i,X_1i,X_2i,X_3i,... ,X_ni),其中Y_i為在(X_1i...X_ni)屬性下的資料量,(X_1i...X_ni)為n個不同的屬性,比如時間、業務、使用者的性別等等。
  • 步驟二:因為考慮到實時資料的特殊性,不同業務的資料量隨時間變數基本呈現為M走勢,所以為了將非線性走勢轉換為線性走勢,可以將時間段分為4部分,保證在每個時間段內資料的走勢為線性走勢。同理,如果其他的屬性使得走勢變為非線性,也可以分段分析。
  • 步驟三:將抽象好的資料代入到多元線性迴歸模型中,其方程組形式為:

通過對該模型的求解方式求得估計引數,最後得多元線性迴歸方程。

  • 步驟四:資料預測完之後通過控制對資料的處理速度,保證在規定的時間內完成對規定資料的計算,減輕對叢集的壓力。

實時資料計算策略

策略層:Key/Value 模式更適應於實時資料模型,不管是在儲存還是計算方面。Cellar(我們內部基於阿里開源的Tair研發的公共KV儲存)作為一個分散式的 Key/Value 結構資料的解決方案,可以做到幾乎無延遲的進行 IO 操作,並且可以支援高達千萬級別的 QPS,更重要的是 Cellar 支援很多原子操作,運用在實時資料計算上是一個不錯的選擇。所以作為資料的落腳點,本系統選擇了Cellar。

但是在資料計算的過程中會遇到一些問題,比如說統計截止到當前時刻入住旅館的男女比例是多少?很容易就會想到,從 Cellar 中取出截止到當前時刻入住的男生是多少,女生是多少,然後做一個比值就 OK 了。但是本系統是在多執行緒的環境執行的,如果該時刻有兩對夫婦入住了,產生了兩筆訂單,恰好這兩筆訂單被兩個執行緒所處理,當執行緒A將該男士計算到結果中,正要打算將該女士計算到結果中的時候,執行緒B已經計算完結果了,那麼執行緒B計算出的結果就是2/1,那就出錯啦。

所以為了保證資料在多執行緒處理時資料計算的正確性,我們需要用到分散式鎖。實現分散式鎖的方式有很多,本文就不贅述了。這裡給大家介紹一種更簡單快捷的方法。Cellar 中有個 setNx 函式,該函式是原子的,並且是(Set If Not Exists),所以用該函式鎖住關鍵的欄位就可以。就上面的例子而言,我們可以鎖住該旅館的唯一 ID 欄位,計算完之後 delete 該鎖,這樣就可以保證了計算的正確性。

另外一個重要的問題是 Cellar 不支援事務,就會導致該計算系統在升級或者重啟時會造成少量資料的不準確。為了解決該問題,運用到一種 getset 原子思想的方法。如下:

public void doSomeWork(String input) {
    cellar.mapPut("uniq_ID");
    cellar.add("uniq_ID_1","some data");
    cellar.add("uniq_ID_2","some data again");
    ....
    cellar.mapRemove("uniq_ID");
}

如果上述程式碼執行到[2..5]某一行時系統重啟了,導致後續的操作並沒有完成,如何將沒有完成的操作新增上去呢?如下:

public void remedySomething() {
    map = cellar.mapGetAll();
    version = cellar.mapGet("uniq_ID").getVersion();    for (string str : map) {        if (cellar.get(str + "_1").getVersion()!= version) {
            cellar.add(str + "_1", "some data");
            cellar.mapRemove(str);
        }
        .......
    }
}

正如程式碼裡那樣,會有一個容器記錄了哪些資料正在被操作,當系統重啟的時候,從該容器取出上次未執行完的資料,用 Version(版本號)來記錄哪些操作還沒有完成,將沒有完成的操作補上,這樣就可以保證了計算結果的準確性。起初 Version(版本號)被設計出來解決的問題是防止由於資料的併發更新導致的問題。

比如,系統有一個 value 為“a,b,c”,A和B同時get到這個 value。A執行操作,在後面新增一個d,value 為 “a,b,c,d”。B執行操作新增一個e,value為”a,b,c,e”。如果不加控制,無論A和B誰先更新成功,它的更新都會被後到的更新覆蓋。Tair 無法解決這個問題,但是引入了version 機制避免這樣的問題。還是拿剛才的例子,A和B取到資料,假設版本號為10,A先更新,更新成 功後,value 為”a,b,c,d”,與此同時,版本號會變為11。當B更新時,由於其基於的版本號是10,伺服器會拒絕更新,從而避免A的更新被覆蓋。B可以選擇 get 新版本的 value,然後在其基礎上修改,也可以選擇強行更新。

將 Version 運用到事務的解決上也算是一種新型的使用。為驗證該功能的正確性,灰度釋出期間每天不同時段對專案進行殺死並重啟,並對資料正確性進行校驗,資料的正確性為100%。

實時資料儲存

為了契合更多的需求,將資料分為三部分儲存。

Kafka:儲存稍加工之後的明細資料,方便做更多的擴充套件。

MySQL:儲存中間的計算結果資料,方便計算過程的視覺化。

Cellar:儲存最終的結果資料,供應用層直接查詢使用。

應用案例

1. 美團開店寶的實時經營資料卡片

美團開店寶作為美團商家的客戶端,支援著眾多餐飲商家的輔助經營,而經營資料的實時性對影響商家決策尤為重要。該功能上線之後受到了商家的熱烈歡迎。卡片展示如下圖:

2. 美團點評金融合作門店的實時熱度標籤

該功能用於與美團點評金融合作商家增加支付標籤,用以突出這些商家,增加營銷點。另一方面為優質商家吸引更多流量,為平臺帶來更多收益。展示如下圖:

總結與展望

以上就是該系統的設計框架與思路,並且部分功能已應用到系統中。為了商家更好的決策,使用者更好的體驗,在業務不斷增長的情況下,對實時資料的分析就需要做到更全面。所以實時資料分析還有很多東西可以去做。

老生常談的大資料 4V+1O 特徵,即資料量大(Volume)、型別繁多(Variety)、價值密度低(Value)、速度快時效性高(Velocity)、資料線上(Online),相比離線資料系統,對實時資料的計算和應用挑戰尤其艱鉅。在技術框架演進層面,對流式資料進行高度抽象,簡化開發流程;在應用端,我們後續希望在資料大屏、使用者行為分析產品、營銷效果跟蹤等 DW/BI 產品進行持續應用,通過加快資料流轉的速度,更好的發揮資料價值。

參考