1. 程式人生 > >大資料真實案例:Spark在美團的實踐

大資料真實案例:Spark在美團的實踐

美團是資料驅動的網際網路服務,使用者每天在美團上的點選、瀏覽、下單支付行為都會產生海量的日誌,這些日誌資料將被彙總處理、分析、挖掘與學習,為美團的各種推薦、搜尋系統甚至公司戰略目標制定提供資料支援。大資料處理滲透到了美團各業務線的各種應用場景,選擇合適、高效的資料處理引擎能夠大大提高資料生產的效率,進而間接或直接提升相關團隊的工作效率。

美團最初的資料處理以Hive SQL為主,底層計算引擎為MapReduce,部分相對複雜的業務會由工程師編寫MapReduce程式實現。隨著業務的發展,單純的Hive SQL查詢或者MapReduce程式已經越來越難以滿足資料處理和分析的需求。

一方面,MapReduce計算模型對多輪迭代的DAG作業支援不給力,每輪迭代都需要將資料落盤,極大地影響了作業執行效率,另外只提供Map和Reduce這兩種計算因子,使得使用者在實現迭代式計算(比如:機器學習演算法)時成本高且效率低。

另一方面,在資料倉庫的按天生產中,由於某些原始日誌是半結構化或者非結構化資料,因此,對其進行清洗和轉換操作時,需要結合SQL查詢以及複雜的過程式邏輯處理,這部分工作之前是由Hive SQL結合Python指令碼來完成。這種方式存在效率問題,當資料量比較大的時候,流程的執行時間較長,這些ETL流程通常處於比較上游的位置,會直接影響到一系列下游的完成時間以及各種重要資料報表的生成。

基於以上原因,美團在2014年的時候引入了Spark。為了充分利用現有Hadoop叢集的資源,我們採用了Spark on Yarn模式,所有的Spark app以及MapReduce作業會通過Yarn統一排程執行。Spark在美團資料平臺架構中的位置如圖所示:

v2-33cfa78abd8b2df4e30f3bcf08a116cd_r

經過近兩年的推廣和發展,從最開始只有少數團隊嘗試用Spark解決資料處理、機器學習等問題,到現在已經覆蓋了美團各大業務線的各種應用場景。從上游的ETL生產,到下游的SQL查詢分析以及機器學習等,Spark正在逐步替代MapReduce作業,成為美團大資料處理的主流計算引擎。目前美團Hadoop叢集使用者每天提交的Spark作業數和MapReduce作業數比例為4:1,對於一些上游的Hive ETL流程,遷移到Spark之後,在相同的資源使用情況下,作業執行速度提升了十倍,極大地提升了業務方的生產效率。

下面我們將介紹Spark在美團的實踐,包括我們基於Spark所做的平臺化工作以及Spark在生產環境下的應用案例。其中包含Zeppelin結合的互動式開發平臺,也有使用Spark任務完成的ETL資料轉換工具,資料探勘組基於Spark開發了特徵平臺和資料探勘平臺,另外還有基於Spark的互動式使用者行為分析系統以及在SEM投放服務中的應用,以下是詳細介紹。

Spark互動式開發平臺

在推廣如何使用Spark的過程中,我們總結了使用者開發應用的主要需求:

  1. 資料調研:在正式開發程式之前,首先需要認識待處理的業務資料,包括:資料格式,型別(若以表結構儲存則對應到欄位型別)、儲存方式、有無髒資料,甚至分析根據業務邏輯實現是否可能存在資料傾斜等等。這個需求十分基礎且重要,只有對資料有充分的掌控,才能寫出高效的Spark程式碼;
  2. 程式碼除錯:業務的編碼實現很難保證一蹴而就,可能需要不斷地除錯;如果每次少量的修改,測試程式碼都需要經過編譯、打包、提交線上,會對使用者的開發效率影響是非常大的;
  3. 聯合開發:對於一整個業務的實現,一般會有多方的協作,這時候需要能有一個方便的程式碼和執行結果共享的途徑,用於分享各自的想法和試驗結論。

基於這些需求,我們調研了現有的開源系統,最終選擇了Apache的孵化專案Zeppelin,將其作為基於Spark的互動式開發平臺。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了資料分析和視覺化等功能。

v2-42d5869627f8f5741c00b16e811d978c_b

我們在原生的Zeppelin上增加了使用者登陸認證、使用者行為日誌審計、許可權管理以及執行Spark作業資源隔離,打造了一個美團的Spark的互動式開發平臺,不同的使用者可以在該平臺上調研資料、除錯程式、共享程式碼和結論。

整合在Zeppelin的Spark提供了三種直譯器:Spark、Pyspark、SQL,分別適用於編寫Scala、Python、SQL程式碼。對於上述的資料調研需求,無論是程式設計之初,還是編碼實現過程中,當需要檢索資料資訊時,通過Zeppelin提供的SQL介面可以很便利的獲取到分析結果;另外,Zeppelin中Scala和Python直譯器自身的互動式特性滿足了使用者對Spark和Pyspark分步除錯的需求,同時由於Zeppelin可以直接連線線上叢集,因此可以滿足使用者對線上資料的讀寫處理請求;最後,Zeppelin使用Web Socket通訊,使用者只需要簡單地傳送要分享內容所在的http連結,所有接受者就可以同步感知程式碼修改,執行結果等,實現多個開發者協同工作。

Spark作業ETL模板

除了提供平臺化的工具以外,我們也會從其他方面來提高使用者的開發效率,比如將類似的需求進行封裝,提供一個統一的ETL模板,讓使用者可以很方便的使用Spark實現業務需求。

美團目前的資料生產主體是通過ETL將原始的日誌通過清洗、轉換等步驟後加載到Hive表中。而很多線上業務需要將Hive表裡面的資料以一定的規則組成鍵值對,匯入到Tair中,用於上層應用快速訪問。其中大部分的需求邏輯相同,即把Hive表中幾個指定欄位的值按一定的規則拼接成key值,另外幾個欄位的值以json字串的形式作為value值,最後將得到的對寫入Tair。

v2-0a119b77acc868edbd353589e6333093_b

由於Hive表中的資料量一般較大,使用單機程式讀取資料和寫入Tair效率比較低,因此部分業務方決定使用Spark來實現這套邏輯。最初由業務方的工程師各自用Spark程式實現從Hive讀資料,寫入到Tair中(以下簡稱hive2Tair流程),這種情況下存在如下問題:每個業務方都要自己實現一套邏輯類似的流程,產生大量重複的開發工作;由於Spark是分散式的計算引擎,因此程式碼實現和引數設定不當很容易對Tair叢集造成巨大壓力,影響Tair的正常服務。基於以上原因,我們開發了Spark版的hive2Tair流程,並將其封裝成一個標準的ETL模板,其格式和內容如下所示:v2-864ee6ac385a1f1bf43c4410ea7c74bf_b

source用於指定Hive表源資料,target指定目標Tair的庫和表,這兩個引數可以用於排程系統解析該ETL的上下游依賴關係,從而很方便地加入到現有的ETL生產體系中。

有了這個模板,使用者只需要填寫一些基本的資訊(包括Hive表來源,組成key的欄位列表,組成value的欄位列表,目標Tair叢集)即可生成一個hive2Tair的ETL流程。整個流程生成過程不需要任何Spark基礎,也不需要做任何的程式碼開發,極大地降低了使用者的使用門檻,避免了重複開發,提高了開發效率。該流程執行時會自動生成一個Spark作業,以相對保守的引數執行:預設開啟動態資源分配,每個Executor核數為2,記憶體2GB,最大Executor數設定為100。如果對於效能有很高的要求,並且申請的Tair叢集比較大,那麼可以使用一些調優引數來提升寫入的效能。目前我們僅對使用者暴露了設定Executor數量以及每個Executor記憶體的介面,並且設定了一個相對安全的最大值規定,避免由於引數設定不合理給Hadoop叢集以及Tair叢集造成異常壓力。

基於Spark的使用者特徵平臺

在沒有特徵平臺之前,各個資料探勘人員按照各自專案的需求提取使用者特徵資料,主要是通過美團的ETL排程平臺按月/天來完成資料的提取。

但從使用者特徵來看,其實會有很多的重複工作,不同的專案需要的使用者特徵其實有很多是一樣的,為了減少冗餘的提取工作,也為了節省計算資源,建立特徵平臺的需求隨之誕生,特徵平臺只需要聚合各個開發人員已經提取的特徵資料,並提供給其他人使用。特徵平臺主要使用Spark的批處理功能來完成資料的提取和聚合。
開發人員提取特徵主要還是通過ETL來完成,有些資料使用Spark來處理,比如使用者搜尋關鍵詞的統計。
開發人員提供的特徵資料,需要按照平臺提供的配置檔案格式新增到特徵庫,比如在圖團購的配置檔案中,團購業務中有一個使用者24小時時段支付的次數特徵,輸入就是一個生成好的特徵表,開發人員通過測試驗證無誤之後,即完成了資料上線;另外對於有些特徵,只需要從現有的表中提取部分特徵資料,開發人員也只需要簡單的配置即可完成。

v2-ad7464e5c9a581568f310ff56d442945_b

在圖中,我們可以看到特徵聚合分兩層,第一層是各個業務資料內部聚合,比如團購的資料配置檔案中會有很多的團購特徵、購買、瀏覽等分散在不同的表中,每個業務都會有獨立的Spark任務來完成聚合,構成一個使用者團購特徵表;特徵聚合是一個典型的join任務,對比MapReduce效能提升了10倍左右。第二層是把各個業務表資料再進行一次聚合,生成最終的使用者特徵資料表。特徵庫中的特徵是視覺化的,我們在聚合特徵時就會統計特徵覆蓋的人數,特徵的最大最小數值等,然後同步到RDB,這樣管理人員和開發者都能通過視覺化來直觀地瞭解特徵。 另外,我們還提供特徵監測和告警,使用最近7天的特徵統計資料,對比各個特徵昨天和今天的覆蓋人數,是增多了還是減少了,比如性別為女這個特徵的覆蓋人數,如果發現今天的覆蓋人數比昨天低了1%(比如昨天6億使用者,女性2億,那麼人數降低了1%*2億=2百萬)突然減少2萬女性使用者說明資料出現了極大的異常,何況網站的使用者數每天都是增長的。這些異常都會通過郵件傳送到平臺和特徵提取的相關人。

資料探勘平臺是完全依賴於使用者特徵庫的,通過特徵庫提供使用者特徵,資料探勘平臺對特徵進行轉換並統一格式輸出,就此開發人員可以快速完成模型的開發和迭代,之前需要兩週開發一個模型,現在短則需要幾個小時,多則幾天就能完成。特徵的轉換包括特徵名稱的編碼,也包括特徵值的平滑和歸一化,平臺也提供特徵離散化和特徵選擇的功能,這些都是使用Spark離線完成。

開發人員拿到訓練樣本之後,可以使用Spark mllib或者Python sklearn等完成模型訓練,得到最優化模型之後,將模型儲存為平臺定義好的模型儲存格式,並提供相關配置引數,通過平臺即可完成模型上線,模型可以按天或者按周進行排程。當然如果模型需要重新訓練或者其它調整,那麼開發者還可以把模型下線。不只如此,平臺還提供了一個模型準確率告警的功能,每次模型在預測完成之後,會計算使用者提供的樣本中預測的準確率,並比較開發者提供的準確率告警閾值,如果低於閾值則發郵件通知開發者,是否需要對模型重新訓練。

在開發挖掘平臺的模型預測功時能我們走了點彎路,平臺的模型預測功能開始是相容Spark介面的,也就是使用Spark儲存和載入模型檔案並預測,使用過的人知道Spark mllib的很多API都是私有的開發人員無法直接使用,所以我們這些介面進行封裝然後再提供給開發者使用,但也只解決了Spark開發人員的問題,平臺還需要相容其他平臺的模型輸出和載入以及預測的功能,這讓我們面臨必需維護一個模型多個介面的問題,開發和維護成本都較高,最後還是放棄了相容Spark介面的實現方式,我們自己定義了模型的儲存格式,以及模型載入和模型預測的功能。

v2-8c85cfdef52604509efed464e4fdd62a_b

以上內容介紹了美團基於Spark所做的平臺化工作,這些平臺和工具是面向全公司所有業務線服務的,旨在避免各團隊做無意義的重複性工作,以及提高公司整體的資料生產效率。目前看來效果是比較好的,這些平臺和工具在公司內部得到了廣泛的認可和應用,當然也有不少的建議,推動我們持續地優化。
隨著Spark的發展和推廣,從上游的ETL到下游的日常資料統計分析、推薦和搜尋系統,越來越多的業務線開始嘗試使用Spark進行各種複雜的資料處理和分析工作。下面將以Spark在互動式使用者行為分析系統以及SEM投放服務為例,介紹Spark在美團實際業務生產環境下的應用。

Spark在互動式使用者行為分析系統中的實踐

美團的互動式使用者行為分析系統,用於提供對海量的流量資料進行互動式分析的功能,系統的主要使用者為公司內部的PM和運營人員。普通的BI類報表系統,只能夠提供對聚合後的指標進行查詢,比如PV、UV等相關指標。但是PM以及運營人員除了檢視一些聚合指標以外,還需要根據自己的需求去分析某一類使用者的流量資料,進而瞭解各種使用者群體在App上的行為軌跡。根據這些資料,PM可以優化產品設計,運營人員可以為自己的運營工作提供資料支援,使用者核心的幾個訴求包括:

  1. 自助查詢,不同的PM或運營人員可能隨時需要執行各種各樣的分析功能,因此係統需要支援使用者自助使用。
  2. 響應速度,大部分分析功能都必須在幾分鐘內完成。
  3. 視覺化,可以通過視覺化的方式檢視分析結果。

要解決上面的幾個問題,技術人員需要解決以下兩個核心問題:

  1. 海量資料的處理,使用者的流量資料全部儲存在Hive中,資料量非常龐大,每天的資料量都在數十億的規模。
  2. 快速計算結果,系統需要能夠隨時接收使用者提交的分析任務,並在幾分鐘之內計算出他們想要的結果。

要解決上面兩個問題,目前可供選擇的技術主要有兩種:MapReduce和Spark。在初期架構中選擇了使用MapReduce這種較為成熟的技術,但是通過測試發現,基於MapReduce開發的複雜分析任務需要數小時才能完成,這會造成極差的使用者體驗,使用者無法接受。

因此我們嘗試使用Spark這種記憶體式的快速大資料計算引擎作為系統架構中的核心部分,主要使用了Spark Core以及Spark SQL兩個元件,來實現各種複雜的業務邏輯。實踐中發現,雖然Spark的效能非常優秀,但是在目前的發展階段中,還是或多或少會有一些效能以及OOM方面的問題。因此在專案的開發過程中,對大量Spark作業進行了各種各樣的效能調優,包括運算元調優、引數調優、shuffle調優以及資料傾斜調優等,最終實現了所有Spark作業的執行時間都在數分鐘左右。並且在實踐中解決了一些shuffle以及資料傾斜導致的OOM問題,保證了系統的穩定性。

結合上述分析,最終的系統架構與工作流程如下所示:

  1. 使用者在系統介面中選擇某個分析功能對應的選單,並進入對應的任務建立介面,然後選擇篩選條件和任務引數,並提交任務。
  2. 由於系統需要滿足不同類別的使用者行為分析功能(目前系統中已經提供了十個以上分析功能),因此需要為每一種分析功能都開發一個Spark作業。
  3. 採用J2EE技術開發了Web服務作為後臺系統,在接收到使用者提交的任務之後,根據任務型別選擇其對應的Spark作業,啟動一條子執行緒來執行Spark-submit命令以提交Spark作業。
  4. Spark作業執行在Yarn叢集上,並針對Hive中的海量資料進行計算,最終將計算結果寫入資料庫中。
  5. 使用者通過系統介面檢視任務分析結果,J2EE系統負責將資料庫中的計算結果返回給介面進行展現。

v2-3175f11d79e2d5f840776336c071d198_b

該系統上線後效果良好:90%的Spark作業執行時間都在5分鐘以內,剩下10%的Spark作業執行時間在30分鐘左右,該速度足以快速響應使用者的分析需求。通過反饋來看,使用者體驗非常良好。目前每個月該系統都要執行數百個使用者行為分析任務,有效並且快速地支援了PM和運營人員的各種分析需求。

Spark在SEM投放服務中的應用

流量技術組負責著美團站外廣告的投放技術,目前在SEM、SEO、DSP等多種業務中大量使用了Spark平臺,包括離線挖掘、模型訓練、流資料處理等。美團SEM(搜尋引擎營銷)投放著上億的關鍵詞,一個關鍵詞從被挖掘策略發現開始,就踏上了精彩的SEM之旅。它經過預估模型的篩選,投放到各大搜索引擎,可能因為市場競爭頻繁調價,也可能因為效果不佳被迫下線。而這樣的旅行,在美團每分鐘都在發生。如此大規模的隨機“遷徙”能夠順利進行,Spark功不可沒。

v2-aa5367b691a88ec57d71e9a222edbf12_b

Spark不止用於美團SEM的關鍵詞挖掘、預估模型訓練、投放效果統計等大家能想到的場景,還罕見地用於關鍵詞的投放服務,這也是本段介紹的重點。一個快速穩定的投放系統是精準營銷的基礎。

美團早期的SEM投放服務採用的是單機版架構,隨著關鍵詞數量的極速增長,舊有服務存在的問題逐漸暴露。受限於各大搜索引擎API的配額(請求頻次)、賬戶結構等規則,投放服務只負責處理API請求是遠遠不夠的,還需要處理大量業務邏輯。單機程式在小資料量的情況下還能通過多程序勉強應對,但對於如此大規模的投放需求,就很難做到“兼顧全域性”了。

新版SEM投放服務在15年Q2上線,內部開發代號為Medusa。在Spark平臺上搭建的Medusa,全面發揮了Spark大資料處理的優勢,提供了高效能高可用的分散式SEM投放服務,具有以下幾個特性:

  1. 低門檻,Medusa整體架構的設計思路是提供資料庫一樣的服務。在介面層,讓RD可以像操作本地資料庫一樣,通過SQL來“增刪改查”線上關鍵詞表,並且只需要關心自己的策略標籤,不需要關注關鍵詞的物理儲存位置。Medusa利用Spark SQL作為服務的介面,提高了服務的易用性,也規範了資料儲存,可同時對其他服務提供資料支援。基於Spark開發分散式投放系統,還可以讓RD從系統層細節中解放出來,全部程式碼只有400行。
  2. 高效能、可伸縮,為了達到投放的“時間”、“空間”最優化,Medusa利用Spark預計算出每一個關鍵詞在遠端賬戶中的最佳儲存位置,每一次API請求的最佳時間內容。在配額和賬號容量有限的情況下,輕鬆掌控著億級的線上關鍵詞投放。通過控制Executor數量實現了投放效能的可擴充套件,並在實戰中做到了全渠道4小時全量回滾。
  3. 高可用,有的同學或許會有疑問:API請求適合放到Spark中做嗎?因為函數語言程式設計要求函式是沒有副作用的純函式(輸入是確定的,輸出就是確定的)。這確實是一個問題,Medusa的思路是把請求API封裝成獨立的模組,讓模組儘量做到“純函式”的無副作用特性,並參考面向軌道程式設計的思路,將全部請求log重新返回給Spark繼續處理,最終落到Hive,以此保證投放的成功率。為了更精準的控制配額消耗,Medusa沒有引入單次請求重試機制,並制定了服務降級方案,以極低的資料丟失率,完整地記錄了每一個關鍵詞的旅行。

結論和展望

本文我們介紹了美團引入Spark的起源,基於Spark所做的一些平臺化工作,以及Spark在美團具體應用場景下的實踐。總體而言,Spark由於其靈活的程式設計介面、高效的記憶體計算,能夠適用於大部分資料處理場景。在推廣和使用Spark的過程中,我們踩過不少坑,也遇到過很多問題,但填坑和解決問題的過程,讓我們對Spark有了更深入的理解,我們也期待著Spark在更多的應用場景中發揮重要的作用。