1. 程式人生 > 實用技巧 >Spark系列:Python版Spark程式設計指南

Spark系列:Python版Spark程式設計指南

目錄

一、介紹

二、連線Spark

三、建立RDD

四、RDD常用的轉換 Transformation

五、RDD常用的執行動作 Action

二、連線Spark

Spark1.3.0只支援Python2.6或更高的版本(但不支援Python3)。它使用了標準的CPython直譯器,所以諸如NumPy一類的C庫也是可以使用的。

通過Spark目錄下的bin/spark-submit指令碼你可以在Python中執行Spark應用。這個指令碼會載入Spark的Java/Scala庫然後讓你將應用提交到叢集中。你可以執行bin/pyspark來開啟Python的互動命令列。

如果你希望訪問HDFS上的資料,你需要為你使用的HDFS版本建立一個PySpark連線。常見的HDFS版本標籤都已經列在了這個第三方發行版頁面。

最後,你需要將一些Spark的類import到你的程式中。加入如下這行:

from pyspark import SparkContext, SparkConf

在一個Spark程式中要做的第一件事就是建立一個SparkContext物件來告訴Spark如何連線一個叢集。為了建立SparkContext,你首先需要建立一個SparkConf物件,這個物件會包含你的應用的一些相關資訊。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName引數是在叢集UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN叢集的URL,如果你在本地執行那麼這個引數應該是特殊的”local”字串。在實際使用中,當你在叢集中執行你的程式,你一般不會把master引數寫死在程式碼中,而是通過用spark-submit執行程式來獲得這個引數。但是,在本地測試以及單元測試時,你仍需要自行傳入”local”來執行Spark程式。

三、建立RDD

Spark是以RDD概念為中心執行的。RDD是一個容錯的、可以被並行操作的元素集合。建立一個RDD有兩個方法:在你的驅動程式中並行化一個已經存在的集合;從外部儲存系統中引用一個數據集,這個儲存系統可以是一個共享檔案系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的資料來源。

並行化集合

並行化集合是通過在驅動程式中一個現有的迭代器或集合上呼叫SparkContext的parallelize方法建立的。為了建立一個能夠並行操作的分佈資料集,集合中的元素都會被拷貝。比如,以下語句建立了一個包含1到5的並行化集合:

data = [1, 2, 3, 4, 5]
distData 
= sc.parallelize(data)

分佈資料集(distData)被建立起來之後,就可以進行並行操作了。比如,我們可以呼叫disData.reduce(lambda a, b: a+b)來對元素進行疊加。在後文中我們會描述分佈資料集上支援的操作。

並行集合的一個重要引數是將資料集劃分成分片的數量。對每一個分片,Spark會在叢集中執行一個對應的任務。 典型情況下,叢集中的每一個CPU將對應執行2-4個分片。一般情況下,Spark會根據當前叢集的情況自行設定分片數量。但是,你也可以通過將第二個參 數傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動確定分片數量。注意:有些程式碼中會使用切片(slice,分片的同義詞)這個術語來保持向下相容性。

一個簡單的示例:

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd=sc.parallelize([1,2,3,4,5])
rdd1=rdd.map(lambda r:r+10) #map對每一個元素操作
print(rdd1.collect())

外部資料集

PySpark可以通過Hadoop支援的外部資料來源(包括本地檔案系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分佈資料集。Spark支援文字檔案、序列檔案以及其他任何Hadoop輸入格式檔案。

通過文字檔案建立RDD要使用SparkContext的textFile方法。這個方法會使用一個檔案的URI(或本地檔案路徑,hdfs://、s3n://這樣的URI等等)然後讀入這個檔案建立一個文字行的集合。以下是一個例子:

>>> distFile = sc.textFile("data.txt")

建立完成後distFile上就可以呼叫資料集操作了。比如,我們可以呼叫map和reduce操作來疊加所有文字行的長度,程式碼如下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

在Spark中讀入檔案時有幾點要注意:

  • 如果使用了本地檔案路徑時,要保證在worker節點上這個檔案也能夠通過這個路徑訪問。這點可以通過將這個檔案拷貝到所有worker上或者使用網路掛載的共享檔案系統來解決。
  • 包括textFile在內的所有基於檔案的Spark讀入方法,都支援將資料夾、壓縮檔案、包含萬用字元的路徑作為引數。比如,以下程式碼都是合法的:
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
  • textFile方法也可以傳入第二個可選引數來控制檔案的分片數量。預設情況下,Spark會為檔案的每一個塊(在HDFS中塊的大小預設是64MB) 建立一個分片。但是你也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小於檔案塊的數量。

除了文字檔案之外,Spark的Python API還支援多種其他資料格式:

  • SparkContext.wholeTextFiles能夠讀入包含多個小文字檔案的目錄,然後為每一個檔案返回一個(檔名,內容)對。這是與textFile方法為每一個文字行返回一條記錄相對應的。
  • RDD.saveAsPickleFile和SparkContext.pickleFile支援將RDD以序列化的Python物件格式儲存起來。序列化的過程中會以預設10個一批的數量批量處理。
  • 序列檔案和其他Hadoop輸入輸出格式。
注意

這個特性目前仍處於試驗階段,被標記為Experimental,目前只適用於高階使用者。這個特性在未來可能會被基於Spark SQL的讀寫支援所取代,因為Spark SQL是更好的方式。

可寫型別支援

PySpark序列檔案支援利用Java作為中介載入一個鍵值對RDD,將可寫型別轉化成Java的基本型別,然後使用Pyrolite將java結果物件序列化。當將一個鍵值對RDD儲存到一個序列檔案中時PySpark將會執行上述過程的相反過程。首先將Python物件反序列化成Java物件,然後轉化成可寫型別。以下可寫型別會自動轉換:

| 可寫型別 | Python型別 |

  • | Text | unicode str|
  • | IntWritable | int |
  • | FloatWritable | float |
  • | DoubleWritable | float |
  • | BooleanWritable | bool |
  • | BytesWritable | bytearray |
  • | NullWritable | None |
  • | MapWritable | dict |

陣列是不能自動轉換的。使用者需要在讀寫時指定ArrayWritable的子型別.在讀入的時候,預設的轉換器會把自定義的ArrayWritable子 型別轉化成Java的Object[],之後序列化成Python的元組。為了獲得Python的array.array型別來使用主要型別的陣列,使用者 需要自行指定轉換器。

儲存和讀取序列檔案

和文字檔案類似,序列檔案可以通過指定路徑來儲存與讀取。鍵值型別都可以自行指定,但是對於標準可寫型別可以不指定。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
儲存和讀取其他Hadoop輸入輸出格式

PySpark同樣支援寫入和讀出其他Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。如果有必要,一個Hadoop配置可以以Python字典的形式傳入。以下是一個例子,使用了Elasticsearch ESInputFormat:

$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

四、RDD常用的轉換 Transformation

RDD支援兩類操作:轉化操作,用於從已有的資料集轉化產生新的資料集;啟動操作,用於在計算結束後向驅動程式返回結果。舉個例子,map是一個轉化操作,可以將資料集中每一個元素傳給一個函式,同時將計算結果作為一個新的RDD返回。另一方面,reduce操作是一個啟動操作,能夠使用某些函式來聚集計算RDD中所有的元素,並且向驅動程式返回最終結果(同時還有一個並行的reduceByKey操作可以返回一個分佈資料集)。

在Spark所有的轉化操作都是惰性求值的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作物件(比如:一個檔案)。只有當一個啟動操作被執行,要向驅動程式返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark執行更加高效——比如,我們會發覺由map操作產生的資料集將會在reduce操作中用到,之後僅僅是返回了reduce的最終的結果而不是map產生的龐大資料集。

在預設情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過呼叫persist(或cache)方法來將RDD持久化到記憶體中,這樣Spark就可以在下次使用這個資料集時快速獲得。Spark同樣提供了對將RDD持久化到硬碟上或在多個節點間複製的支援。

下面的表格列出了Spark支援的常用轉化操作。欲知細節,請查閱RDD API文件(Scala,Java,Python)和鍵值對RDD函式文件(Scala,Java)。

轉化操作 | 作用
————| ——
map(func) | 返回一個新的分佈資料集,由原資料集元素經func處理後的結果組成
filter(func) | 返回一個新的資料集,由傳給func返回True的原資料集元素組成
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立執行,所以func的引數和返回值必須都是迭代器
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個引數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然後替換或不替換
union(otherDataset) | 返回新的資料集,包括原資料集和引數資料集的所有元素
intersection(otherDataset) | 返回新資料集,是兩個集的交集
distinct([numTasks]) | 返回新的集,包括原集中的不重複元素
groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的資料集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算
sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個引數決定
join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD
cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) | 用於T和U型別RDD時返回(T, U)對型別鍵值對RDD
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片
coalesce(numPartitions) | 把RDD的分片數量降低到引數大小
repartition(numPartitions) | 重新打亂RDD中元素順序並重新分片,數量由引數決定
repartitionAndSortWithinPartitions(partitioner) | 按照引數給定的分片器重新分片,同時每個分片內部按照鍵排序

具體示例:

map

將函式作用於資料集的每一個元素上,生成一個分散式的資料集返回

Return a new RDD by applying a function to each element of this RDD.

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]

一個完整的例子:

from <span class='wp_keywordlink_affiliate'><a href="https://www.168seo.cn/tag/pyspark" title="View all posts in pyspark" target="_blank">pyspark</a></span> import SparkConf,SparkContext

#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
data = range(10)

print(list(data))
r1 = sc.parallelize(data)

r2 = r1.map(lambda x:x+1)

print(r2.collect())
sc.stop()

結果是:

filter

返回所有 funtion 返回值為True的函式,生成一個分散式的資料集返回

Return a new RDD containing only the elements that satisfy a predicate.
 
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]

一個完整的例子:

from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
data = range(10)
 
print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.filter(lambda x:x>5)
 
print(r2.collect())
 
sc.stop()
 

結果是:

flatMap

Return a new RDD by first applying a function to all elements of thisRDD, and then flattening the results.

一個完整的例子:

from pyspark import SparkConf,SparkContext

#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)

data = ["hello zeropython","hello 168seo.cn"]

# print(list(data))
r1 = sc.parallelize(data)

r2 = r1.flatMap(lambda x:x.split(" "))
r3 = r1.map(lambda x:x.split(" "))

print(r2.collect())
print(r3.collect())


sc.stop()
RDD, and then flattening the results.

結果是:

groupBykey

按照相同key的資料分成一組

from _operator import add
 
from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
"""
  Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.
"""
data = ["hello zeropython","hello 168seo.cn"]
 
# print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda y:(y,1))
print("r2",r2.collect())
r3 = r2.groupByKey()
print("r3",r3.collect())
 
r4 = r3.map(lambda x:{x[0]:list(x[1])})
 
print("r4",r4.collect())
 
 
print(r2.reduceByKey(add).collect())
 
sc.stop()
 

結果是:

groupBy運算
groupBy運算可以按照傳入匿名函式的規則,將資料分為多個Array。比如下面的程式碼將intRDD分為偶數和奇數:

result = intRDD.groupBy(lambda x : x % 2).collect()
print (sorted([(x, sorted(y)) for (x, y) in result]))

輸出為:

[(0, [2]), (1, [1, 3, 5, 5])]

reduceBykey

把相同的key 的資料分發到一起 並進行運算

from _operator import add
 
from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
 
data = ["hello zeropython","hello 168seo.cn"]
 
# print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1))
 
print("r2",r2.collect())
r3 = r2.reduceByKey(lambda x,y:x+y)
 
print("r3",r3.collect())
 
sc.stop()
 

結果是:

sortbykey

Sorts this RDD, which is assumed to consist of (key, value) pairs.

from _operator import add
 
from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
# sc.setLogLevel("FATAL")
# sc.setLogLevel("ERROR")
sc.setLogLevel("ERROR")
data = ["hello zeropython","hwlldsf world","168seo.cn","168seo.cn","hello 168seo.cn"]
 
# print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.flatMap(lambda x:x.split(" "))\
    .map(lambda y:(y,1))\
    .reduceByKey(lambda x,y:x+y)\
    .sortByKey(lambda x:x[1])
    # sortByKey排序根據關鍵詞的值進行排序
    # reduceByKey 讓[("a",[1,1,1,1])] 轉換成 [("a",3)]
 
print(r2.collect())
 
sc.stop()

結果是:

union

1 2 3 4 5 6 7 8 """ Return the union of this RDD and another one. >>> rdd = sc.parallelize([1, 1, 2, 3]) >>> rdd.union(rdd).collect() [1, 1, 2, 3, 1, 1, 2, 3] """

distinct

1 2 3 4 5 6 7 """ Return a new RDD containing the distinct elements in this RDD. >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) [1, 2, 3] """

join

1 2 3 4 5 >>> a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")]) >>> b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")]) >>> a.join(b).collect() [('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]

leftOuterJoin

1 2 3 >>> a.leftOuterJoin(b).collect() [('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]

rightOuterJoin

1 2 3 >>> a.rightOuterJoin(b).collect() [('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]

fullOuterJoin

1 2 3 4 >>> a.fullOuterJoin(b).collect() [('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))] >>>

randomSplit運算

randomSplit 運算將整個集合以隨機數的方式按照比例分為多個RDD,比如按照0.4和0.6的比例將intRDD分為兩個RDD,並輸出:

1 2 3 4 5 6 7 intRDD = sc.parallelize([3,1,2,5,5]) stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple']) sRDD = intRDD.randomSplit([0.4,0.6]) print (len(sRDD)) print (sRDD[0].collect()) print (sRDD[1].collect())

輸出為:

1 2 3 4 2 [3, 1] [2, 5, 5]

多個RDD轉換運算

RDD也支援執行多個RDD的運算,這裡,我們定義三個RDD:

1 2 3 4 intRDD1 = sc.parallelize([3,1,2,5,5]) intRDD2 = sc.parallelize([5,6]) intRDD3 = sc.parallelize([2,7])

並集運算

可以使用union函式進行並集運算:

1 2 print (intRDD1.union(intRDD2).union(intRDD3).collect())

輸出為:

1 2 [3, 1, 2, 5, 5, 5, 6, 2, 7]

交集運算

可以使用intersection進行交集運算:

1 2 print(intRDD1.intersection(intRDD2).collect())

兩個集合中只有一個相同元素5,所以輸出為:

1 2 [5]

差集運算

subtract(減去 去除)
可以使用subtract函式進行差集運算:

1 2 print (intRDD1.subtract(intRDD2).collect())

由於兩個RDD的重複部分為5,所以輸出為[1,2,3]:

1 2 [2, 1, 3]

笛卡爾積運算

笛卡爾乘積是指在數學中,兩個集合X和Y的笛卡尓積(Cartesian product),又稱直積,表示為X × Y,第一個物件是X的成員而第二個物件是Y的所有可能有序對的其中一個成員

笛卡爾積又叫笛卡爾乘積,是一個叫笛卡爾的人提出來的。 簡單的說就是兩個集合相乘的結果。
假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積為{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。

可以使用cartesian函式進行笛卡爾乘積運算:

1 2 print (intRDD1.cartesian(intRDD2).collect())

由於兩個RDD分別有5個元素和2個元素,所以返回結果有10各元素:

1 2 [(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]

五、RDD常用的執行動作 Action

下面的表格列出了Spark支援的部分常用啟動操作。欲知細節,請查閱RDD API文件(Scala,Java,Python)和鍵值對RDD函式文件(Scala,Java)。
啟動操作 | 作用
reduce(func) | 使用func進行聚集計算,func的引數是兩個,返回值一個,兩次func執行應當是完全解耦的,這樣才能正確地並行運算
collect() | 向驅動程式返回資料集的元素組成的陣列
count() | 返回資料集元素的數量
first() | 返回資料集的第一個元素
take(n) | 返回前n個元素組成的陣列
takeSample(withReplacement, num, [seed]) | 返回一個由原資料集中任意num個元素的suzuki,並且替換之
takeOrder(n, [ordering]) | 返回排序後的前n個元素
saveAsTextFile(path) | 將資料集的元素寫成文字檔案
saveAsSequenceFile(path) | 將資料集的元素寫成序列檔案,這個API只能用於Java和Scala程式
saveAsObjectFile(path) | 將資料集的元素使用Java的序列化特性寫到檔案中,這個API只能用於Java和Scala程式
countByCount() | 只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數
foreach(func) | 對資料集的每個元素執行func, 通常用於完成一些帶有副作用的函式,比如更新累加器(見下文)或與外部儲存互動等

Action(執行):觸發Spark作業的執行,真正觸發轉換運算元的計算

Pyspark rdd 常用的轉換 Transformation Pyspark(二)

https://www.168seo.cn/pyspark/24806.html

1 2 3 intRDD = sc.parallelize([3,1,2,5,5]) stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])

基本“動作”運算

讀取元素

可以使用下列命令讀取RDD內的元素,這是Actions運算,所以會馬上執行:

1 2 3 4 5 6 7 8 9 #取第一條資料 print (intRDD.first()) #取前兩條資料 print (intRDD.take(2)) #升序排列,並取前3條資料 print (intRDD.takeOrdered(3)) #降序排列,並取前3條資料 print (intRDD.takeOrdered(3,lambda x:-x))

輸出為:

1 2 3 4 5 3 [3, 1] [1, 2, 3] [5, 5, 3]

統計功能

可以將RDD內的元素進行統計運算:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #統計 print (intRDD.stats()) #最小值 print (intRDD.min()) #最大值 print (intRDD.max()) #標準差 print (intRDD.stdev()) #計數 print (intRDD.count()) #求和 print (intRDD.sum()) #平均 print (intRDD.mean())

輸出為:

RDD Key-Value基本“轉換”運算

Spark RDD支援鍵值對運算,Key-Value運算時mapreduce運算的基礎,本節介紹RDD鍵值的基本“轉換”運算。

初始化

我們用元素型別為tuple元組的陣列初始化我們的RDD,這裡,每個tuple的第一個值將作為鍵,而第二個元素將作為值。
作為值

1 2 kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])

得到key和value值
可以使用keys和values函式分別得到RDD的鍵陣列和值陣列:

1 2 3 print (kvRDD1.keys().collect()) print (kvRDD1.values().collect())

輸出為:

篩選元素

可以按照鍵進行元素篩選,也可以通過值進行元素篩選,和之前的一樣,使用filter函式,這裡要注意的是,雖然RDD中是以鍵值對形式存在,但是本質上還是一個二元組,二元組的第一個值代表鍵,第二個值代表值,所以按照如下的程式碼既可以按照鍵進行篩選,我們篩選鍵值小於5的資料:

1 2 print (kvRDD1.filter(lambda x:x[0] < 5).collect())

輸出為:

1 2 [(3, 4), (3, 6), (1, 2)]

同樣,將x[0]替換為x[1]就是按照值進行篩選,我們篩選值小於5的資料:

1 2 print (kvRDD1.filter(lambda x:x[1] < 5).collect())

輸出為:

1 2 [(3, 4), (1, 2)]

值運算

我們可以使用mapValues方法處理value值,下面的程式碼將value值進行了平方處理:

1 2 print (kvRDD1.mapValues(lambda x:x**2).collect())

輸出為:

1 2 [(3, 16), (3, 36), (5, 36), (1, 4)]

按照key排序

可以使用sortByKey按照key進行排序,傳入引數的預設值為true,是按照從小到大排序,也可以傳入引數false,表示從大到小排序:

1 2 3 4 print (kvRDD1.sortByKey().collect()) print (kvRDD1.sortByKey(True).collect()) print (kvRDD1.sortByKey(False).collect())

輸出為:

1 2 3 4 [(1, 2), (3, 4), (3, 6), (5, 6)] [(1, 2), (3, 4), (3, 6), (5, 6)] [(5, 6), (3, 4), (3, 6), (1, 2)]

合併相同key值的資料

使用reduceByKey函式可以對具有相同key值的資料進行合併。比如下面的程式碼,由於RDD中存在(3,4)和(3,6)兩條key值均為3的資料,他們將被合為一條資料:

1 2 print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())

輸出為

1 2 [(1, 2), (3, 10), (5, 6)]

多個RDD Key-Value“轉換”運算

初始化
首先我們初始化兩個k-v的RDD:

1 2 3 kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2 = sc.parallelize([(3,8)])

內連線運算

join運算可以實現類似資料庫的內連線,將兩個RDD按照相同的key值join起來,kvRDD1與kvRDD2的key值唯一相同的是3,kvRDD1中有兩條key值為3的資料(3,4)和(3,6),而kvRDD2中只有一條key值為3的資料(3,8),所以join的結果是(3,(4,8)) 和(3,(6,8)):

1 2 print (kvRDD1.join(kvRDD2).collect())

輸出為:

1 2 [(3, (4, 8)), (3, (6, 8))]

左外連線

使用leftOuterJoin可以實現類似資料庫的左外連線,如果kvRDD1的key值對應不到kvRDD2,就會顯示None

1 2 print (kvRDD1.leftOuterJoin(kvRDD2).collect())

輸出為:

1 2 [(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]

右外連線
使用rightOuterJoin可以實現類似資料庫的右外連線,如果kvRDD2的key值對應不到kvRDD1,就會顯示None

1 2 print (kvRDD1.rightOuterJoin(kvRDD2).collect())

輸出為:

1 2 [(3, (4, 8)), (3, (6, 8))]

刪除相同key值資料

使用subtractByKey運算會刪除相同key值得資料:

1 2 print (kvRDD1.subtractByKey(kvRDD2).collect())

結果為:

1 2 [(1, 2), (5, 6)]

Key-Value“動作”運算

讀取資料
可以使用下面的幾種方式讀取RDD的資料:

1 2 3 4 5 6 7 8 9 #讀取第一條資料 print (kvRDD1.first()) #讀取前兩條資料 print (kvRDD1.take(2)) #讀取第一條資料的key值 print (kvRDD1.first()[0]) #讀取第一條資料的value值 print (kvRDD1.first()[1])

輸出為:

1 2 3 4 5 (3, 4) [(3, 4), (3, 6)] 3 4

按key值統計:

使用countByKey函式可以統計各個key值對應的資料的條數:

1 2 print (kvRDD1.countByKey().collect())

輸出為:

1 2 defaultdict(<type 'int'>, {1: 1, 3: 2, 5: 1})

lookup查詢運算

使用lookup函式可以根據輸入的key值來查詢對應的Value值:

1 2 print (kvRDD1.lookup(3))

輸出為:

1 2 [4, 6]

持久化操作

spark RDD的持久化機制,可以將需要重複運算的RDD儲存在記憶體中,以便大幅提升運算效率,有兩個主要的函式:

持久化

使用persist函式對RDD進行持久化:

1 2 kvRDD1.persist()

在持久化的同時我們可以指定持久化儲存等級:

首先我們匯入相關函式:

1 2 from pyspark.storagelevel import StorageLevel

在scala中可以直接使用上述的持久化等級關鍵詞,但是在pyspark中封裝為了一個類,
StorageLevel類,並在初始化時指定一些引數,通過不同的引數組合,可以實現上面的不同儲存等級。StorageLevel類的初始化函式如下:

1 2 3 4 5 6 7 def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): self.useDisk = useDisk self.useMemory = useMemory self.useOffHeap = useOffHeap self.deserialized = deserialized self.replication = replication

那麼不同的儲存等級對應的引數為:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1) """ .. note:: The following four storage level constants are deprecated in 2.0, since the records \ will always be serialized in Python. """ StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead.""" StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2 """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead.""" StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead.""" StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2 """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead."""

取消持久化

使用unpersist函式對RDD進行持久化:

1 2 kvRDD1.unpersist()

整理回顧
哇,有關pyspark的RDD的基本操作就是上面這些啦,想要了解更多的盆友們可以參照官網給出的官方文件:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

今天主要介紹了兩種RDD,基本的RDD和Key-Value形式的RDD,介紹了他們的幾種“轉換”運算和“動作”運算,整理如下:

refer:

https://www.168seo.cn/pyspark/24806.html

https://www.168seo.cn/pyspark/24809.html

https://www.csdn.net/article/2015-04-24/2824552