1. 程式人生 > >Spark Python API函式:pyspark API(2)

Spark Python API函式:pyspark API(2)

文章目錄

•    1 sortBy
•    2 glom
•    3 cartesian
•    4 groupBy
•    5 pipe
•    6 foreach
•    7 foreachPartition
•    8 collect
•    9 reduce
•    10 fold
•    11 aggregate
•    12 max
•    13 min
•    14 sum
•    15 count

sortBy

spark sortBy

# sortBy
x = sc.parallelize(['Cat','Apple','Bat'])
def keyGen(val): return val[0]
y = x.sortBy(keyGen)
print(y.collect())

['Apple', 'Bat', 'Cat']

glom

spark glom

# glom
x = sc.parallelize(['C','B','A'], 2)
y = x.glom()
print(x.collect()) 
print(y.collect())

['C', 'B', 'A']
[['C'], ['B', 'A']]

cartesian

spark cartesian

# cartesian
x = sc.parallelize(['A','B'])
y = sc.parallelize(['C','D'])
z = x.cartesian(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'B']
['C', 'D']
[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]

groupBy

spark groupBy

# groupBy
x = sc.parallelize([1,2,3])
y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )
print(x.collect())
# y is nested, this iterates through it
print([(j[0],[i for i in j[1]]) for j in y.collect()]) 

[1, 2, 3]
[('A', [1, 3]), ('B', [2])]

pipe

spark pipe

# pipe
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows 
print(x.collect())
print(y.collect())

['A', 'Ba', 'C', 'AD']
['A', 'Ba', 'AD']

foreach

spark foreach

# foreach
from __future__ import print_function
x = sc.parallelize([1,2,3])
def f(el):
    '''side effect: append the current RDD elements to a file'''
    f1=open("./foreachExample.txt", 'a+') 
    print(el,file=f1)

# first clear the file contents
open('./foreachExample.txt', 'w').close()  

y = x.foreach(f) # writes into foreachExample.txt

print(x.collect())
print(y) # foreach returns 'None'
# print the contents of foreachExample.txt
with open("./foreachExample.txt", "r") as foreachExample:
    print (foreachExample.read())
    
[1, 2, 3]
None
3
1
2

foreachPartition

spark foreachPartition

# foreachPartition
from __future__ import print_function
x = sc.parallelize([1,2,3],5)
def f(parition):
    '''side effect: append the current RDD partition contents to a file'''
    f1=open("./foreachPartitionExample.txt", 'a+') 
    print([el for el in parition],file=f1)

# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()  

y = x.foreachPartition(f) # writes into foreachExample.txt

print(x.glom().collect())
print(y)  # foreach returns 'None'
# print the contents of foreachExample.txt
with open("./foreachPartitionExample.txt", "r") as foreachExample:
    print (foreachExample.read())

[[], [1], [], [2], [3]]
None
[]
[]
[1]
[2]
[3]

collect

spark collect

# collect
x = sc.parallelize([1,2,3])
y = x.collect()
print(x)  # distributed
print(y)  # not distributed

ParallelCollectionRDD[87] at parallelize at PythonRDD.scala:382
[1, 2, 3]

reduce

spark reduce

# reduce
x = sc.parallelize([1,2,3])
y = x.reduce(lambda obj, accumulated: obj + accumulated)  # computes a cumulative sum
print(x.collect())
print(y)

[1, 2, 3]
6

fold

spark fold

# fold
x = sc.parallelize([1,2,3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value,lambda obj, accumulated: accumulated + obj) # computes cumulative sum
print(x.collect())
print(y)

[1, 2, 3]
6

aggregate

spark aggregate

# aggregate
x = sc.parallelize([2,3,4])
neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x
seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el)) 
combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1]))
y = x.aggregate(neutral_zero_value,seqOp,combOp)  # computes (cumulative sum, cumulative product)
print(x.collect())
print(y)

[2, 3, 4]
(9, 24)

max

spark max

# max
x = sc.parallelize([1,3,2])
y = x.max()
print(x.collect())
print(y)

[1, 3, 2]
3

min

spark min

# min
x = sc.parallelize([1,3,2])
y = x.min()
print(x.collect())
print(y)

[1, 3, 2]
1

sum

spark sum

# sum
x = sc.parallelize([1,3,2])
y = x.sum()
print(x.collect())
print(y)

[1, 3, 2]
6

count

spark count

# count
x = sc.parallelize([1,3,2])
y = x.count()
print(x.collect())
print(y)

[1, 3, 2]
3