1. 程式人生 > >Spark (Python版) 零基礎學習筆記(一)—— 快速入門

Spark (Python版) 零基礎學習筆記(一)—— 快速入門

由於Scala才剛剛開始學習,還是對python更為熟悉,因此在這記錄一下自己的學習過程,主要內容來自於spark的官方幫助文件,這一節的地址為:

文章主要是翻譯了文件的內容,但也在裡邊加入了一些自己在實際操作中遇到的問題及解決的方案,和一些補充的小知識,一起學習。

環境:Ubuntu 16.04 LTS,Spark 2.0.1, Hadoop 2.7.3, Python 3.5.2

利用spark shell進行互動式分析

1. 基礎

首先開啟spark與python互動的API

$ cd /usr/local/spark
$ ./bin/pyspark

Spark最重要的一個概念就是RDD(Resilient Distributed Dataset),彈性分散式資料集。RDD可以利用Hadoop的InputFormats建立,或者從其他RDD轉換。

這裡,作為入門,我們利用spark安裝後文件夾中自帶的README.md(此檔案位置為/usr/local/spark/README.md)檔案作為例子,學習如何建立一個新的RDD。

建立新的RDD: 

>>> textFile = sc.textFile(“README.md”)

RDD支援兩種型別的操作,actions和transformtions:

actions: 在資料集上執行計算後返回值

transformations: 轉換, 從現有資料集建立一個新的資料集

RDD可以有執行一系列的動作(actions),這些動作可以返回值(values),轉換(transformations),或者指向新的RDD的指標。下邊學習RDD的一些簡單的動作:

>>> textFile.count()  # 計數,返回RDD中items的個數,這裡就是README.md的總行# 數
99
>>> textFile.first()  # RDD中的第一個item,這裡就是檔案README.md的第一行
u'# Apache Spark'

注意:如果之前是從/usr/local/spark啟動pyspark,然後讀取README.md檔案的,如果執行count語句,會出現以下錯誤:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/spark/README.md

這是因為在使用相對路徑時,系統預設是從hdfs://localhost:9000/目錄下讀取README.md檔案的,但是README.md檔案並不在這一目錄下,所以sc.textFile()必須使用絕對路徑,此時程式碼修改為:

>>> textFile = sc.textFile(“file:///usr/local/spark/README.md”)
99
下邊嘗試使用一個轉換(transformation)。例如,使用filter這一轉換返回一個新的RDD,這些RDD中的items都含有“Spark”字串。
>>> linesWithSpark = textFile.filter(lambda line: “Spark” in line)

我們還可以將actions和transformation連結起來:

>>> textFile.filter(lambda line: “Spark” in line).count()  # 有多好行含有“Spark”這一字串
19


2. 更多的RDD操作

利用RDD的動作和轉換能夠完成很多複雜的計算。例如,我們希望找到含有最後單詞的一句話:

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a>b) else b)
22

這個語句中,map函式將len(line.split())這一語句在所有line上執行,返回每個line所含有的單詞個數,也就是將line都map到一個整數值,然後建立一個新的RDD。然後呼叫reduce,找到最大值。map和reduce函式裡的引數是python中的匿名函式(lambda),事實上,我們這裡也可以傳遞python中更頂層的函式。比如,我們先定義一個比較大小的函式,這樣我們的程式碼會更容易理解:

>>> def max(a, b):
. . .     if a > b:
. . .         return a
. . .     else:
. . .         return b
. . .
>>> textFile.map(lambda line: len(line.split())).reduce(max)
22

Hadoop掀起了MapReduce的熱潮。在spark中,能夠更加容易的實現MapReduce

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

上述語句中,利用flatMap, map和reduceByKey三個轉換,計算檔案README.md中每個單詞出現的個數,並返回一個新的RDD,每個item的格式為(string, int),即單詞和對應的出現次數。其中,

flatMap(func):與map相似,但是每個輸入的item能夠被map到0個或者更多的輸出items上,也就是說func的返回值應當是一個Seq,而不是一個單獨的item,上述語句中,匿名函式返回的就是一句話中所含的每個單詞

reduceByKey(func):可以作用於使用“鍵-值”(K, V)形式儲存的資料集上並返回一組新的資料集(K, V),其中,每個鍵的值為聚合使用func操作的結果,這裡相當於python中字典的含義。上述語句中,相當於當某個單詞出現一次時,就在這個單詞的出現次數上加1,每個單詞就是一個Key,reducByKey中的匿名函式計算單詞的出現次數。

要收集上述語句的計算結果,可以使用collect這一動作:

>>> wordCounts.collect()
[(u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), ...]

3. 快取Caching

Spark也支援將資料集存入叢集範圍的記憶體快取中。這對於需要進行重複訪問的資料非常有用,比如我們需要在一個小的資料集中執行查詢操作,或者需要執行一個迭代演算法(例如PageRank)。下面,利用之前命令中得到的linesWithSpark資料集,演示快取這一操作過程:

>>> linesWithSpark.cache()
PythonRDD[26] at RDD at PythonRDD.scala:48
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19

利用Spark去快取一個100行的檔案可能並沒什麼意義。但是有趣的是,這一系列的操作可以用於非常大的資料集上,甚至含有成千上萬的節點的資料集。

4. 自含式應用程式(self-contained applications)

假設我們希望利用Spark API寫一個自含式應用程式,我們可以利用Scala,Java或者Python完成。

下邊,簡單介紹一下怎樣利用Python API (PySpark)寫一個應用程式,命名為SimpleApp.py.

在spark所在目錄下輸入:

./bin/spark-submit --master local[4] SimpleApp.py

輸出為:
Lines with a: 61, Lines with b: 27

此外,Spark自帶很多例子,可以在spark目錄下輸入下列指令檢視:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R