Spark系列:Python版Spark程式設計指南
目錄
一、介紹
二、連線Spark
三、建立RDD
四、RDD常用的轉換 Transformation
五、RDD常用的執行動作 Action
二、連線Spark
Spark1.3.0只支援Python2.6或更高的版本(但不支援Python3)。它使用了標準的CPython直譯器,所以諸如NumPy一類的C庫也是可以使用的。
通過Spark目錄下的bin/spark-submit指令碼你可以在Python中執行Spark應用。這個指令碼會載入Spark的Java/Scala庫然後讓你將應用提交到叢集中。你可以執行bin/pyspark來開啟Python的互動命令列。
如果你希望訪問HDFS上的資料,你需要為你使用的HDFS版本建立一個PySpark連線。常見的HDFS版本標籤都已經列在了這個第三方發行版頁面。
最後,你需要將一些Spark的類import到你的程式中。加入如下這行:
from pyspark import SparkContext, SparkConf
在一個Spark程式中要做的第一件事就是建立一個SparkContext物件來告訴Spark如何連線一個叢集。為了建立SparkContext,你首先需要建立一個SparkConf物件,這個物件會包含你的應用的一些相關資訊。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
appName引數是在叢集UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN叢集的URL,如果你在本地執行那麼這個引數應該是特殊的”local”字串。在實際使用中,當你在叢集中執行你的程式,你一般不會把master引數寫死在程式碼中,而是通過用spark-submit執行程式來獲得這個引數。但是,在本地測試以及單元測試時,你仍需要自行傳入”local”來執行Spark程式。
三、建立RDD
Spark是以RDD概念為中心執行的。RDD是一個容錯的、可以被並行操作的元素集合。建立一個RDD有兩個方法:在你的驅動程式中並行化一個已經存在的集合;從外部儲存系統中引用一個數據集,這個儲存系統可以是一個共享檔案系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的資料來源。
並行化集合
並行化集合是通過在驅動程式中一個現有的迭代器或集合上呼叫SparkContext的parallelize方法建立的。為了建立一個能夠並行操作的分佈資料集,集合中的元素都會被拷貝。比如,以下語句建立了一個包含1到5的並行化集合:
data = [1, 2, 3, 4, 5] distData= sc.parallelize(data)
分佈資料集(distData)被建立起來之後,就可以進行並行操作了。比如,我們可以呼叫disData.reduce(lambda a, b: a+b)
來對元素進行疊加。在後文中我們會描述分佈資料集上支援的操作。
並行集合的一個重要引數是將資料集劃分成分片的數量。對每一個分片,Spark會在叢集中執行一個對應的任務。 典型情況下,叢集中的每一個CPU將對應執行2-4個分片。一般情況下,Spark會根據當前叢集的情況自行設定分片數量。但是,你也可以通過將第二個參 數傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動確定分片數量。注意:有些程式碼中會使用切片(slice,分片的同義詞)這個術語來保持向下相容性。
一個簡單的示例:
import findspark findspark.init() from pyspark import SparkContext from pyspark import SparkConf conf = SparkConf().setAppName("miniProject").setMaster("local[*]") sc=SparkContext.getOrCreate(conf) rdd=sc.parallelize([1,2,3,4,5]) rdd1=rdd.map(lambda r:r+10) #map對每一個元素操作 print(rdd1.collect())
外部資料集
PySpark可以通過Hadoop支援的外部資料來源(包括本地檔案系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分佈資料集。Spark支援文字檔案、序列檔案以及其他任何Hadoop輸入格式檔案。
通過文字檔案建立RDD要使用SparkContext的textFile方法。這個方法會使用一個檔案的URI(或本地檔案路徑,hdfs://、s3n://這樣的URI等等)然後讀入這個檔案建立一個文字行的集合。以下是一個例子:
>>> distFile = sc.textFile("data.txt")
建立完成後distFile上就可以呼叫資料集操作了。比如,我們可以呼叫map和reduce操作來疊加所有文字行的長度,程式碼如下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
在Spark中讀入檔案時有幾點要注意:
- 如果使用了本地檔案路徑時,要保證在worker節點上這個檔案也能夠通過這個路徑訪問。這點可以通過將這個檔案拷貝到所有worker上或者使用網路掛載的共享檔案系統來解決。
- 包括textFile在內的所有基於檔案的Spark讀入方法,都支援將資料夾、壓縮檔案、包含萬用字元的路徑作為引數。比如,以下程式碼都是合法的:
textFile("/my/directory") textFile("/my/directory/*.txt") textFile("/my/directory/*.gz")
- textFile方法也可以傳入第二個可選引數來控制檔案的分片數量。預設情況下,Spark會為檔案的每一個塊(在HDFS中塊的大小預設是64MB) 建立一個分片。但是你也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小於檔案塊的數量。
除了文字檔案之外,Spark的Python API還支援多種其他資料格式:
- SparkContext.wholeTextFiles能夠讀入包含多個小文字檔案的目錄,然後為每一個檔案返回一個(檔名,內容)對。這是與textFile方法為每一個文字行返回一條記錄相對應的。
- RDD.saveAsPickleFile和SparkContext.pickleFile支援將RDD以序列化的Python物件格式儲存起來。序列化的過程中會以預設10個一批的數量批量處理。
- 序列檔案和其他Hadoop輸入輸出格式。
注意
這個特性目前仍處於試驗階段,被標記為Experimental,目前只適用於高階使用者。這個特性在未來可能會被基於Spark SQL的讀寫支援所取代,因為Spark SQL是更好的方式。
可寫型別支援
PySpark序列檔案支援利用Java作為中介載入一個鍵值對RDD,將可寫型別轉化成Java的基本型別,然後使用Pyrolite將java結果物件序列化。當將一個鍵值對RDD儲存到一個序列檔案中時PySpark將會執行上述過程的相反過程。首先將Python物件反序列化成Java物件,然後轉化成可寫型別。以下可寫型別會自動轉換:
| 可寫型別 | Python型別 |
- | Text | unicode str|
- | IntWritable | int |
- | FloatWritable | float |
- | DoubleWritable | float |
- | BooleanWritable | bool |
- | BytesWritable | bytearray |
- | NullWritable | None |
- | MapWritable | dict |
陣列是不能自動轉換的。使用者需要在讀寫時指定ArrayWritable的子型別.在讀入的時候,預設的轉換器會把自定義的ArrayWritable子 型別轉化成Java的Object[],之後序列化成Python的元組。為了獲得Python的array.array型別來使用主要型別的陣列,使用者 需要自行指定轉換器。
儲存和讀取序列檔案
和文字檔案類似,序列檔案可以通過指定路徑來儲存與讀取。鍵值型別都可以自行指定,但是對於標準可寫型別可以不指定。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
儲存和讀取其他Hadoop輸入輸出格式
PySpark同樣支援寫入和讀出其他Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。如果有必要,一個Hadoop配置可以以Python字典的形式傳入。以下是一個例子,使用了Elasticsearch ESInputFormat:
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
四、RDD常用的轉換 Transformation
RDD支援兩類操作:轉化操作,用於從已有的資料集轉化產生新的資料集;啟動操作,用於在計算結束後向驅動程式返回結果。舉個例子,map
是一個轉化操作,可以將資料集中每一個元素傳給一個函式,同時將計算結果作為一個新的RDD返回。另一方面,reduce
操作是一個啟動操作,能夠使用某些函式來聚集計算RDD中所有的元素,並且向驅動程式返回最終結果(同時還有一個並行的reduceByKey
操作可以返回一個分佈資料集)。
在Spark所有的轉化操作都是惰性求值的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作物件(比如:一個檔案)。只有當一個啟動操作被執行,要向驅動程式返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark執行更加高效——比如,我們會發覺由map
操作產生的資料集將會在reduce
操作中用到,之後僅僅是返回了reduce
的最終的結果而不是map
產生的龐大資料集。
在預設情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過呼叫persist
(或cache
)方法來將RDD持久化到記憶體中,這樣Spark就可以在下次使用這個資料集時快速獲得。Spark同樣提供了對將RDD持久化到硬碟上或在多個節點間複製的支援。
下面的表格列出了Spark支援的常用轉化操作。欲知細節,請查閱RDD API文件(Scala,Java,Python)和鍵值對RDD函式文件(Scala,Java)。
轉化操作 | 作用
————| ——
map(func) | 返回一個新的分佈資料集,由原資料集元素經func處理後的結果組成
filter(func) | 返回一個新的資料集,由傳給func返回True的原資料集元素組成
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立執行,所以func的引數和返回值必須都是迭代器
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個引數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然後替換或不替換
union(otherDataset) | 返回新的資料集,包括原資料集和引數資料集的所有元素
intersection(otherDataset) | 返回新資料集,是兩個集的交集
distinct([numTasks]) | 返回新的集,包括原集中的不重複元素
groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的資料集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算
sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個引數決定
join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD
cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) | 用於T和U型別RDD時返回(T, U)對型別鍵值對RDD
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片
coalesce(numPartitions) | 把RDD的分片數量降低到引數大小
repartition(numPartitions) | 重新打亂RDD中元素順序並重新分片,數量由引數決定
repartitionAndSortWithinPartitions(partitioner) | 按照引數給定的分片器重新分片,同時每個分片內部按照鍵排序
具體示例:
map
將函式作用於資料集的每一個元素上,生成一個分散式的資料集返回
Return a new RDD by applying a function to each element of this RDD. >>> rdd = sc.parallelize(["b", "a", "c"]) >>> sorted(rdd.map(lambda x: (x, 1)).collect()) [('a', 1), ('b', 1), ('c', 1)]
一個完整的例子:
from <span class='wp_keywordlink_affiliate'><a href="https://www.168seo.cn/tag/pyspark" title="View all posts in pyspark" target="_blank">pyspark</a></span> import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = range(10) print(list(data)) r1 = sc.parallelize(data) r2 = r1.map(lambda x:x+1) print(r2.collect()) sc.stop()
結果是:
filter
返回所有 funtion 返回值為True的函式,生成一個分散式的資料集返回
Return a new RDD containing only the elements that satisfy a predicate. >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4]
一個完整的例子:
from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = range(10) print(list(data)) r1 = sc.parallelize(data) r2 = r1.filter(lambda x:x>5) print(r2.collect()) sc.stop()
結果是:
flatMap
Return a new RDD by first applying a function to all elements of thisRDD, and then flattening the results.
一個完整的例子:
from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = ["hello zeropython","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" ")) r3 = r1.map(lambda x:x.split(" ")) print(r2.collect()) print(r3.collect()) sc.stop() RDD, and then flattening the results.
結果是:
groupBykey
按照相同key的資料分成一組
from _operator import add from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") """ Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. """ data = ["hello zeropython","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda y:(y,1)) print("r2",r2.collect()) r3 = r2.groupByKey() print("r3",r3.collect()) r4 = r3.map(lambda x:{x[0]:list(x[1])}) print("r4",r4.collect()) print(r2.reduceByKey(add).collect()) sc.stop()
結果是:
groupBy運算
groupBy運算可以按照傳入匿名函式的規則,將資料分為多個Array。比如下面的程式碼將intRDD分為偶數和奇數:
result = intRDD.groupBy(lambda x : x % 2).collect() print (sorted([(x, sorted(y)) for (x, y) in result]))
輸出為:
[(0, [2]), (1, [1, 3, 5, 5])]
reduceBykey
把相同的key 的資料分發到一起 並進行運算
from _operator import add from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = ["hello zeropython","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)) print("r2",r2.collect()) r3 = r2.reduceByKey(lambda x,y:x+y) print("r3",r3.collect()) sc.stop()
結果是:
sortbykey
Sorts this RDD, which is assumed to consist of (key, value) pairs.
from _operator import add from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) # sc.setLogLevel("FATAL") # sc.setLogLevel("ERROR") sc.setLogLevel("ERROR") data = ["hello zeropython","hwlldsf world","168seo.cn","168seo.cn","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" "))\ .map(lambda y:(y,1))\ .reduceByKey(lambda x,y:x+y)\ .sortByKey(lambda x:x[1]) # sortByKey排序根據關鍵詞的值進行排序 # reduceByKey 讓[("a",[1,1,1,1])] 轉換成 [("a",3)] print(r2.collect()) sc.stop()
結果是:
union
1 2 3 4 5 6 7 8 | """ Return the union of this RDD and another one. >>> rdd = sc.parallelize([1, 1, 2, 3]) >>> rdd.union(rdd).collect() [1, 1, 2, 3, 1, 1, 2, 3] """ |
distinct
1 2 3 4 5 6 7 | """ Return a new RDD containing the distinct elements in this RDD. >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) [1, 2, 3] """ |
join
1 2 3 4 5 | >>> a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")]) >>> b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")]) >>> a.join(b).collect() [('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))] |
leftOuterJoin
1 2 3 | >>> a.leftOuterJoin(b).collect() [('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))] |
rightOuterJoin
1 2 3 | >>> a.rightOuterJoin(b).collect() [('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))] |
fullOuterJoin
1 2 3 4 | >>> a.fullOuterJoin(b).collect() [('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))] >>> |
randomSplit運算
randomSplit 運算將整個集合以隨機數的方式按照比例分為多個RDD,比如按照0.4和0.6的比例將intRDD分為兩個RDD,並輸出:
1 2 3 4 5 6 7 | intRDD = sc.parallelize([3,1,2,5,5]) stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple']) sRDD = intRDD.randomSplit([0.4,0.6]) print (len(sRDD)) print (sRDD[0].collect()) print (sRDD[1].collect()) |
輸出為:
1 2 3 4 | 2 [3, 1] [2, 5, 5] |
多個RDD轉換運算
RDD也支援執行多個RDD的運算,這裡,我們定義三個RDD:
1 2 3 4 | intRDD1 = sc.parallelize([3,1,2,5,5]) intRDD2 = sc.parallelize([5,6]) intRDD3 = sc.parallelize([2,7]) |
並集運算
可以使用union函式進行並集運算:
1 2 | print (intRDD1.union(intRDD2).union(intRDD3).collect()) |
輸出為:
1 2 | [3, 1, 2, 5, 5, 5, 6, 2, 7] |
交集運算
可以使用intersection進行交集運算:
1 2 | print(intRDD1.intersection(intRDD2).collect()) |
兩個集合中只有一個相同元素5,所以輸出為:
1 2 | [5] |
差集運算
subtract(減去 去除)
可以使用subtract函式進行差集運算:
1 2 | print (intRDD1.subtract(intRDD2).collect()) |
由於兩個RDD的重複部分為5,所以輸出為[1,2,3]:
1 2 | [2, 1, 3] |
笛卡爾積運算
笛卡爾乘積是指在數學中,兩個集合X和Y的笛卡尓積(Cartesian product),又稱直積,表示為X × Y,第一個物件是X的成員而第二個物件是Y的所有可能有序對的其中一個成員
笛卡爾積又叫笛卡爾乘積,是一個叫笛卡爾的人提出來的。 簡單的說就是兩個集合相乘的結果。
假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積為{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。
可以使用cartesian函式進行笛卡爾乘積運算:
1 2 | print (intRDD1.cartesian(intRDD2).collect()) |
由於兩個RDD分別有5個元素和2個元素,所以返回結果有10各元素:
1 2 | [(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)] |
五、RDD常用的執行動作 Action
下面的表格列出了Spark支援的部分常用啟動操作。欲知細節,請查閱RDD API文件(Scala,Java,Python)和鍵值對RDD函式文件(Scala,Java)。
啟動操作 | 作用
reduce(func) | 使用func進行聚集計算,func的引數是兩個,返回值一個,兩次func執行應當是完全解耦的,這樣才能正確地並行運算
collect() | 向驅動程式返回資料集的元素組成的陣列
count() | 返回資料集元素的數量
first() | 返回資料集的第一個元素
take(n) | 返回前n個元素組成的陣列
takeSample(withReplacement, num, [seed]) | 返回一個由原資料集中任意num個元素的suzuki,並且替換之
takeOrder(n, [ordering]) | 返回排序後的前n個元素
saveAsTextFile(path) | 將資料集的元素寫成文字檔案
saveAsSequenceFile(path) | 將資料集的元素寫成序列檔案,這個API只能用於Java和Scala程式
saveAsObjectFile(path) | 將資料集的元素使用Java的序列化特性寫到檔案中,這個API只能用於Java和Scala程式
countByCount() | 只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數
foreach(func) | 對資料集的每個元素執行func, 通常用於完成一些帶有副作用的函式,比如更新累加器(見下文)或與外部儲存互動等
Action(執行):觸發Spark作業的執行,真正觸發轉換運算元的計算
Pyspark rdd 常用的轉換 Transformation Pyspark(二)
https://www.168seo.cn/pyspark/24806.html
1 2 3 | intRDD = sc.parallelize([3,1,2,5,5]) stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple']) |
基本“動作”運算
讀取元素
可以使用下列命令讀取RDD內的元素,這是Actions運算,所以會馬上執行:
1 2 3 4 5 6 7 8 9 | #取第一條資料 print (intRDD.first()) #取前兩條資料 print (intRDD.take(2)) #升序排列,並取前3條資料 print (intRDD.takeOrdered(3)) #降序排列,並取前3條資料 print (intRDD.takeOrdered(3,lambda x:-x)) |
輸出為:
1 2 3 4 5 | 3 [3, 1] [1, 2, 3] [5, 5, 3] |
統計功能
可以將RDD內的元素進行統計運算:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #統計 print (intRDD.stats()) #最小值 print (intRDD.min()) #最大值 print (intRDD.max()) #標準差 print (intRDD.stdev()) #計數 print (intRDD.count()) #求和 print (intRDD.sum()) #平均 print (intRDD.mean()) |
輸出為:
RDD Key-Value基本“轉換”運算
Spark RDD支援鍵值對運算,Key-Value運算時mapreduce運算的基礎,本節介紹RDD鍵值的基本“轉換”運算。
初始化
我們用元素型別為tuple元組的陣列初始化我們的RDD,這裡,每個tuple的第一個值將作為鍵,而第二個元素將作為值。
作為值
1 2 | kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) |
得到key和value值
可以使用keys和values函式分別得到RDD的鍵陣列和值陣列:
1 2 3 | print (kvRDD1.keys().collect()) print (kvRDD1.values().collect()) |
輸出為:
篩選元素
可以按照鍵進行元素篩選,也可以通過值進行元素篩選,和之前的一樣,使用filter函式,這裡要注意的是,雖然RDD中是以鍵值對形式存在,但是本質上還是一個二元組,二元組的第一個值代表鍵,第二個值代表值,所以按照如下的程式碼既可以按照鍵進行篩選,我們篩選鍵值小於5的資料:
1 2 | print (kvRDD1.filter(lambda x:x[0] < 5).collect()) |
輸出為:
1 2 | [(3, 4), (3, 6), (1, 2)] |
同樣,將x[0]替換為x[1]就是按照值進行篩選,我們篩選值小於5的資料:
1 2 | print (kvRDD1.filter(lambda x:x[1] < 5).collect()) |
輸出為:
1 2 | [(3, 4), (1, 2)] |
值運算
我們可以使用mapValues方法處理value值,下面的程式碼將value值進行了平方處理:
1 2 | print (kvRDD1.mapValues(lambda x:x**2).collect()) |
輸出為:
1 2 | [(3, 16), (3, 36), (5, 36), (1, 4)] |
按照key排序
可以使用sortByKey按照key進行排序,傳入引數的預設值為true,是按照從小到大排序,也可以傳入引數false,表示從大到小排序:
1 2 3 4 | print (kvRDD1.sortByKey().collect()) print (kvRDD1.sortByKey(True).collect()) print (kvRDD1.sortByKey(False).collect()) |
輸出為:
1 2 3 4 | [(1, 2), (3, 4), (3, 6), (5, 6)] [(1, 2), (3, 4), (3, 6), (5, 6)] [(5, 6), (3, 4), (3, 6), (1, 2)] |
合併相同key值的資料
使用reduceByKey函式可以對具有相同key值的資料進行合併。比如下面的程式碼,由於RDD中存在(3,4)和(3,6)兩條key值均為3的資料,他們將被合為一條資料:
1 2 | print (kvRDD1.reduceByKey(lambda x,y:x+y).collect()) |
輸出為
1 2 | [(1, 2), (3, 10), (5, 6)] |
多個RDD Key-Value“轉換”運算
初始化
首先我們初始化兩個k-v的RDD:
1 2 3 | kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2 = sc.parallelize([(3,8)]) |
內連線運算
join運算可以實現類似資料庫的內連線,將兩個RDD按照相同的key值join起來,kvRDD1與kvRDD2的key值唯一相同的是3,kvRDD1中有兩條key值為3的資料(3,4)和(3,6),而kvRDD2中只有一條key值為3的資料(3,8),所以join的結果是(3,(4,8)) 和(3,(6,8)):
1 2 | print (kvRDD1.join(kvRDD2).collect()) |
輸出為:
1 2 | [(3, (4, 8)), (3, (6, 8))] |
左外連線
使用leftOuterJoin可以實現類似資料庫的左外連線,如果kvRDD1的key值對應不到kvRDD2,就會顯示None
1 2 | print (kvRDD1.leftOuterJoin(kvRDD2).collect()) |
輸出為:
1 2 | [(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))] |
右外連線
使用rightOuterJoin可以實現類似資料庫的右外連線,如果kvRDD2的key值對應不到kvRDD1,就會顯示None
1 2 | print (kvRDD1.rightOuterJoin(kvRDD2).collect()) |
輸出為:
1 2 | [(3, (4, 8)), (3, (6, 8))] |
刪除相同key值資料
使用subtractByKey運算會刪除相同key值得資料:
1 2 | print (kvRDD1.subtractByKey(kvRDD2).collect()) |
結果為:
1 2 | [(1, 2), (5, 6)] |
Key-Value“動作”運算
讀取資料
可以使用下面的幾種方式讀取RDD的資料:
1 2 3 4 5 6 7 8 9 | #讀取第一條資料 print (kvRDD1.first()) #讀取前兩條資料 print (kvRDD1.take(2)) #讀取第一條資料的key值 print (kvRDD1.first()[0]) #讀取第一條資料的value值 print (kvRDD1.first()[1]) |
輸出為:
1 2 3 4 5 | (3, 4) [(3, 4), (3, 6)] 3 4 |
按key值統計:
使用countByKey函式可以統計各個key值對應的資料的條數:
1 2 | print (kvRDD1.countByKey().collect()) |
輸出為:
1 2 | defaultdict(<type 'int'>, {1: 1, 3: 2, 5: 1}) |
lookup查詢運算
使用lookup函式可以根據輸入的key值來查詢對應的Value值:
1 2 | print (kvRDD1.lookup(3)) |
輸出為:
1 2 | [4, 6] |
持久化操作
spark RDD的持久化機制,可以將需要重複運算的RDD儲存在記憶體中,以便大幅提升運算效率,有兩個主要的函式:
持久化
使用persist函式對RDD進行持久化:
1 2 | kvRDD1.persist() |
在持久化的同時我們可以指定持久化儲存等級:
首先我們匯入相關函式:
1 2 | from pyspark.storagelevel import StorageLevel |
在scala中可以直接使用上述的持久化等級關鍵詞,但是在pyspark中封裝為了一個類,
StorageLevel類,並在初始化時指定一些引數,通過不同的引數組合,可以實現上面的不同儲存等級。StorageLevel類的初始化函式如下:
1 2 3 4 5 6 7 | def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): self.useDisk = useDisk self.useMemory = useMemory self.useOffHeap = useOffHeap self.deserialized = deserialized self.replication = replication |
那麼不同的儲存等級對應的引數為:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1) """ .. note:: The following four storage level constants are deprecated in 2.0, since the records \ will always be serialized in Python. """ StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead.""" StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2 """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead.""" StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead.""" StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2 """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead.""" |
取消持久化
使用unpersist函式對RDD進行持久化:
1 2 | kvRDD1.unpersist() |
整理回顧
哇,有關pyspark的RDD的基本操作就是上面這些啦,想要了解更多的盆友們可以參照官網給出的官方文件:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
今天主要介紹了兩種RDD,基本的RDD和Key-Value形式的RDD,介紹了他們的幾種“轉換”運算和“動作”運算,整理如下:
refer:
https://www.168seo.cn/pyspark/24806.html