1. 程式人生 > 實用技巧 >spark基本操作

spark基本操作

[實驗目的]

  • 熟悉通過pySpark介面訪問Spark的基本操作;
  • 熟悉RDD的使用以及變換和動作;
  • 熟悉Python中的匿名函式;
  • 熟悉RDD的快取和儲存。

[實驗原理]

Apache Spark是一個大資料處理框架。Spark實現了許多資料處理和分析中的變換和演算法,並能進行分散式計算,而使用者不需要關心實現細節。Spark有非常良好的可擴充套件性和極高的計算效率,可以用於處理TB甚至PB級別的資料。

1. Spark基本原理

Spark叢集中有驅動器(driver)和執行器(executor),作業首先提交到驅動器,並由驅動器將作業(job)拆分為任務(task),並提交到執行器進行計算,最終結果會返回驅動器。使用Spark及其API時必須先有一個SparkContext

。建立一個新的SparkContext即建立了一個新的Spark應用。在本實驗中,SparkContext已經自動建立完成,可以直接呼叫物件sc來訪問Spark的各種功能。SparkContext可以用於建立RDD(Resilient Distributed Dataset)。

在Spark中,資料集以RDD的形式儲存,會被分成不同的分割槽,每個分割槽都是資料集唯一的一個組成部分,分佈在不同的節點上。Spark區別於Hadoop的最主要特點,就是將資料儲存在記憶體中而不是磁碟中。這一特性使得Spark應用計算更快捷,因為不需要浪費時間從磁碟讀取資料。可以呼叫函式sc.parallelize()

建立RDD,並指定分割槽的數量。

2. RDD基本操作

RDD支援兩種操作:變換(transformation)和動作(action)。變換是指根據現有的資料集建立一個新的資料集。RDD一旦建立則無法修改,因此每次對RDD的變換其實建立了一個新的RDD。而動作是指在資料集上執行計算後,返回一個值給驅動程式。所有Spark中的變換都是惰性的,應用到基礎資料集上的這些變換不會馬上被計算,而只有在有動作發生,要求返回結果給驅動應用時,才會真正進行計算。這個設計讓Spark能夠更加效率地執行。

a. 變換函式

許多資料操作都是在資料集的每一個元素上進行,即變換。Spark可以將這些操作並行執行。對於每個資料分割槽,Spark會啟動一個任務(task)執行這些變換,並輸出一個新的資料分割槽。本實驗涉及到的常用變換函式有以下這些。

  • 函式map(f),將函式f()應用於RDD的每一個元素;
  • 函式filter(f),將函式f()應用於資料集的每一個元素,僅保留返回值為真的元素;
  • 函式flatMap(f),與函式map(f)類似,將函式f()應用於RDD的每一個元素並將結果化簡為一個列表;
  • 函式reduceByKey(f),用於元素是鍵值對的RDD,將RDD鍵相同的元素化簡,函式f()必須有兩個引數且返回一個值,並且需要滿足交換律和結合律,如果不滿足則每次返回的結果可能會不一致。

傳入變換函式的函式f()也可以為匿名函式,用關鍵字lamda定義。匿名函式僅限於一條命令。需要注意的是,匿名函式的使用從來不是必須的,完全可以定義一個正常函式取代之。使用匿名函式的一大好處就是程式碼顯得更緊湊。

b. 動作函式

本實驗涉及到的常用動作函式有以下這些。

  • 函式collect(),將資料從分佈在各節點的執行器上的RDD分割槽收集到驅動器上,並新建立一個集合,必須確保返回的資料集足夠小,能夠存放在驅動器的記憶體中,不然驅動器會崩潰;
  • 函式count(),計算RDD的元素個數;
  • 函式take(n),返回RDD的前nn個元素;
  • 函式first(),返回RDD的第一個元素,等同於take(1)
  • 函式takeOrdered(),返回RDD按升序或自定義順序排列的前nn個元素,與函式take()first()相比,該函式返回的是確定的結果;
  • 函式top(),與函式takeOrdered()類似,只不過返回降序排列的結果;
  • 函式reduce(f),呼叫函式f()將RDD的元素化簡,函式f()必須有兩個引數且返回一個值,並且需要滿足交換律和結合律,如果不滿足則每次返回的結果可能會不一致。

c. 快取

如果計劃多次訪問某個RDD,可以呼叫函式cache()將其快取在記憶體中,這樣可以節省磁碟互動開銷。但如果快取了太多的RDD使得Spark記憶體不足時,則會將最久沒有訪問過的RDD首先從記憶體中刪除。而如果該RDD被重新訪問,則該RDD會自動重新在記憶體中快取。可以通過RDD的is_cached屬性檢視其快取狀態。而如果某個RDD不再需要訪問時,可以呼叫函式unpersist()使其不再在記憶體中快取。

[實驗步驟]

1. 連線到大資料計算叢集

雙擊桌面上的xshell 5的快捷方式,通過xshell連線到大資料計算叢集。在命令列中輸入連線命令,將“<id>”替換成自己的使用者名稱,將“<master_host>”替換成實驗室當前環境的主機地址,並在彈出的對話方塊中輸入密碼。

[c:\~]$ ssh <id>@<master_ip>

2. 啟動Spark

在shell環境中通過命令pyspark啟動Spark。

$ pyspark
# 省略輸出

3. 瞭解sc物件

物件sc是Spark各種功能的入口。檢視物件sc的型別。

>>> type(sc)
<class 'pyspark.context.SparkContext'>

呼叫函式dir()檢視物件sc的可用屬性和方法。

>>> dir(sc)
['__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_accumulatorServer', '_active_spark_context', '_add_profile', '_batchSize', '_callsite', '_checkpointFile', '_conf', '_dictToJavaMap', '_do_init', '_ensure_initialized', '_gateway', '_getJavaStorageLevel', '_initialize_context', '_javaAccumulator', '_jsc', '_jvm', '_lock', '_next_accum_id', '_pickled_broadcast_vars', '_profile_stats', '_python_includes', '_temp_dir', '_unbatched_serializer', '_writeToFile', 'accumulator', 'addFile', 'addPyFile', 'appName', 'binaryFiles', 'binaryRecords', 'broadcast', 'cancelAllJobs', 'cancelJobGroup', 'clearFiles', 'defaultMinPartitions', 'defaultParallelism', 'dump_profiles', 'environment', 'getLocalProperty', 'hadoopFile', 'hadoopRDD', 'master', 'newAPIHadoopFile', 'newAPIHadoopRDD', 'parallelize', 'pickleFile', 'pythonExec', 'runJob', 'sequenceFile', 'serializer', 'setCheckpointDir', 'setJobGroup', 'setLocalProperty', 'setSystemProperty', 'show_profiles', 'sparkHome', 'sparkUser', 'stop', 'textFile', 'union', 'version', 'wholeTextFiles']

4. 建立一個整數RDD

呼叫函式xrange()建立一個整數集合,並呼叫函式len()檢視集合長度。

>>> data = xrange(1, 10001)
>>> len(data)
10000

呼叫函式sc.parallelize()將該整數集合分成8個分割槽。

>>> xrangeRDD = sc.parallelize(data, 8)

呼叫函式toDebugString()檢視RDD的變換傳承。

>>> print xrangeRDD.toDebugString()
(8) PythonRDD[1] at RDD at PythonRDD.scala:43 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364 []

檢視RDD的型別,並呼叫函式getNumPartitions()檢視RDD的分割槽數。

>>> type(xrangeRDD)
<class 'pyspark.rdd.PipelinedRDD'>
>>> xrangeRDD.getNumPartitions()
8

5. 呼叫變換函式map()將RDD中每個元素的值減1

定義一個Python函式sub(),該函式將輸入整數的值減1。

>>> def sub(value):
...     return (value - 1)
... 

呼叫函式map()將函式sub()應用於RDD的每一個元素,並呼叫函式toDebugString()檢視RDD的變換層次。由於Spark採用惰性計算的策略,此時變換並沒有被真正執行,但可以通過函式toDebugString()來檢視RDD的變換傳承。

>>> subRDD = xrangeRDD.map(sub)
>>> print subRDD.toDebugString()
(8) PythonRDD[2] at RDD at PythonRDD.scala:43 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364 []

6. 呼叫動作函式collect()檢視變換結果

呼叫函式collect()將變換結果生成一個新的集合。由於Spark採用惰性計算的策略,直到呼叫函式collect()時才真正執行了之前的操作(函式parallelize()map()),任務才會真正啟動,每一個分割槽都會有一個相應的任務。輸出的結果中每一個元素都比輸入的值小了1。

>>> print subRDD.collect()
# 省略輸出

7. 呼叫動作函式count()計算元素個數

呼叫函式count()計算RDD中的元素個數。

>>> print subRDD.count()
# 省略部分輸出

8. 呼叫變換函式filter()和動作函式collect()過濾出資料集中小於10的元素

首先定義一個Python函式ten(),輸入小於10則返回真,否則返回假。再將函式ten()作為引數傳遞給變換函式filter()。最後呼叫動作函式collect()返回變換結果。

>>> def ten(value):
...     if (value < 10):
...         return True
...     else:
...         return False
... 
>>> filteredRDD = subRDD.filter(ten)
>>> print filteredRDD.collect()
# 省略部分輸出
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

9. 將匿名函式傳入變換函式filter()

定義一個匿名函式實現上一步中的操作。

>>> lambdaRDD = subRDD.filter(lambda x: x < 10)
>>> lambdaRDD.collect()
# 省略部分輸出
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

10. 呼叫動作函式first()take()取出RDD中的某個(些)元素

呼叫函式first()返回RDD的第一個元素。

>>> print filteredRDD.first()
# 省略部分輸出
0

呼叫函式take(4)take(12)返回RDD的前4和12個元素。即使RDD只有10個元素,還是可以呼叫take(12)返回所有元素。

>>> print filteredRDD.take(4)
# 省略部分輸出
[0, 1, 2, 3]
>>> print filteredRDD.take(12)
# 省略部分輸出
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

11. 呼叫動作函式takeOrdered()top()取出RDD中的某些元素

呼叫函式takeOrdered()top()返回RDD的最小和最大的幾個元素。

>>> print filteredRDD.takeOrdered(3)
# 省略部分輸出
[0, 1, 2]
>>> print filteredRDD.top(5)
# 省略部分輸出
[9, 8, 7, 6, 5]

也可以傳入函式指定排列順序,這裡我們傳入一個匿名函式返回每個輸入的相反數,即反轉了排列順序。

>>> filteredRDD.takeOrdered(4, lambda s: -s)
# 省略部分輸出
[9, 8, 7, 6]

12. 呼叫動作函式reduce()將RDD中所有元素求和。

將求和的匿名函式傳給函式reduce(),將RDD中所有元素求和。

>>> print filteredRDD.reduce(lambda a, b: a + b)
# 省略部分輸出
45

傳入函式reduce(f)的函式f()必須滿足交換律和結合律,如不滿足,則可能返回不一致的結果。

>>> print filteredRDD.reduce(lambda a, b: a - b)
# 省略部分輸出
-45 # 每次輸出可能不同
>>> print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# 省略部分輸出
21 # 每次輸出可能不同

13. 呼叫變換函式flatMap()使得一個輸入返回多個輸出,並與函式map()作對比

建立一個包含多個單詞的RDD。

>>> wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
>>> wordsRDD = sc.parallelize(wordsList, 4)

分別呼叫變換函式map()flatMap(),並呼叫動作函式collect()對比返回的變換結果。可以發現函式map()返回的是包含二元組列表,而函式flatMap()則返回的是包含字串的列表。

>>> singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
>>> singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
>>> print singularAndPluralWordsRDDMap.collect()
# 省略部分輸出
[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
>>> print singularAndPluralWordsRDD.collect()
# 省略部分輸出
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']

14. 呼叫變換函式reduceByKay()生成一個包含單次原型和複數型的集合,並與函式map()作對比

建立一個包含字母和整數型鍵值對的RDD。

>>> pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])

呼叫變換函式reduceByKay()將相同鍵的元組的值求和。

>>> print pairRDD.reduceByKey(lambda a, b: a + b).collect()
# 省略部分輸出
[('a', 3), ('b', 1)]

15. 將RDD在記憶體中快取,並檢視其快取狀態

首先呼叫函式setName()設定RDD的名稱。再呼叫函式cache()將RDD快取在記憶體中。最後通過屬性is_cached檢視其快取狀態。

>>> filteredRDD.setName('My Filtered RDD')
My Filtered RDD PythonRDD[5] at collect at <stdin>:1
>>> filteredRDD.cache()
My Filtered RDD PythonRDD[5] at collect at <stdin>:1
>>> print filteredRDD.is_cached
True

16. 將RDD停止在記憶體中快取,並檢視其快取狀態

首先呼叫函式unpersist()將RDD停止在記憶體中快取。再通過屬性is_cached檢視其快取狀態。

>>> filteredRDD.unpersist()
# 省略部分輸出
My Filtered RDD PythonRDD[5] at collect at <stdin>:1
>>> print filteredRDD.is_cached
False

[附錄:輸入程式碼清單]

1. 連線到大資料計算叢集

2. 啟動Spark

3. 瞭解sc物件

type(sc)
dir(sc)

4. 建立一個整數RDD

data = xrange(1, 10001)
len(data)
xrangeRDD = sc.parallelize(data, 8)
print xrangeRDD.toDebugString()
type(xrangeRDD)
xrangeRDD.getNumPartitions()

5. 呼叫變換函式map()將RDD中每個元素的值減1

def sub(value):
    return (value - 1)

subRDD = xrangeRDD.map(sub)
print subRDD.toDebugString()

6. 呼叫動作函式collect()檢視變換結果

print subRDD.collect()

7. 呼叫動作函式count()計算元素個數

print subRDD.count()

8. 呼叫變換函式filter()和動作函式collect()過濾出資料集中小於10的元素

def ten(value):
    if (value < 10):
        return True
    else:
        return False

filteredRDD = subRDD.filter(ten)
print filteredRDD.collect()

9. 將匿名函式傳入變換函式filter()

lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()

10. 呼叫動作函式first()take()取出RDD中的某個(些)元素

print filteredRDD.first()
print filteredRDD.take(4)
print filteredRDD.take(12)

11. 呼叫動作函式takeOrdered()top()取出RDD中的某些元素

print filteredRDD.takeOrdered(3)
print filteredRDD.top(5)
filteredRDD.takeOrdered(4, lambda s: -s)

12. 呼叫動作函式reduce()將RDD中所有元素求和。

print filteredRDD.reduce(lambda a, b: a + b)
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)

13. 呼叫變換函式flatMap()使得一個輸入返回多個輸出,並與函式map()作對比

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
print singularAndPluralWordsRDDMap.collect()
print singularAndPluralWordsRDD.collect()

14. 呼叫變換函式reduceByKay()生成一個包含單次原型和複數型的集合,並與函式map()作對比

pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
print pairRDD.reduceByKey(lambda a, b: a + b).collect()

15. 將RDD在記憶體中快取,並檢視其快取狀態

filteredRDD.setName('My Filtered RDD')
filteredRDD.cache()
print filteredRDD.is_cached

16. 將RDD停止在記憶體中快取,並檢視其快取狀態

filteredRDD.unpersist()
print filteredRDD.is_cached