1. 程式人生 > >Apache Spark 入門簡介

Apache Spark 入門簡介

我是在2013年底第一次聽說Spark,當時我對Scala很感興趣,而Spark就是使用Scala編寫的。一段時間之後,我做了一個有趣的資料科學專案,它試著去預測在泰坦尼克號上倖存。對於進一步瞭解Spark內容和程式設計來說,這是一個很好的方式。對於任何有追求的、正在思考如何著手 Spark 的程式設計師,我都非常推薦這個專案。

今天,Spark已經被很多巨頭使用,包括Amazon、eBay以及Yahoo!。很多組織都在擁有成千上萬節點的叢集上執行Spark。根據Spark FAQ,已知的最大的Spark叢集擁有超過8000個節點。Spark確實是一個值得好好考慮和學習的技術。

在這裡插入圖片描述

這篇文章會向你介紹Spark,包括用例和示例。其中的資訊來自於Apache Spark網站以及 學習Spark – 快如閃電的大資料分析 一書。 加入大資料學習qq群:458345782,有大量乾貨(零基礎以及進階的經典實戰)分享給大家 Apache Spark是什麼?一個簡單介紹

Spark是一個Apache專案,它被標榜為“快如閃電的叢集計算”。它擁有一個繁榮的開源社群,並且是目前最活躍的Apache專案。

Spark提供了一個更快、更通用的資料處理平臺。和Hadoop相比,Spark可以讓你的程式在記憶體中執行時速度提升100倍,或者在磁碟上執行時速度提升10倍。去年,在100 TB Daytona GraySort比賽中,Spark戰勝了Hadoop,它只使用了十分之一的機器,但執行速度提升了3倍。Spark也已經成為 針對 PB 級別資料排序的最快的開源引擎。 sparkContext.textFile(“hdfs://…”) .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile(“hdfs://…”) Spark也讓我們更快地編寫程式碼變得可能,這就好像有80多個高水平的操作員在幫你處理。為了說明這一點,我們來看一下大資料中的“Hello World!”:單詞個數統計示例。在MapReduce中,我們需要編寫大概50行程式碼來實現這一功能,但對於Spark(以及Scala)來說,你可以像下面這樣簡單實現:

sparkContext.textFile(“hdfs://…”) .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile(“hdfs://…”) 在學習如何使用Apache Spark時,另外一個重要的部分就是互動式shell(REPL),它是開箱即用的。通過使用REPL,我們可以測試每一行程式碼的輸出,而無需首先編寫和執行整個作業(job)。這樣,你可以更快得到可工作的程式碼,並且點對點資料分析也變得可能。

Spark還提供了其它一些關鍵特性:

目前提供了針對Scala、Java和Python的API,即將提供針對其它語言(例如R)的支援。 可以很好地和Hadoop生態系統和資料來源(HDFS、Amazon S3、Hive、HBase、Cassandra等)進行整合。 可以執行在由Hadoop YARN或者Apache Mesos管理的叢集上,也可以執行在單獨的叢集上。 Spark核心由一組功能強大的、高級別的庫組成,這些庫可以無縫的應用到同一個應用程式中。目前這些庫包括SparkSQL、Spark Streaming、MLlib(用於機器學習)以及GraphX,我們會在稍後針對每一個庫進行進一步描述。 其它一些Spark庫和擴充套件也在陸續開發過程中。

Spark Core

Spark Core是一個基本引擎,用於大規模並行和分散式資料處理。它主要負責:

記憶體管理和故障恢復 在叢集上安排、分佈和監控作業 和儲存系統進行互動 Spark引入了一個稱為彈性分散式資料集(RDD,Resilient Distributed Dataset)的概念,它是一個不可變的、容錯的、分散式物件集合,我們可以並行的操作這個集合。RDD可以包含任何型別的物件,它在載入外部資料集或者從驅動應用程式分發集合時建立。

RDD支援兩種操作型別:

轉換是一種操作(例如對映、過濾、聯接、聯合等等),它在一個RDD上執行操作,然後建立一個新的RDD來儲存結果。 行動是一種操作(例如歸併、計數、第一等等),它在一個RDD上執行某種計算,然後將結果返回。 在Spark中,轉換是“懶惰”的,也就是說它們不會立刻計算出結果。相反,它們只是“記住”要執行的操作以及要操作的資料集(例如檔案)。只有當行為被呼叫時,轉換才會真正的進行計算,並將結果返回給驅動器程式。這種設計讓Spark執行得更有效率。例如,如果一個大檔案要通過各種方式進行轉換操作,並且檔案被傳遞給第一個行為,那麼Spark只會處理檔案的第一行內容並將結果返回,而不會處理整個檔案。

預設情況下,當你在經過轉換的RDD上執行一個行為時,這個RDD有可能會被重新計算。然而,你也可以通過使用持久化或者快取的方法,將一個RDD持久化從年初在記憶體中,這樣,Spark就會在叢集上保留這些元素,當你下一次查詢它時,查詢速度會快很多。

SparkSQL

SparkSQL是Spark的一個元件,它支援我們通過SQL或者Hive查詢語言來查詢資料。它最初來自於Apache Hive專案,用於執行在Spark上(來代替MapReduce),現在它已經被整合到Spark堆中。除了針對各種各樣的資料來源提供支援,它還讓程式碼轉換與SQL查詢編織在一起變得可能,這最終會形成一個非常強大的工具。下面是一個相容Hive的查詢示例:

// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql(“CREATE TABLE IF NOT EXISTS src (key INT, value STRING)”) sqlContext.sql(“LOAD DATA LOCAL INPATH ‘examples/src/main/resources/kv1.txt’ INTO TABLE src”)

// Queries are expressed in HiveQL sqlContext.sql(“FROM src SELECT key, value”).collect().foreach(println)

Spark Streaming

Spark Streaming支援對流資料的實時處理,例如產品環境web伺服器的日誌檔案(例如Apache Flume和HDFS/S3)、諸如Twitter的社交媒體以及像Kafka那樣的各種各樣的訊息佇列。在這背後,Spark Streaming會接收輸入資料,然後將其分為不同的批次,接下來Spark引擎來處理這些批次,並根據批次中的結果,生成最終的流。整個過程如下所示。

Spark Streaming API可以非常緊密匹配Spark核心API,這使得程式設計師可以很容易的工作在批處理資料和流資料的海洋中。

MLlib

MLlib是一個機器學習庫,它提供了各種各樣的演算法,這些演算法用來在叢集上針對分類、迴歸、聚類、協同過濾等(可以在 machine learning 上檢視Toptal的文章,來獲取更過的資訊)。其中一些演算法也可以應用到流資料上,例如使用普通最小二乘法或者K均值聚類(還有更多)來計算線性迴歸。Apache Mahout(一個針對Hadoop的機器學習庫)已經脫離MapReduce,轉而加入Spark MLlib。

GraphX

GraphX是一個庫,用來處理圖,執行基於圖的並行操作。它針對ETL、探索性分析和迭代圖計算提供了統一的工具。除了針對圖處理的內建操作,GraphX還提供了一個庫,用於通用的圖演算法,例如PageRank。

如何使用Apache Spark:事件探測用例

既然我們已經回答了“Apache Spark是什麼?”這個問題,接下來讓我們思考一下,使用Spark來解決什麼樣的問題或者挑戰最有效率。

最近,我偶然看到了一篇關於 通過分析Twitter流的方式來探測地震 的文章。它展示了這種技術可以比日本氣象廳更快的通知你日本哪裡發生了地震。雖然那篇文章使用了不同的技術,但我認為這是一個很好的示例,可以用來說明我們如何通過簡單的程式碼片段,在不需要”膠水程式碼“的情況下應用Spark。

首先,我們需要處理tweet,將那些和”地震“或”震動“等相關的內容過濾出來。我們可以使用Spark Streaming的方式很容易實現這一目標,如下所示:

TwitterUtils.createStream(…) .filter(_.getText.contains(“earthquake”) || _.getText.contains(“shaking”)) 然後,我們需要在tweets上執行一些語義分析,來確定它們是否代表當前發生了地震。例如,像“地震!”或者“現在正在震動”這樣的tweets,可能會被認為是正向匹配,而像“參加一個地震會議”或者“昨天的地震真可怕”這樣的tweets,則不是。這篇文章的作者使用了一個支援向量機(support vector machine, SVM)來實現這一點。我們在這裡使用同樣的方式,但也可以試一下 流版本。一個使用了MLlib的程式碼示例如下所示:

// We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, “sample_earthquate_tweets.txt”)

// Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1)

// Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold. model.clearThreshold()

// Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) }

// Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)

如果對於這個模型的預測比例滿意,我們可以繼續往下走,無論何時發現地震,我們都要做出反應。為了檢測一個地震,我們需要在一個指定的時間視窗內(如文章中所述)有一定數量(例如密度)的正向tweets。請注意,對於帶有Twitter位置服務資訊的tweets來說,我們還能夠從中提取地震的位置資訊。有了這個只是以後,我們可以使用SparkSQL來查詢現有的Hive表(儲存那些對接收地震通知感興趣的使用者)來獲取使用者的郵箱地址,並向他們傳送一些個性化的警告郵件,如下所示:

// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql(“FROM earthquake_warning_users SELECT firstName, lastName, city, email”) .collect().foreach(sendEmail)

其它Apache Spark用例

當然,Spark潛在的用例遠遠超出了地震預測。

下面是一個針對其它一些用例示例(當然遠遠沒有列舉全部),這些用例都需要快速處理各種各樣的大資料,Spark也非常適合處理這些用例:

在遊戲領域,如果能從實時遊戲的事件的潛流中處理和發現模式,並能夠快速做出響應,這種能力可以帶來一門賺錢的生意,針對這種目的的例子包括玩家保留、定位廣告、自動調整複雜度等等。

在電子商務領域,實時交易的資訊可以被傳到像K均值這樣的流聚集演算法或者像ALS這樣的協同過濾的演算法上。而產生的結果可能會組合其它一些非結構化的資料來源,例如客戶評論或者產品評審。隨著時間的推移,我們可以用它來提升和改進系統的推薦功能。

在金融或者安全領域,Spark技術棧可以用於欺詐或者入侵檢測系統或者基於風險的認證系統。通過分析大規模的壓縮日誌,並結合外部資料來源,例如已經洩漏的資料以及洩漏的賬戶、從連線/請求中得到的一些諸如IP地址或者時間等資訊,我們可以實現一個非常好的結果。

結論

總之,Spark可以幫助我們簡化處理那些需要處理大量實時或壓縮資料的計算密集型任務和挑戰。這些資料既包括結構化資料,也包括非結構化資料。Spark可以和其它一些複雜能力進行無縫整合,例如機器學習、圖演算法等。Spark將大資料處理變得“接地氣”。趕快來試試吧。