spark基本操作
[實驗目的]
- 熟悉通過pySpark介面訪問Spark的基本操作;
- 熟悉RDD的使用以及變換和動作;
- 熟悉Python中的匿名函式;
- 熟悉RDD的快取和儲存。
[實驗原理]
Apache Spark是一個大資料處理框架。Spark實現了許多資料處理和分析中的變換和演算法,並能進行分散式計算,而使用者不需要關心實現細節。Spark有非常良好的可擴充套件性和極高的計算效率,可以用於處理TB甚至PB級別的資料。
1. Spark基本原理
Spark叢集中有驅動器(driver)和執行器(executor),作業首先提交到驅動器,並由驅動器將作業(job)拆分為任務(task),並提交到執行器進行計算,最終結果會返回驅動器。使用Spark及其API時必須先有一個SparkContext
SparkContext
即建立了一個新的Spark應用。在本實驗中,SparkContext
已經自動建立完成,可以直接呼叫物件sc
來訪問Spark的各種功能。SparkContext
可以用於建立RDD(Resilient Distributed Dataset)。
在Spark中,資料集以RDD的形式儲存,會被分成不同的分割槽,每個分割槽都是資料集唯一的一個組成部分,分佈在不同的節點上。Spark區別於Hadoop的最主要特點,就是將資料儲存在記憶體中而不是磁碟中。這一特性使得Spark應用計算更快捷,因為不需要浪費時間從磁碟讀取資料。可以呼叫函式sc.parallelize()
2. RDD基本操作
RDD支援兩種操作:變換(transformation)和動作(action)。變換是指根據現有的資料集建立一個新的資料集。RDD一旦建立則無法修改,因此每次對RDD的變換其實建立了一個新的RDD。而動作是指在資料集上執行計算後,返回一個值給驅動程式。所有Spark中的變換都是惰性的,應用到基礎資料集上的這些變換不會馬上被計算,而只有在有動作發生,要求返回結果給驅動應用時,才會真正進行計算。這個設計讓Spark能夠更加效率地執行。
a. 變換函式
許多資料操作都是在資料集的每一個元素上進行,即變換。Spark可以將這些操作並行執行。對於每個資料分割槽,Spark會啟動一個任務(task)執行這些變換,並輸出一個新的資料分割槽。本實驗涉及到的常用變換函式有以下這些。
- 函式
map(f)
,將函式f()
應用於RDD的每一個元素; - 函式
filter(f)
,將函式f()
應用於資料集的每一個元素,僅保留返回值為真的元素; - 函式
flatMap(f)
,與函式map(f)
類似,將函式f()
應用於RDD的每一個元素並將結果化簡為一個列表; - 函式
reduceByKey(f)
,用於元素是鍵值對的RDD,將RDD鍵相同的元素化簡,函式f()
必須有兩個引數且返回一個值,並且需要滿足交換律和結合律,如果不滿足則每次返回的結果可能會不一致。
傳入變換函式的函式f()
也可以為匿名函式,用關鍵字lamda
定義。匿名函式僅限於一條命令。需要注意的是,匿名函式的使用從來不是必須的,完全可以定義一個正常函式取代之。使用匿名函式的一大好處就是程式碼顯得更緊湊。
b. 動作函式
本實驗涉及到的常用動作函式有以下這些。
- 函式
collect()
,將資料從分佈在各節點的執行器上的RDD分割槽收集到驅動器上,並新建立一個集合,必須確保返回的資料集足夠小,能夠存放在驅動器的記憶體中,不然驅動器會崩潰; - 函式
count()
,計算RDD的元素個數; - 函式
take(n)
,返回RDD的前nn個元素; - 函式
first()
,返回RDD的第一個元素,等同於take(1)
; - 函式
takeOrdered()
,返回RDD按升序或自定義順序排列的前nn個元素,與函式take()
和first()
相比,該函式返回的是確定的結果; - 函式
top()
,與函式takeOrdered()
類似,只不過返回降序排列的結果; - 函式
reduce(f)
,呼叫函式f()
將RDD的元素化簡,函式f()
必須有兩個引數且返回一個值,並且需要滿足交換律和結合律,如果不滿足則每次返回的結果可能會不一致。
c. 快取
如果計劃多次訪問某個RDD,可以呼叫函式cache()
將其快取在記憶體中,這樣可以節省磁碟互動開銷。但如果快取了太多的RDD使得Spark記憶體不足時,則會將最久沒有訪問過的RDD首先從記憶體中刪除。而如果該RDD被重新訪問,則該RDD會自動重新在記憶體中快取。可以通過RDD的is_cached
屬性檢視其快取狀態。而如果某個RDD不再需要訪問時,可以呼叫函式unpersist()
使其不再在記憶體中快取。
[實驗步驟]
1. 連線到大資料計算叢集
雙擊桌面上的xshell 5的快捷方式,通過xshell連線到大資料計算叢集。在命令列中輸入連線命令,將“<id>”替換成自己的使用者名稱,將“<master_host>”替換成實驗室當前環境的主機地址,並在彈出的對話方塊中輸入密碼。
[c:\~]$ ssh <id>@<master_ip>
2. 啟動Spark
在shell環境中通過命令pyspark
啟動Spark。
$ pyspark
# 省略輸出
3. 瞭解sc
物件
物件sc
是Spark各種功能的入口。檢視物件sc
的型別。
>>> type(sc)
<class 'pyspark.context.SparkContext'>
呼叫函式dir()
檢視物件sc
的可用屬性和方法。
>>> dir(sc)
['__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_accumulatorServer', '_active_spark_context', '_add_profile', '_batchSize', '_callsite', '_checkpointFile', '_conf', '_dictToJavaMap', '_do_init', '_ensure_initialized', '_gateway', '_getJavaStorageLevel', '_initialize_context', '_javaAccumulator', '_jsc', '_jvm', '_lock', '_next_accum_id', '_pickled_broadcast_vars', '_profile_stats', '_python_includes', '_temp_dir', '_unbatched_serializer', '_writeToFile', 'accumulator', 'addFile', 'addPyFile', 'appName', 'binaryFiles', 'binaryRecords', 'broadcast', 'cancelAllJobs', 'cancelJobGroup', 'clearFiles', 'defaultMinPartitions', 'defaultParallelism', 'dump_profiles', 'environment', 'getLocalProperty', 'hadoopFile', 'hadoopRDD', 'master', 'newAPIHadoopFile', 'newAPIHadoopRDD', 'parallelize', 'pickleFile', 'pythonExec', 'runJob', 'sequenceFile', 'serializer', 'setCheckpointDir', 'setJobGroup', 'setLocalProperty', 'setSystemProperty', 'show_profiles', 'sparkHome', 'sparkUser', 'stop', 'textFile', 'union', 'version', 'wholeTextFiles']
4. 建立一個整數RDD
呼叫函式xrange()
建立一個整數集合,並呼叫函式len()
檢視集合長度。
>>> data = xrange(1, 10001)
>>> len(data)
10000
呼叫函式sc.parallelize()
將該整數集合分成8個分割槽。
>>> xrangeRDD = sc.parallelize(data, 8)
呼叫函式toDebugString()
檢視RDD的變換傳承。
>>> print xrangeRDD.toDebugString()
(8) PythonRDD[1] at RDD at PythonRDD.scala:43 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364 []
檢視RDD的型別,並呼叫函式getNumPartitions()
檢視RDD的分割槽數。
>>> type(xrangeRDD)
<class 'pyspark.rdd.PipelinedRDD'>
>>> xrangeRDD.getNumPartitions()
8
5. 呼叫變換函式map()
將RDD中每個元素的值減1
定義一個Python函式sub()
,該函式將輸入整數的值減1。
>>> def sub(value):
... return (value - 1)
...
呼叫函式map()
將函式sub()
應用於RDD的每一個元素,並呼叫函式toDebugString()
檢視RDD的變換層次。由於Spark採用惰性計算的策略,此時變換並沒有被真正執行,但可以通過函式toDebugString()
來檢視RDD的變換傳承。
>>> subRDD = xrangeRDD.map(sub)
>>> print subRDD.toDebugString()
(8) PythonRDD[2] at RDD at PythonRDD.scala:43 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364 []
6. 呼叫動作函式collect()
檢視變換結果
呼叫函式collect()
將變換結果生成一個新的集合。由於Spark採用惰性計算的策略,直到呼叫函式collect()
時才真正執行了之前的操作(函式parallelize()
和map()
),任務才會真正啟動,每一個分割槽都會有一個相應的任務。輸出的結果中每一個元素都比輸入的值小了1。
>>> print subRDD.collect()
# 省略輸出
7. 呼叫動作函式count()
計算元素個數
呼叫函式count()
計算RDD中的元素個數。
>>> print subRDD.count()
# 省略部分輸出
8. 呼叫變換函式filter()
和動作函式collect()
過濾出資料集中小於10的元素
首先定義一個Python函式ten()
,輸入小於10則返回真,否則返回假。再將函式ten()
作為引數傳遞給變換函式filter()
。最後呼叫動作函式collect()
返回變換結果。
>>> def ten(value):
... if (value < 10):
... return True
... else:
... return False
...
>>> filteredRDD = subRDD.filter(ten)
>>> print filteredRDD.collect()
# 省略部分輸出
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
9. 將匿名函式傳入變換函式filter()
定義一個匿名函式實現上一步中的操作。
>>> lambdaRDD = subRDD.filter(lambda x: x < 10)
>>> lambdaRDD.collect()
# 省略部分輸出
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
10. 呼叫動作函式first()
和take()
取出RDD中的某個(些)元素
呼叫函式first()
返回RDD的第一個元素。
>>> print filteredRDD.first()
# 省略部分輸出
0
呼叫函式take(4)
和take(12)
返回RDD的前4和12個元素。即使RDD只有10個元素,還是可以呼叫take(12)
返回所有元素。
>>> print filteredRDD.take(4)
# 省略部分輸出
[0, 1, 2, 3]
>>> print filteredRDD.take(12)
# 省略部分輸出
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
11. 呼叫動作函式takeOrdered()
和top()
取出RDD中的某些元素
呼叫函式takeOrdered()
和top()
返回RDD的最小和最大的幾個元素。
>>> print filteredRDD.takeOrdered(3)
# 省略部分輸出
[0, 1, 2]
>>> print filteredRDD.top(5)
# 省略部分輸出
[9, 8, 7, 6, 5]
也可以傳入函式指定排列順序,這裡我們傳入一個匿名函式返回每個輸入的相反數,即反轉了排列順序。
>>> filteredRDD.takeOrdered(4, lambda s: -s)
# 省略部分輸出
[9, 8, 7, 6]
12. 呼叫動作函式reduce()
將RDD中所有元素求和。
將求和的匿名函式傳給函式reduce()
,將RDD中所有元素求和。
>>> print filteredRDD.reduce(lambda a, b: a + b)
# 省略部分輸出
45
傳入函式reduce(f)
的函式f()
必須滿足交換律和結合律,如不滿足,則可能返回不一致的結果。
>>> print filteredRDD.reduce(lambda a, b: a - b)
# 省略部分輸出
-45 # 每次輸出可能不同
>>> print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# 省略部分輸出
21 # 每次輸出可能不同
13. 呼叫變換函式flatMap()
使得一個輸入返回多個輸出,並與函式map()
作對比
建立一個包含多個單詞的RDD。
>>> wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
>>> wordsRDD = sc.parallelize(wordsList, 4)
分別呼叫變換函式map()
和flatMap()
,並呼叫動作函式collect()
對比返回的變換結果。可以發現函式map()
返回的是包含二元組列表,而函式flatMap()
則返回的是包含字串的列表。
>>> singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
>>> singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
>>> print singularAndPluralWordsRDDMap.collect()
# 省略部分輸出
[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
>>> print singularAndPluralWordsRDD.collect()
# 省略部分輸出
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
14. 呼叫變換函式reduceByKay()
生成一個包含單次原型和複數型的集合,並與函式map()
作對比
建立一個包含字母和整數型鍵值對的RDD。
>>> pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
呼叫變換函式reduceByKay()
將相同鍵的元組的值求和。
>>> print pairRDD.reduceByKey(lambda a, b: a + b).collect()
# 省略部分輸出
[('a', 3), ('b', 1)]
15. 將RDD在記憶體中快取,並檢視其快取狀態
首先呼叫函式setName()
設定RDD的名稱。再呼叫函式cache()
將RDD快取在記憶體中。最後通過屬性is_cached
檢視其快取狀態。
>>> filteredRDD.setName('My Filtered RDD')
My Filtered RDD PythonRDD[5] at collect at <stdin>:1
>>> filteredRDD.cache()
My Filtered RDD PythonRDD[5] at collect at <stdin>:1
>>> print filteredRDD.is_cached
True
16. 將RDD停止在記憶體中快取,並檢視其快取狀態
首先呼叫函式unpersist()
將RDD停止在記憶體中快取。再通過屬性is_cached
檢視其快取狀態。
>>> filteredRDD.unpersist()
# 省略部分輸出
My Filtered RDD PythonRDD[5] at collect at <stdin>:1
>>> print filteredRDD.is_cached
False
[附錄:輸入程式碼清單]
1. 連線到大資料計算叢集
2. 啟動Spark
3. 瞭解sc
物件
type(sc)
dir(sc)
4. 建立一個整數RDD
data = xrange(1, 10001)
len(data)
xrangeRDD = sc.parallelize(data, 8)
print xrangeRDD.toDebugString()
type(xrangeRDD)
xrangeRDD.getNumPartitions()
5. 呼叫變換函式map()
將RDD中每個元素的值減1
def sub(value):
return (value - 1)
subRDD = xrangeRDD.map(sub)
print subRDD.toDebugString()
6. 呼叫動作函式collect()
檢視變換結果
print subRDD.collect()
7. 呼叫動作函式count()
計算元素個數
print subRDD.count()
8. 呼叫變換函式filter()
和動作函式collect()
過濾出資料集中小於10的元素
def ten(value):
if (value < 10):
return True
else:
return False
filteredRDD = subRDD.filter(ten)
print filteredRDD.collect()
9. 將匿名函式傳入變換函式filter()
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
10. 呼叫動作函式first()
和take()
取出RDD中的某個(些)元素
print filteredRDD.first()
print filteredRDD.take(4)
print filteredRDD.take(12)
11. 呼叫動作函式takeOrdered()
和top()
取出RDD中的某些元素
print filteredRDD.takeOrdered(3)
print filteredRDD.top(5)
filteredRDD.takeOrdered(4, lambda s: -s)
12. 呼叫動作函式reduce()
將RDD中所有元素求和。
print filteredRDD.reduce(lambda a, b: a + b)
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
13. 呼叫變換函式flatMap()
使得一個輸入返回多個輸出,並與函式map()
作對比
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
print singularAndPluralWordsRDDMap.collect()
print singularAndPluralWordsRDD.collect()
14. 呼叫變換函式reduceByKay()
生成一個包含單次原型和複數型的集合,並與函式map()
作對比
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
print pairRDD.reduceByKey(lambda a, b: a + b).collect()
15. 將RDD在記憶體中快取,並檢視其快取狀態
filteredRDD.setName('My Filtered RDD')
filteredRDD.cache()
print filteredRDD.is_cached
16. 將RDD停止在記憶體中快取,並檢視其快取狀態
filteredRDD.unpersist()
print filteredRDD.is_cached