【機器學習】pyspark中RDD的若干操作
1,讀取檔案
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
a,text = sc.textFile(“file:///d:/test.txt”)
b,rdd = sc.parallelize([1,2,3,4,5])
2,RDD的操作
大家還對python的list comprehension有印象嗎,RDDs可以進行一系列的變換得到新的RDD,有點類似那個過程,我們先給大家提一下RDD上最最常用到的transformation:
- map() 對RDD的每一個item都執行同一個操作
- flatMap() 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list
- filter() 篩選出來滿足條件的item distinct() 對RDD中的item去重
- sample() 從RDD中的item中取樣一部分出來,有放回或者無放回
- sortBy() 對RDD中的item進行排序
如果你想看操作後的結果,可以用一個叫做collect()的action把所有的item轉成一個Python list。
一個簡單的例子
numbersRDD = sc.parallelize(range(1,10+1))
print(numbersRDD.collect())
squaresRDD = numbersRDD.map(lambda x: x **2) # Square every number
print(squaresRDD.collect())
filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0) # Only the evens
結果
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
[2, 4, 6, 8, 10]
flatMap()與Map()的關係相當於,Python中list的extend和append的關係。
前面提到的Transformation,可以一個接一個地串聯,比如:
def doubleIfOdd(x):
if x % 2 == 1:
return 2 * x
else:
return x
resultRDD = (numbersRDD # In parentheses so we can write each
.map(doubleIfOdd) # transformation in one line
.filter(lambda x: x > 6)
.distinct())
resultRDD.collect()
結果:
[8, 10, 18, 14]
3,RDD間的操作
如果你手頭上有2個RDD了,下面的這些操作能夠幫你對他們以個種方式組合得到1個RDD:
- rdd1.union(rdd2): 所有rdd1和rdd2中的item組合
- rdd1.intersection(rdd2): rdd1 和 rdd2的交集
- rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集)
- rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡爾乘積
如下:
numbersRDD = sc.parallelize([1,2,3])
moreNumbersRDD = sc.parallelize([2,3,4])
numbersRDD.union(moreNumbersRDD).collect()
結果:
[1, 2, 3, 2, 3, 4]
特別注意:Spark的一個核心概念是惰性計算。當你把一個RDD轉換成另一個的時候,這個轉換不會立即生效執行!!!
Spark會把它先記在心裡,等到真的需要拿到轉換結果的時候,才會重新組織你的transformations(因為可能有一連串的變換)
這樣可以避免不必要的中間結果儲存和通訊。
剛才提到了惰性計算,那麼什麼東西能讓它真的執行轉換與運算呢? 是的,就是我們馬上提到的Actions,下面是常見的action,當他們出現的時候,表明我們需要執行剛才定義的transform了:
- collect(): 計算所有的items並返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果
- first(): 和上面是類似的,不過只返回第1個item
- take(n): 類似,但是返回n個item
- count(): 計算RDD中item的個數
- top(n): 返回頭n個items,按照自然結果排序
- reduce(): 對RDD中的items做聚合
reduce的例子:
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y)
結果:
55
5,針對更復雜結構的transform和action
有時候我們會遇到更復雜的結構,比如非常非常經典的是以元組形式組織的k-v對(key, value)我們把它叫做pair RDDs,而Sark中針對這種item結構的資料,定義了一些transform和action:
- reduceByKey(): 對所有有著相同key的items執行reduce操作
- groupByKey(): 返回類似(key, listOfValues)元組的RDD,後面的value List 是同一個key下面的
- sortByKey(): 按照key排序
- countByKey(): 按照key去對item個數進行統計
- collectAsMap(): 和collect有些類似,但是返回的是k-v的字典
簡單的詞頻統計的例子:
rdd = sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD = (
rdd
.flatMap(lambda sentence: sentence.split(" ")) # split into words
.map(lambda word: word.lower()) # lowercase
.map(lambda word: (word, 1)) # count each appearance
.reduceByKey(lambda x, y: x + y) # add counts for each word
)
resultRDD.collect()
結果:
[('says', 1), ('new', 1), ('hello', 4), ('york', 2)]
若將結果以k-v字典的形式返回:
result = resultRDD.collectAsMap()
結果:
{'hello': 4, 'new': 1, 'says': 1, 'york': 2}
如果你想要出現頻次最高的2個詞,可以這麼做:
print(resultRDD
.sortBy(keyfunc=lambda (word, count): count, ascending=False)
.take(2))
結果:
[('hello', 4), ('york', 2)]
6,在給定2個RDD後,我們可以通過一個類似SQL的方式去join他們
如:
homesRDD = sc.parallelize([
('Brussels', 'John'),
('Brussels', 'Jack'),
('Leuven', 'Jane'),
('Antwerp', 'Jill'),
])
# Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
('Brussels', 10),
('Antwerp', 7),
('RestOfFlanders', 5),
])
homesRDD.join(lifeQualityRDD).collect()
結果為:
[('Antwerp', ('Jill', 7)),
('Brussels', ('John', 10)),
('Brussels', ('Jack', 10))]
homesRDD.leftOuterJoin(lifeQualityRDD).collect()
結果為:
[('Antwerp', ('Jill', 7)),
('Brussels', ('John', 10)),
('Brussels', ('Jack', 10)),
('Leuven', ('Jane', None))]