1. 程式人生 > >1.使用Spark Shell進行互動式分析

1.使用Spark Shell進行互動式分析

使用Spark Shell進行互動式分析

基礎

Spark的shell提供了一個學習API的簡單方法,也是一個互動式分析資料的強大工具。它可以在Scala(在Java VM上執行,因此是使用現有Java庫的好方法)或Python中提供。通過在Spark目錄中執行以下程式碼來啟動它:

D:\spark-1.6.2-bin-hadoop2.6\bin>spark-shell

Spark的主要抽象是一個名為Resilient Distributed Dataset(RDD)。RDD可以通過Hadoop InputFormats(例如HDFS檔案)或通過轉換其他資料集來建立。讓我們從Spark源目錄中的README檔案的文字中建立一個新的資料集:

scala> val textFile = sc.textFile("../README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[5] at textFile at <console>:27

您可以直接從資料集中獲取值,通過呼叫某些操作,或者轉換資料集來獲取新的值

scala> textFile.count()
res0: Long = 95  //README.md總共有多少行資料

scala> textFile.first()
res1:
String = # Apache Spark //README.md第一行資料

現在讓我們轉換這個資料集到一個新的。我們呼叫filter返回一個新的資料集與檔案中的專案的一個子集。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) //transformation操作 含有”Spark“總共有多上行
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:29
scala> linesWithSpark.count() //action操作 res2: Long = 17

我們可以把 actions 和 transformations 連結在一起:

scala>  textFile.filter(line => line.contains("Spark")).count()
res3: Long = 17

更多的Rdd操作

RDD actions 和 transformations可用於更復雜的計算。比方說,我們想找到字數最多的單詞:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 14

首先將行對映成一個整型數值產生一個新 RDD。 在這個新的 RDD 上呼叫 reduce 找到行中最大的個數。 map 和 reduce 的引數是 Scala 的函式串(閉包),並且可以使用任何語言特性或者 Scala/Java 類庫。例如,我們可以很方便地呼叫其他的函式宣告。 我們使用 Math.max() 函式讓程式碼更容易理解:

scala>  import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 14

一種常見的資料流模式是MapReduce,正如Hadoop單詞統計一樣。Spark可以輕鬆實現MapReduce流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:30

在這裡,我們要求flatMap將一行資料集轉換為一個數據集的單詞,然後組合groupByKey並count計算檔案中每個單詞的數量作為(String,Long)對的資料集。為了收集我們shell中的字數,我們可以呼叫collect:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (["Specifying,1), ("yarn",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2)...

高效的快取

Spark還支援將資料集拉入叢集範圍內的記憶體快取。當重複訪問資料時,如查詢小的“熱”資料集或執行迭代演算法(如PageRank)時,這非常有用。作為一個簡單的例子,讓我們標記我們的linesWithSpark資料集被快取::

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:29

scala> linesWithSpark.count()
res9: Long = 17