1. 程式人生 > >Spark + Python入門

Spark + Python入門

Spark + Python實踐入門

配置好spark以及環境變數後在終端中輸入pyspark即可執行,spark配置過程:

Spark最重要的一個概念就是RDD(Resilient Distributed Dataset),彈性分散式資料集。RDD可以利用Hadoop的InputFormats建立,或者從其他RDD轉換。

這裡,作為入門,我們利用spark安裝後文件夾中自帶的README.md檔案作為例子,學習如何建立一個新的RDD。

建立新的RDD:

textFile = sc.textFile(“README.md”)#從檔案建立
rdd1 = sc.parallelize('dsaohnfoauhdwo')#從字串生成
rdd2 = sc.parallelize(([1,1],[1,3],[1,5],[2,1],[2,5]))#從二維陣列生成
rdd3 = sc.parallelize(range(10))#從陣列生成

若從檔案建立失敗,則可能是需要使用絕對路徑,如下所示:

>>> textFile = sc.textFile(“file:///usr/local/spark/README.md”)
99

RDD支援兩種型別的操作,actions和transformations:

actions: 在資料集上執行計算後返回值

transformations: 轉換, 從現有資料集建立一個新的資料集

RDD可以有執行一系列的動作(actions),這些動作可以返回值(values),轉換(transformations),或者指向新的RDD的指標。下邊學習RDD的一些簡單的動作:

>>> textFile.count()  # 計數,返回RDD中items的個數,這裡就是README.md的總行# 數
99
>>> textFile.first()  # RDD中的第一個item,這裡就是檔案README.md的第一行
u'# Apache Spark'

下邊嘗試使用一個轉換(transformation)。例如,使用filter這一轉換返回一個新的RDD,這些RDD中的items都含有“Spark”字串:

>>> linesWithSpark = textFile.filter(lambda line: “Spark” in line)
>>> linesWithSpark.count()
20

我們還可以將actions和transformation連結起來:

>>> textFile.filter(lambda line: “Spark” in line).count()  # 有多少行含有“Spark”這一字串
20

利用RDD的動作和轉換能夠完成很多複雜的計算。例如,我們希望找到含有最多單詞的一句話:

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a>b) else b)
22

這個語句中,map函式將len(line.split())這一語句在所有line上執行,返回每個line所含有的單詞個數,也就是將line都map到一個整數值,然後建立一個新的RDD。然後呼叫reduce,找到最大值。map和reduce函式裡的引數是python中的匿名函式(lambda)

在spark中,能夠更加容易的實現MapReduce

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
>>> wordCounts.collect()
[(u'when', 1), (u'R,', 1), (u'including', 4), (u'computation', 1), ......

上述語句中,利用flatMap, map和reduceByKey三個轉換,計算檔案README.md中每個單詞出現的個數,並返回一個新的RDD,每個item的格式為(string, int),即單詞和對應的出現次數。其中,

flatMap(func): (原文)與map相似,但是每個輸入的item能夠被map到0個或者更多的輸出items上,也就是說func的返回值應當是一個Seq,而不是一個單獨的item,上述語句中,匿名函式返回的就是一句話中所含的每個單詞。 (我的理解)flatMap函式就是將傳入的func先作用到每個元素中去,然後將所有元素歸到一個數組中。例如,兩行句子[‘hello world’,‘good bye’],先被split轉化為了[[‘hello’,‘world’],[‘good’,‘bye’]],然後被flat為了[‘hello’,‘world’,‘good’,‘bye’]

reduceByKey(func):可以作用於使用“鍵-值”(K, V)形式儲存的資料集上並返回一組新的資料集(K, V),其中,每個鍵的值為聚合使用func操作的結果,這裡相當於python中字典的含義。上述語句中,相當於當某個單詞出現一次時,就在這個單詞的出現次數上加1,每個單詞就是一個Key,reducByKey中的匿名函式計算單詞的出現次數。

Spark也支援將資料集存入叢集範圍的記憶體快取中。這對於需要進行重複訪問的資料非常有用,比如我們需要在一個小的資料集中執行查詢操作,或者需要執行一個迭代演算法(例如PageRank)。下面,利用之前命令中得到的linesWithSpark資料集,演示快取這一操作過程:

>>> linesWithSpark.cache()
PythonRDD[26] at RDD at PythonRDD.scala:48
>>> linesWithSpark.count()
19

利用Spark去快取一個100行的檔案可能並沒什麼意義。但是有趣的是,這一系列的操作可以用於非常大的資料集上,甚至含有成千上萬的節點的資料集。

自含式應用程式(self-contained applications) 假設我們希望利用Spark API寫一個自含式應用程式,我們可以利用Scala,Java或者Python完成。

下邊,簡單介紹一下怎樣利用Python API (PySpark)寫一個應用程式,命名為SimpleApp.py.

spark-submit --master local[4] SimpleApp.py

輸出為:

Lines with a: 61, Lines with b: 27

此外,Spark自帶很多例子,可以在spark目錄下輸入下列指令檢視:

# For Scala and Java, use run-example:

./bin/run-example SparkPi

# For Python examples, use spark-submit directly:

./bin/spark-submit examples/src/main/python/pi.py
輸出:3.13428

# For R examples, use spark-submit directly:

./bin/spark-submit examples/src/main/r/dataframe.R