1. 程式人生 > 其它 >【Spark研究】用Apache Spark進行大資料處理第一部分:入門介紹

【Spark研究】用Apache Spark進行大資料處理第一部分:入門介紹

什麼是Spark

Apache Spark是一個圍繞速度、易用性和複雜分析構建的大資料處理框架。最初在2009年由加州大學伯克利分校的AMPLab開發,並於2010年成為Apache的開源專案之一。

與Hadoop和Storm等其他大資料和MapReduce技術相比,Spark有如下優勢。

首先,Spark為我們提供了一個全面、統一的框架用於管理各種有著不同性質(文字資料、圖表資料等)的資料集和資料來源(批量資料或實時的流資料)的大資料處理的需求。

Spark可以將Hadoop叢集中的應用在記憶體中的執行速度提升100倍,甚至能夠將應用在磁碟上的執行速度提升10倍。

Spark讓開發者可以快速的用Java、Scala或Python編寫程式。它本身自帶了一個超過80個高階操作符集合。而且還可以用它在shell中以互動式地查詢資料。

除了Map和Reduce操作之外,它還支援SQL查詢,流資料,機器學習和圖表資料處理。開發者可以在一個數據管道用例中單獨使用某一能力或者將這些能力結合在一起使用。

在這個Apache Spark文章系列的第一部分中,我們將瞭解到什麼是Spark,它與典型的MapReduce解決方案的比較以及它如何為大資料處理提供了一套完整的工具。

Hadoop和Spark

Hadoop這項大資料處理技術大概已有十年曆史,而且被看做是首選的大資料集合處理的解決方案。MapReduce是一路計算的優秀解決方案,不過對於需要多路計算和演算法的用例來說,並非十分高效。資料處理流程中的每一步都需要一個Map階段和一個Reduce階段,而且如果要利用這一解決方案,需要將所有用例都轉換成MapReduce模式。

在下一步開始之前,上一步的作業輸出資料必須要儲存到分散式檔案系統中。因此,複製和磁碟儲存會導致這種方式速度變慢。另外Hadoop解決方案中通常會包含難以安裝和管理的叢集。而且為了處理不同的大資料用例,還需要整合多種不同的工具(如用於機器學習的Mahout和流資料處理的Storm)。

如果想要完成比較複雜的工作,就必須將一系列的MapReduce作業串聯起來然後順序執行這些作業。每一個作業都是高時延的,而且只有在前一個作業完成之後下一個作業才能開始啟動。

而Spark則允許程式開發者使用有向無環圖(DAG)開發複雜的多步資料管道。而且還支援跨有向無環圖的記憶體資料共享,以便不同的作業可以共同處理同一個資料。

Spark執行在現有的Hadoop分散式檔案系統基礎之上(HDFS)提供額外的增強功能。它支援將Spark應用部署到現存的Hadoop v1叢集(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN叢集甚至是Apache Mesos之中。

我們應該將Spark看作是Hadoop MapReduce的一個替代品而不是Hadoop的替代品。其意圖並非是替代Hadoop,而是為了提供一個管理不同的大資料用例和需求的全面且統一的解決方案。

Spark特性

Spark通過在資料處理過程中成本更低的洗牌(Shuffle)方式,將MapReduce提升到一個更高的層次。利用記憶體資料儲存和接近實時的處理能力,Spark比其他的大資料處理技術的效能要快很多倍。

Spark還支援大資料查詢的延遲計算,這可以幫助優化大資料處理流程中的處理步驟。Spark還提供高階的API以提升開發者的生產力,除此之外還為大資料解決方案提供一致的體系架構模型。

Spark將中間結果儲存在記憶體中而不是將其寫入磁碟,當需要多次處理同一資料集時,這一點特別實用。Spark的設計初衷就是既可以在記憶體中又可以在磁碟上工作的執行引擎。當記憶體中的資料不適用時,Spark操作符就會執行外部操作。Spark可以用於處理大於叢集記憶體容量總和的資料集。

Spark會嘗試在記憶體中儲存儘可能多的資料然後將其寫入磁碟。它可以將某個資料集的一部分存入記憶體而剩餘部分存入磁碟。開發者需要根據資料和用例評估對記憶體的需求。Spark的效能優勢得益於這種記憶體中的資料儲存。

Spark的其他特性包括:

  • 支援比Map和Reduce更多的函式。
  • 優化任意操作運算元圖(operator graphs)。
  • 可以幫助優化整體資料處理流程的大資料查詢的延遲計算。
  • 提供簡明、一致的Scala,Java和Python API。
  • 提供互動式Scala和Python Shell。目前暫不支援Java。

Spark是用Scala程式設計語言編寫而成,運行於Java虛擬機器(JVM)環境之上。目前支援如下程式設計語言編寫Spark應用:

  • Scala
  • Java
  • Python
  • Clojure
  • R

Spark生態系統

除了Spark核心API之外,Spark生態系統中還包括其他附加庫,可以在大資料分析和機器學習領域提供更多的能力。

這些庫包括:

  • Spark Streaming:
    • Spark Streaming基於微批量方式的計算和處理,可以用於處理實時的流資料。它使用DStream,簡單來說就是一個彈性分散式資料集(RDD)系列,處理實時資料。
  • Spark SQL:
    • Spark SQL可以通過JDBC API將Spark資料集暴露出去,而且還可以用傳統的BI和視覺化工具在Spark資料上執行類似SQL的查詢。使用者還可以用Spark SQL對不同格式的資料(如JSON,Parquet以及資料庫等)執行ETL,將其轉化,然後暴露給特定的查詢。
  • Spark MLlib:
    • MLlib是一個可擴充套件的Spark機器學習庫,由通用的學習演算法和工具組成,包括二元分類、線性迴歸、聚類、協同過濾、梯度下降以及底層優化原語。
  • Spark GraphX:
    • GraphX是用於圖計算和並行圖計算的新的(alpha)Spark API。通過引入彈性分散式屬性圖(Resilient Distributed Property Graph),一種頂點和邊都帶有屬性的有向多重圖,擴充套件了Spark RDD。為了支援圖計算,GraphX暴露了一個基礎操作符集合(如subgraph,joinVertices和aggregateMessages)和一個經過優化的Pregel API變體。此外,GraphX還包括一個持續增長的用於簡化圖分析任務的圖演算法和構建器集合。

除了這些庫以外,還有一些其他的庫,如BlinkDB和Tachyon。

BlinkDB是一個近似查詢引擎,用於在海量資料上執行互動式SQL查詢。BlinkDB可以通過犧牲資料精度來提升查詢響應時間。通過在資料樣本上執行查詢並展示包含有意義的錯誤線註解的結果,操作大資料集合。

Tachyon是一個以記憶體為中心的分散式檔案系統,能夠提供記憶體級別速度的跨叢集框架(如Spark和MapReduce)的可信檔案共享。它將工作集檔案快取在記憶體中,從而避免到磁碟中載入需要經常讀取的資料集。通過這一機制,不同的作業/查詢和框架可以以記憶體級的速度訪問快取的檔案。 此外,還有一些用於與其他產品整合的介面卡,如Cassandra(Spark Cassandra 聯結器)和R(SparkR)。Cassandra Connector可用於訪問儲存在Cassandra資料庫中的資料並在這些資料上執行資料分析。

下圖展示了在Spark生態系統中,這些不同的庫之間的相互關聯。

圖1. Spark框架中的庫

我們將在這一系列文章中逐步探索這些Spark庫

Spark體系架構

Spark體系架構包括如下三個主要元件:

  • 資料儲存
  • API
  • 管理框架

接下來讓我們詳細瞭解一下這些元件。

資料儲存:

Spark用HDFS檔案系統儲存資料。它可用於儲存任何兼容於Hadoop的資料來源,包括HDFS,HBase,Cassandra等。

API:

利用API,應用開發者可以用標準的API介面建立基於Spark的應用。Spark提供Scala,Java和Python三種程式設計語言的API。

下面是三種語言Spark API的網站連結。

  • Scala API
  • Java
  • Python

資源管理:

Spark既可以部署在一個單獨的伺服器也可以部署在像Mesos或YARN這樣的分散式計算框架之上。

下圖2展示了Spark體系架構模型中的各個元件。

圖2 Spark體系架構

彈性分散式資料集

彈性分散式資料集(基於Matei的研究論文)或RDD是Spark框架中的核心概念。可以將RDD視作資料庫中的一張表。其中可以儲存任何型別的資料。Spark將資料儲存在不同分割槽上的RDD之中。

RDD可以幫助重新安排計算並優化資料處理過程。

此外,它還具有容錯性,因為RDD知道如何重新建立和重新計算資料集。

RDD是不可變的。你可以用變換(Transformation)修改RDD,但是這個變換所返回的是一個全新的RDD,而原有的RDD仍然保持不變。

RDD支援兩種型別的操作:

  • 變換(Transformation)
  • 行動(Action)

變換:變換的返回值是一個新的RDD集合,而不是單個值。呼叫一個變換方法,不會有任何求值計算,它只獲取一個RDD作為引數,然後返回一個新的RDD。

變換函式包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。

行動:行動操作計算並返回一個新的值。當在一個RDD物件上呼叫行動函式時,會在這一時刻計算全部的資料處理查詢並返回結果值。

行動操作包括:reduce,collect,count,first,take,countByKey以及foreach。

如何安裝Spark

安裝和使用Spark有幾種不同方式。你可以在自己的電腦上將Spark作為一個獨立的框架安裝或者從諸如Cloudera,HortonWorks或MapR之類的供應商處獲取一個Spark虛擬機器映象直接使用。或者你也可以使用在雲端環境(如Databricks Cloud)安裝並配置好的Spark。

在本文中,我們將把Spark作為一個獨立的框架安裝並在本地啟動它。最近Spark剛剛釋出了1.2.0版本。我們將用這一版本完成示例應用的程式碼展示。

如何執行Spark

當你在本地機器安裝了Spark或使用了基於雲端的Spark後,有幾種不同的方式可以連線到Spark引擎。

下表展示了不同的Spark執行模式所需的Master URL引數。

如何與Spark互動

Spark啟動並執行後,可以用Spark shell連線到Spark引擎進行互動式資料分析。Spark shell支援Scala和Python兩種語言。Java不支援互動式的Shell,因此這一功能暫未在Java語言中實現。

可以用spark-shell.cmd和pyspark.cmd命令分別執行Scala版本和Python版本的Spark Shell。

Spark網頁控制檯

不論Spark執行在哪一種模式下,都可以通過訪問Spark網頁控制檯檢視Spark的作業結果和其他的統計資料,控制檯的URL地址如下:

http://localhost:4040

Spark控制檯如下圖3所示,包括Stages,Storage,Environment和Executors四個標籤頁

(點選檢視大圖)

圖3. Spark網頁控制檯

共享變數

Spark提供兩種型別的共享變數可以提升叢集環境中的Spark程式執行效率。分別是廣播變數和累加器。

廣播變數:廣播變數可以在每臺機器上快取只讀變數而不需要為各個任務傳送該變數的拷貝。他們可以讓大的輸入資料集的叢集拷貝中的節點更加高效。

下面的程式碼片段展示瞭如何使用廣播變數。

//
// Broadcast Variables
//
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

累加器:只有在使用相關操作時才會新增累加器,因此它可以很好地支援並行。累加器可用於實現計數(就像在MapReduce中那樣)或求和。可以用add方法將執行在叢集上的任務新增到一個累加器變數中。不過這些任務無法讀取變數的值。只有驅動程式才能夠讀取累加器的值。

下面的程式碼片段展示瞭如何使用累加器共享變數:

//
// Accumulators
//

val accum = sc.accumulator(0, "My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

accum.value

Spark應用示例

本篇文章中所涉及的示例應用是一個簡單的字數統計應用。這與學習用Hadoop進行大資料處理時的示例應用相同。我們將在一個文字檔案上執行一些資料分析查詢。本示例中的文字檔案和資料集都很小,不過無須修改任何程式碼,示例中所用到的Spark查詢同樣可以用到大容量資料集之上。

為了讓討論儘量簡單,我們將使用Spark Scala Shell。

首先讓我們看一下如何在你自己的電腦上安裝Spark。

前提條件:

  • 為了讓Spark能夠在本機正常工作,你需要安裝Java開發工具包(JDK)。這將包含在下面的第一步中。
  • 同樣還需要在電腦上安裝Spark軟體。下面的第二步將介紹如何完成這項工作。

注:下面這些指令都是以Windows環境為例。如果你使用不同的作業系統環境,需要相應的修改系統變數和目錄路徑已匹配你的環境。

I. 安裝JDK

1)從Oracle網站上下載JDK。推薦使用JDK 1.7版本。

將JDK安裝到一個沒有空格的目錄下。對於Windows使用者,需要將JDK安裝到像c:dev這樣的資料夾下,而不能安裝到“c:Program Files”資料夾下。“c:Program Files”資料夾的名字中包含空格,如果軟體安裝到這個資料夾下會導致一些問題。

注:不要在“c:Program Files”資料夾中安裝JDK或(第二步中所描述的)Spark軟體。

2)完成JDK安裝後,切換至JDK 1.7目錄下的”bin“資料夾,然後鍵入如下命令,驗證JDK是否正確安裝:

java -version

如果JDK安裝正確,上述命令將顯示Java版本。

II. 安裝Spark軟體:

從Spark網站上下載最新版本的Spark。在本文發表時,最新的Spark版本是1.2。你可以根據Hadoop的版本選擇一個特定的Spark版本安裝。我下載了與Hadoop 2.4或更高版本匹配的Spark,檔名是spark-1.2.0-bin-hadoop2.4.tgz。

將安裝檔案解壓到本地資料夾中(如:c:dev)。

為了驗證Spark安裝的正確性,切換至Spark資料夾然後用如下命令啟動Spark Shell。這是Windows環境下的命令。如果使用Linux或Mac OS,請相應地編輯命令以便能夠在相應的平臺上正確執行。

c:
cd c:devspark-1.2.0-bin-hadoop2.4
binspark-shell

如果Spark安裝正確,就能夠在控制檯的輸出中看到如下資訊。

….
15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/_,_/_/ /_/_   version 1.2.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
….
15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
15/01/17 23:17:53 INFO SparkILoop: Created spark context..
Spark context available as sc.

可以鍵入如下命令檢查Spark Shell是否工作正常。

sc.version

(或)

sc.appName

完成上述步驟之後,可以鍵入如下命令退出Spark Shell視窗:

:quit

如果想啟動Spark Python Shell,需要先在電腦上安裝Python。你可以下載並安裝Anaconda,這是一個免費的Python發行版本,其中包括了一些比較流行的科學、數學、工程和資料分析方面的Python包。

然後可以執行如下命令啟動Spark Python Shell:

c:
cd c:devspark-1.2.0-bin-hadoop2.4
binpyspark

Spark示例應用

完成Spark安裝並啟動後,就可以用Spark API執行資料分析查詢了。

這些從文字檔案中讀取並處理資料的命令都很簡單。我們將在這一系列文章的後續文章中向大家介紹更高階的Spark框架使用的用例。

首先讓我們用Spark API執行流行的Word Count示例。如果還沒有執行Spark Scala Shell,首先開啟一個Scala Shell視窗。這個示例的相關命令如下所示:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
 
val txtFile = "README.md"
val txtData = sc.textFile(txtFile)
txtData.cache()

我們可以呼叫cache函式將上一步生成的RDD物件儲存到快取中,在此之後Spark就不需要在每次資料查詢時都重新計算。需要注意的是,cache()是一個延遲操作。在我們呼叫cache時,Spark並不會馬上將資料儲存到記憶體中。只有當在某個RDD上呼叫一個行動時,才會真正執行這個操作。

現在,我們可以呼叫count函式,看一下在文字檔案中有多少行資料。

txtData.count()

然後,我們可以執行如下命令進行字數統計。在文字檔案中統計資料會顯示在每個單詞的後面。

val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wcData.collect().foreach(println)

如果想檢視更多關於如何使用Spark核心API的程式碼示例,請參考網站上的Spark文件。

後續計劃

在後續的系列文章中,我們將從Spark SQL開始,學習更多關於Spark生態系統的其他部分。之後,我們將繼續瞭解Spark Streaming,Spark MLlib和Spark GraphX。我們也會有機會學習像Tachyon和BlinkDB等框架。

小結

在本文中,我們瞭解了Apache Spark框架如何通過其標準API幫助完成大資料處理和分析工作。我們還對Spark和傳統的MapReduce實現(如Apache Hadoop)進行了比較。Spark與Hadoop基於相同的HDFS檔案儲存系統,因此如果你已經在Hadoop上進行了大量投資和基礎設施建設,可以一起使用Spark和MapReduce。

此外,也可以將Spark處理與Spark SQL、機器學習以及Spark Streaming結合在一起。關於這方面的內容我們將在後續的文章中介紹。

利用Spark的一些整合功能和介面卡,我們可以將其他技術與Spark結合在一起。其中一個案例就是將Spark、Kafka和Apache Cassandra結合在一起,其中Kafka負責輸入的流式資料,Spark完成計算,最後Cassandra NoSQL資料庫用於儲存計算結果資料。

不過需要牢記的是,Spark生態系統仍不成熟,在安全和與BI工具整合等領域仍然需要進一步的改進。

參考文獻

  • Spark主站
  • Spark示例
  • 2014年Spark峰會簡報和視訊
  • Spark on Databricks website
  • Databricks網站上的Spark欄目

來源:http://www.infoq.com/cn/articles/apache-spark-introduction