1. 程式人生 > >【機器學習】pyspark中RDD的若干操作

【機器學習】pyspark中RDD的若干操作

1,讀取檔案

from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')

a,text = sc.textFile(“file:///d:/test.txt”)
b,rdd = sc.parallelize([1,2,3,4,5])

2,RDD的操作
大家還對python的list comprehension有印象嗎,RDDs可以進行一系列的變換得到新的RDD,有點類似那個過程,我們先給大家提一下RDD上最最常用到的transformation:

  • map() 對RDD的每一個item都執行同一個操作
  • flatMap() 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list
  • filter() 篩選出來滿足條件的item distinct() 對RDD中的item去重
  • sample() 從RDD中的item中取樣一部分出來,有放回或者無放回
  • sortBy() 對RDD中的item進行排序
    如果你想看操作後的結果,可以用一個叫做collect()的action把所有的item轉成一個Python list。
    一個簡單的例子
numbersRDD = sc.parallelize(range(1,10+1))
print(numbersRDD.collect())

squaresRDD = numbersRDD.map(lambda x: x
**2) # Square every number print(squaresRDD.collect()) filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0) # Only the evens

結果

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
[2, 4, 6, 8, 10]

flatMap()與Map()的關係相當於,Python中list的extend和append的關係。

前面提到的Transformation,可以一個接一個地串聯,比如:

def doubleIfOdd(x):
    if x % 2 == 1:
        return 2 * x
    else:
        return x

resultRDD = (numbersRDD           # In parentheses so we can write each
             .map(doubleIfOdd)    # transformation in one line
             .filter(lambda x: x > 6)
             .distinct())

resultRDD.collect()

結果:

[8, 10, 18, 14]

3,RDD間的操作
如果你手頭上有2個RDD了,下面的這些操作能夠幫你對他們以個種方式組合得到1個RDD:

  • rdd1.union(rdd2): 所有rdd1和rdd2中的item組合
  • rdd1.intersection(rdd2): rdd1 和 rdd2的交集
  • rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集)
  • rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡爾乘積
    如下:
numbersRDD = sc.parallelize([1,2,3])
moreNumbersRDD = sc.parallelize([2,3,4])
numbersRDD.union(moreNumbersRDD).collect()

結果:

[1, 2, 3, 2, 3, 4]

特別注意:Spark的一個核心概念是惰性計算。當你把一個RDD轉換成另一個的時候,這個轉換不會立即生效執行!!!
Spark會把它先記在心裡,等到真的需要拿到轉換結果的時候,才會重新組織你的transformations(因為可能有一連串的變換)
這樣可以避免不必要的中間結果儲存和通訊。
剛才提到了惰性計算,那麼什麼東西能讓它真的執行轉換與運算呢? 是的,就是我們馬上提到的Actions,下面是常見的action,當他們出現的時候,表明我們需要執行剛才定義的transform了:

  • collect(): 計算所有的items並返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果
  • first(): 和上面是類似的,不過只返回第1個item
  • take(n): 類似,但是返回n個item
  • count(): 計算RDD中item的個數
  • top(n): 返回頭n個items,按照自然結果排序
  • reduce(): 對RDD中的items做聚合
    reduce的例子:
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y)

結果:

55

5,針對更復雜結構的transform和action
有時候我們會遇到更復雜的結構,比如非常非常經典的是以元組形式組織的k-v對(key, value)我們把它叫做pair RDDs,而Sark中針對這種item結構的資料,定義了一些transform和action:

  • reduceByKey(): 對所有有著相同key的items執行reduce操作
  • groupByKey(): 返回類似(key, listOfValues)元組的RDD,後面的value List 是同一個key下面的
  • sortByKey(): 按照key排序
  • countByKey(): 按照key去對item個數進行統計
  • collectAsMap(): 和collect有些類似,但是返回的是k-v的字典
    簡單的詞頻統計的例子:
rdd = sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD = (
    rdd
    .flatMap(lambda sentence: sentence.split(" "))  # split into words
    .map(lambda word: word.lower())                 # lowercase
    .map(lambda word: (word, 1))                    # count each appearance
    .reduceByKey(lambda x, y: x + y)                # add counts for each word
)
resultRDD.collect()

結果:

[('says', 1), ('new', 1), ('hello', 4), ('york', 2)]

若將結果以k-v字典的形式返回:

result = resultRDD.collectAsMap()

結果:

{'hello': 4, 'new': 1, 'says': 1, 'york': 2}

如果你想要出現頻次最高的2個詞,可以這麼做:

print(resultRDD
      .sortBy(keyfunc=lambda (word, count): count, ascending=False)
      .take(2))

結果:

[('hello', 4), ('york', 2)]

6,在給定2個RDD後,我們可以通過一個類似SQL的方式去join他們
如:

homesRDD = sc.parallelize([
        ('Brussels', 'John'),
        ('Brussels', 'Jack'),
        ('Leuven', 'Jane'),
        ('Antwerp', 'Jill'),
    ])

# Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
        ('Brussels', 10),
        ('Antwerp', 7),
        ('RestOfFlanders', 5),
    ])
homesRDD.join(lifeQualityRDD).collect()

結果為:

[('Antwerp', ('Jill', 7)),
 ('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10))]
homesRDD.leftOuterJoin(lifeQualityRDD).collect()

結果為:

[('Antwerp', ('Jill', 7)),
 ('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10)),
 ('Leuven', ('Jane', None))]