1. 程式人生 > 其它 >MongoDB + Spark: 完整的大資料解決方案

MongoDB + Spark: 完整的大資料解決方案

Spark介紹

按照官方的定義,Spark 是一個通用,快速,適用於大規模資料的處理引擎。

  • 通用性:我們可以使用Spark SQL來執行常規分析, Spark Streaming 來來做流資料處理, 以及用Mlib來執行機器學習等。Java,python,scala及R語言的支援也是其通用性的表現之一。
  • 快速: 這個可能是Spark成功的最初原因之一,主要歸功於其基於記憶體的運算方式。當資料的處理過程需要反覆迭代時,Spark可以直接在記憶體中暫存資料,而無需像MapReduce一樣需要把資料寫回磁碟。官方的資料表明:它可以比傳統的MapReduce快上100倍。
  • 大規模:原生支援HDFS,並且其計算節點支援彈性擴充套件,利用大量廉價計算資源併發的特點來支援大規模資料處理。

我們能用它做什麼

那我們能用Spark來做什麼呢? 場景數不勝數。

最簡單的可以只是統計一下某一個頁面多少點選量,複雜的可以通過機器學習來預測趨勢。

個性化 是一個常見的案例,比如說,Yahoo的網站首頁使用Spark來實現快速的使用者興趣分析。應該在首頁顯示什麼新聞?原始的做法是讓使用者選擇分類,聰明的做法就是在使用者互動的過程中揣摩使用者可能喜歡的文章;另一方面就是要在新聞進來時候進行分析並確定什麼樣的使用者是可能的受眾。新聞的時效性非常高,按照常規的MapReduce做法,對於Yahoo幾億使用者及海量的文章,可能需要計算一天才能得出所有結果。Spark的高效運算可以在這裡得到充分的運用,來保證新聞推薦在數十分鐘或更短時間內完成。另外,美國最大的有線電視商Comcast用它來做節目推薦,最近剛和滴滴聯姻的Uber用它做實時訂單分析,優酷則在Spark上實現了商業智慧的升級版

Spark生態系統

在我們開始談MongoDB 和Spark 之前,我們首先來了解一下Spark的生態系統。 Spark 作為一個大型分散式計算框架,需要和其他元件一起協同工作。

在Hdaoop裡面,HDFS作為一個數據層位於其核心部位。

Spark是Hadoop生態系統的一顆新星,原生就支援HDFS。大家知道HDFS是用來管理大規模非結構化資料的儲存系統,具有高可用和巨大的橫向擴充套件能力。

而作為一個橫向擴充套件的分散式叢集,資源管理是其核心必備的能力,Spark 可以通過YARN或者MESOS來負責資源(CPU)分配和任務排程。如果你不需要Spark管理節點的高可用,你也可以直接使用Spark standalone。

在有了資料層和資源管理層後, 接下來就是我們真正的計算引擎了。

Hadoop技術的兩大基石之一的MapReduce就是用來實現叢集大規模平行計算。而現在就多了一個選項:Spark。 MapReduce的特點是,用4個字來概括,簡單粗暴。採用divide & conquer戰術,我們可以用MapReduce來處理PB級的資料。 而Spark 作為打了雞血的MapReduce增強版,利用了記憶體價格大量下降的時代因素,充分把計算所用變數和中間結果放到記憶體裡,並且提供了一整套機器學習的分析演算法,在加上很多語言的支援,使之成為一個較之於MapReduce更加優秀的選擇。

由於MapReduce 是一個相對並不直觀的程式介面,所以為了方便使用,一系列的高層介面如Hive或者Pig應運而生。 Hive可以讓我們使用非常熟悉的SQL語句的方式來做一些常見的統計分析工作。同理,在Spark 引擎層也有類似的封裝,如Spark SQL、 RDD以及2.0版本新推出的Dataframe等。

所以一個完整的大資料解決方案,包含了儲存,資源管理,計算引擎及介面層。 那麼問題來了:我們畫了這麼大這麼圓的大餅,MongoDB可以吃哪一塊呢?

大家可以想象,MongoDB是個什麼?是個database。 所以自然而然,MongoDB可以擔任的角色,就是資料儲存的這一部分。在和 Spark一起使用的時候,MongoDB就可以扮演HDFS的角色來為Spark提供計算的原始資料,以及用來持久化分析計算的結果。

HDFS vs. MongoDB

既然我們說MongoDB可以用在HDFS的地方,那我們來詳細看看兩者之間的差異性。

在說區別之前,其實我們可以先來注意一下兩者的共同點。HDFS和MongoDB都是基於廉價x86伺服器的橫向擴充套件架構,都能支援到TB到PB級的資料量。資料會在多節點自動備份,來保證資料的高可用和冗餘。兩者都支援非結構化資料的儲存,等等。

但是,HDFS和MongoDB更多的是差異點:

  • 如在儲存方式上 HDFS的儲存是以檔案為單位,每個檔案64MB到128MB不等。而MongoDB則是細顆粒化的、以文件為單位的儲存。
  • HDFS不支援索引的概念,對資料的操作侷限於掃描性質的讀,MongoDB則支援基於二級索引的快速檢索。
  • MongoDB可以支援常見的增刪改查場景,而HDFS一般只是一次寫入後就很難進行修改。
  • 從響應時間上來說,HDFS一般是分鐘級別而MongoDB對手請求的響應時間通常以毫秒作為單位。

一個日誌的例子

如果說剛才的比較有些抽象,我們可以結合一個實際一點的例子來理解。

比如說,一個比較經典的案例可能是日誌記錄管理。在HDFS裡面你可能會用日期範圍來命名檔案,如7月1日,7月2日等等,每個檔案是個日誌文字檔案,可能會有幾萬到幾十萬行日誌。

而在MongoDB裡面,我們可以採用一個JSON的格式,每一條日誌就是一個JSON document。我們可以對某幾個關心的欄位建索引,如時間戳,錯誤型別等。

我們來考慮一些場景,加入我們相對7月份所有日誌做一些全量的統計,比如每個頁面的所有點選量,那麼這個HDFS和MongoDB都可以正常處理。

如果有一天你的經理告訴你:他想知道網站上每天有多少404錯誤在發生,這個時候如果你用HDFS,就還是需要通過全量掃描所有行,而MongoDB則可以通過索引,很快地找到所有的404日誌,可能花數秒鐘就可以解答你經理的問題。

又比如說,如果你希望對每個日誌項加一個自定義的屬性,在進行一些預處理後,MongoDB就會比較容地支援到。而一般來說,HDFS是不支援更新型別操作的。

好的我們瞭解了MongoDB為什麼可以替換HDFS並且為什麼有這個必要來做這個事情,下面我們就來看看Spark和MongoDB怎麼玩!

Spark + MongoDB

Spark的工作流程可以概括為三部曲:建立併發任務,對資料進行transformation操作,如map, filter,union,intersect等,然後執行運算,如reduce,count,或者簡單地收集結果。

這裡是Spark和MongoDB部署的一個典型架構。

Spark任務一般由Spark的driver節點發起,經過Spark Master進行資源排程分發。比如這裡我們有4個Spark worker節點,這些節點上的幾個executor 計算程序就會同時開始工作。一般一個core就對應一個executor。

每個executor會獨立的去MongoDB取來原始資料,直接套用Spark提供的分析演算法或者使用自定義流程來處理資料,計算完後把相應結果寫回到MongoDB。

我們需要提到的是:在這裡,所有和MongoDB的互動都是通過一個叫做Mongo-Spark的聯結器來完成的。

另一種常見的架構是結合MongoDB和HDFS的。Hadoop在非結構化資料處理的場景下要比MongoDB的普及率高。所以我們可以看到不少使用者會已經將資料存放在HDFS上。這個時候你可以直接在HDFS上面架Spark來跑,Spark從HDFS取來原始資料進行計算,而MongoDB在這個場景下是用來儲存處理結果。為什麼要這麼麻煩?幾個原因:

  • Spark處理結果數量可能會很大,比如說,個性化推薦可能會產生數百萬至數千萬條記錄,需要一個能夠支援每秒萬級寫入能力的資料庫
  • 處理結果可以直接用來驅動前臺APP,如使用者開啟頁面時獲取後臺已經為他準備好的推薦列表。

Mongo Spark Connector 聯結器

在這裡我們在介紹下MongoDB官方提供的Mongo Spark聯結器 。目前有3個聯結器可用,包括社群第三方開發的和之前Mongo Hadoop聯結器等,這個Mong Spark是最新的,也是我們推薦的連線方案。

這個聯結器是專門為Spark打造的,支援雙向資料,讀出和寫入。但是最關鍵的是 條件下推,也就是說:如果你在Spark端指定了查詢或者限制條件的情況下,這個條件會被下推到MongoDB去執行,這樣可以保證從MongoDB取出來、經過網路傳輸到Spark計算節點的資料確實都是用得著的。沒有下推支援的話,每次操作很可能需要從MongoDB讀取全量的資料,效能體驗將會很糟糕。拿剛才的日誌例子來說,如果我們只想對404錯誤日誌進行分析,看那些錯誤都是哪些頁面,以及每天錯誤頁面數量的變化,如果有條件下推,那麼我們可以給MongoDB一個限定條件:錯誤程式碼=404, 這個條件會在MongoDB伺服器端執行,這樣我們只需要通過網路傳輸可能只是全部日誌的0.1%的資料,而不是沒有條件下推情況下的全部資料。

另外,這個最新的聯結器還支援和Spark計算節點Co-Lo 部署。就是說在同一個節點上同時部署Spark例項和MongoDB例項。這樣做可以減少資料在網路上的傳輸帶來的資源消耗及時延。當然,這種部署方式需要注意記憶體資源和CPU資源的隔離。隔離的方式可以通過Linux的cgroups。

Spark + MongoDB 成功案例

目前已經有很多案例在不同的應用場景中使用Spark+MongoDB。

法國航空是法國最大的航空公司,為了提高客戶體驗,在最近施行的360度客戶檢視中,使用Spark對已經收集在MongoDB裡面的客戶資料進行分類及行為分析,並把結果(如客戶的類別、標籤等資訊)寫回到MongoDB內每一個客戶的文件結構裡。

Stratio是美國矽谷一家著名的金融大資料公司。他們最近在一家在31個國家有分支機構的跨國銀行實施了一個實時監控平臺。該銀行希望通過對日誌的監控和分析來保證客戶服務的響應時間以及實時監測一些可能的違規或者金融欺詐行為。在這個應用內, 他們使用了:

  • Apache Flume 來收集log
  • Spark來處理實時的log
  • MongoDB來儲存收集的log以及Spark分析的結果,如Key Performance Indicators等

在我們國內,東方航空最近剛完成了 一個Spark運價的POC測試。下面我們來看看他們做的事情。

東方航空的挑戰

東方航空作為國內的3大行之一,每天有1000多個航班,服務26萬多乘客。過去,顧客在網站上訂購機票,平均資料庫查詢200次就會下單訂購機票,但是現在平均要查詢1.2萬次才會發生一次訂購行為,同樣的訂單量,查詢量卻成長百倍。按照50%直銷率這個目標計算,東航的運價系統要支援每天16億的運價請求。

思路:空間換時間

當前的運價是通過實時計算的,按照現在的計算能力,需要對已有系統進行100多倍的擴容。另一個常用的思路,就是採用空間換時間的方式。與其對每一次的運價請求進行耗時300ms的運算,不如事先把所有可能的票價查詢組合窮舉出來並進行批量計算,然後把結果存入MongoDB裡面。當需要查詢運價時,直接按照 出發+目的地+日期的方式做一個快速的DB查詢,響應時間應該可以做到幾十毫秒。

那為什麼要用MongoDB?因為我們要處理的資料量龐大無比。按照1000多個航班,365天,26個倉位,100多渠道以及數個不同的航程型別,我們要實時存取的運價記錄有數十億條之多。這個已經遠遠超出常規RDBMS可以承受的範圍。

MongoDB基於記憶體快取的資料管理方式決定了對併發讀寫的響應可以做到很低延遲,水平擴充套件的方式可以通過多臺節點同時併發處理海量請求。

事實上,全球最大的航空分銷商,管理者全世界95%航空庫存的Amadeus也正是使用MongoDB作為其1000多億運價快取的儲存方案。

Spark + MongoDB 方案

我們知道MongoDB可以用來做我們海量運價資料的儲存方案,在大規模平行計算方案上,就可以用到嶄新的Spark技術。

這裡是一個運價系統的架構圖。 左邊是發起航班查詢請求的客戶端,首先會有API伺服器進行預處理。一般航班請求會分為庫存查詢和運價查詢。庫存查詢會直接到東航已有的庫存系統(Seat Inventory),同樣是實現在MongoDB上面的。在確定庫存後根據庫存結果再從Fare Cache系統內查詢相應的運價。

Spark叢集則是另外一套計算叢集,通過Spark MongoDB連線套件和MongoDB Fare Cache叢集連線。Spark 計算任務會定期觸發(如每天一次或者每4小時一次),這個任務會對所有的可能的運價組合進行全量計算,然後存入MongoDB,以供查詢使用。右半邊則把原來實時運算的叢集換成了Spark+MongoDB。Spark負責批量計算一年內所有航班所有倉位的所有價格,並以高併發的形式儲存到MongoDB裡面。每秒鐘處理的運價可以達到數萬條。

當來自客戶端的運價查詢達到服務端以後,服務端直接就向MongoDB發出按照日期,出發到達機場為條件的mongo查詢。

批處理計算流程

這裡是Spark計算任務的流程圖。需要計算的任務,也就是所有日期航班倉位的組合,事先已經存放到MongoDB裡面。

任務遞交到master,然後預先載入所需參考資料,broadcast就是把這些在記憶體裡的資料複製到每一個Spark計算節點的JVM,然後所有計算節點多執行緒併發執行,從Mongodb裡取出需要計算的倉位,呼叫東航自己的運價邏輯,得出結果以後,並儲存回MongoDB。

Spark 任務入口程式

Spark和MongoDB的連線使用非常簡單,下面就是一個程式碼示例:

處理能力和響應時間比較

這裡是一個在東航POC的簡單測試結果。從吞吐量的角度,新的API伺服器單節點就可以處理3400個併發的運價請求。在顯著提高了併發的同時,響應延遲則降低了10幾倍,平均10ms就可以返回運價結果。按照這個效能,6臺 API伺服器就可以應付將來每天16億的運價查詢。

Spark + MongoDB演示

接下來是一個簡單的Spark+MongoDB演示。

安裝 Spark

# curl -OL http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
# mkdir -p ~/spark
# tar -xvf spark-1.6.0-bin-hadoop2.6.tgz -C ~/spark --strip-components=1

測試聯結器

# cd ~/spark
# ./bin/spark-shell 
--conf "spark.mongodb.input.uri=mongodb://127.0.0.1/flights.av" 
--conf "spark.mongodb.output.uri=mongodb://127.0.0.1/flights.output" 
--packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0


import com.mongodb.spark._
import org.bson.Document


MongoSpark.load(sc).take(10).foreach(println)

簡單分組統計

資料: 365天,所有航班庫存資訊,500萬文件

任務: 按航班統計一年內所有餘票量

MongoSpark.load(sc)
     .map(doc=>(doc.getString("flight") ,doc.getLong("seats")))
     .reduceByKey((x,y)=>(x+y))
      .take(10)
     .foreach(println)

簡單分組統計加條件過濾

資料: 365天,所有航班庫存資訊,500萬文件

任務: 按航班統計一年內所有庫存,但是隻處理昆明出發的航班

import org.bson.Document


MongoSpark.load(sc)
          .withPipeline(Seq(Document.parse("{ $match: { orig :  'KMG'  } }")))
    .map(doc=>(doc.getString("flight") ,doc.getLong("seats")))
    .reduceByKey((x,y)=>(x+y))
    .take(10)
    .foreach(println)

效能優化事項

  • 使用合適的chunksize (MB) Total data size / chunksize = chunks = RDD partitions = spark tasks
  • 不要將所有CPU核分配給Spark 預留1-2個core給作業系統及其他管理程序
  • 同機部署 適當情況可以同機部署Spark+MongoDB,利用本地IO提高效能

總結

上面只是一些簡單的演示,實際上Spark + MongoDB的使用可以通過Spark的很多種形式來使用。我們來總結一下Spark + MongoDB的應用場景。在座的同學可能很多人已經使用了MongoDB,也有些人已經使用了Hadoop。我們可以從兩個角度來考慮這個事情:

對那些已經使用MongoDB的使用者,如果你希望在你的MongoDB驅動的應用上提供個性化功能,比如說像Yahoo一樣為你找感興趣的新聞,能夠在你的MongoDB資料上利用到Spark強大的機器學習或者流處理,你就可以考慮在MongoDB叢集上部署Spark來實現這些功能。

如果你已經使用Hadoop而且資料已經在HDFS裡面,你可以考慮使用Spark來實現更加實時更加快速的分析型需求,並且如果你的分析結果有資料量大、格式多變以及這些結果資料要及時提供給前臺APP使用的需求,那麼MongoDB可以用來作為你分析結果的一個儲存方案。