Spark Python API函式:pyspark API(2)
阿新 • 發佈:2018-11-06
文章目錄
• 1 sortBy
• 2 glom
• 3 cartesian
• 4 groupBy
• 5 pipe
• 6 foreach
• 7 foreachPartition
• 8 collect
• 9 reduce
• 10 fold
• 11 aggregate
• 12 max
• 13 min
• 14 sum
• 15 count
sortBy
# sortBy
x = sc.parallelize(['Cat', 'Apple', 'Bat'])
# Key each word by its first character; sortBy orders the RDD by that key.
y = x.sortBy(lambda word: word[0])
print(y.collect())

# Output:
# ['Apple', 'Bat', 'Cat']
glom
# glom
x = sc.parallelize(['C', 'B', 'A'], 2)  # spread over 2 partitions
y = x.glom()  # one list per partition
for rdd in (x, y):
    print(rdd.collect())

# Output:
# ['C', 'B', 'A']
# [['C'], ['B', 'A']]
cartesian
# cartesian
x = sc.parallelize(['A', 'B'])
y = sc.parallelize(['C', 'D'])
# Pair every element of x with every element of y.
z = x.cartesian(y)
for rdd in (x, y, z):
    print(rdd.collect())

# Output:
# ['A', 'B']
# ['C', 'D']
# [('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]
groupBy
# groupBy
x = sc.parallelize([1, 2, 3])


def parity_label(n):
    """Return 'A' for odd numbers and 'B' for even ones."""
    if n % 2 == 1:
        return 'A'
    return 'B'


y = x.groupBy(parity_label)
print(x.collect())
# y is nested: each item is (key, iterable of values); materialise each group
print([(key, list(values)) for key, values in y.collect()])

# Output:
# [1, 2, 3]
# [('A', [1, 3]), ('B', [2])]
pipe
# pipe
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
# Run the elements through an external shell command: here a case-insensitive
# grep for "A". Calls out to grep, so this may fail under Windows.
command = 'grep -i "A"'
y = x.pipe(command)
print(x.collect())
print(y.collect())

# Output:
# ['A', 'Ba', 'C', 'AD']
# ['A', 'Ba', 'AD']
foreach
# foreach
from __future__ import print_function

x = sc.parallelize([1, 2, 3])


def f(el):
    '''side effect: append the current RDD element to a file'''
    # Use a context manager so the handle is closed (and its buffer flushed)
    # after every call; previously one file handle was leaked per element.
    with open("./foreachExample.txt", 'a+') as f1:
        print(el, file=f1)


# first clear the file contents
open('./foreachExample.txt', 'w').close()
y = x.foreach(f)  # writes into foreachExample.txt
print(x.collect())
print(y)  # foreach returns 'None'
# print the contents of foreachExample.txt
# NOTE(review): reading the file back on the driver presumably assumes local
# mode (or a shared filesystem), since f runs inside Spark tasks.
with open("./foreachExample.txt", "r") as foreachExample:
    print(foreachExample.read())

# Output (the file's line order follows the order the tasks ran in,
# not the RDD order shown by collect):
# [1, 2, 3]
# None
# 3
# 1
# 2
foreachPartition
# foreachPartition
from __future__ import print_function

x = sc.parallelize([1, 2, 3], 5)


def f(partition):
    '''side effect: append the current RDD partition contents to a file'''
    # Context manager closes (and flushes) the handle after each partition;
    # previously the handle was never closed.
    with open("./foreachPartitionExample.txt", 'a+') as f1:
        # partition is an iterable over this partition's elements
        print(list(partition), file=f1)


# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()
y = x.foreachPartition(f)  # writes into foreachPartitionExample.txt
print(x.glom().collect())
print(y)  # foreachPartition returns 'None'
# print the contents of foreachPartitionExample.txt
# NOTE(review): reading the file back on the driver presumably assumes local
# mode (or a shared filesystem), since f runs inside Spark tasks.
with open("./foreachPartitionExample.txt", "r") as foreachPartitionExample:
    print(foreachPartitionExample.read())

# Output (the file's line order follows the order the partitions ran in,
# not the order shown by glom):
# [[], [1], [], [2], [3]]
# None
# []
# []
# [1]
# [2]
# [3]
collect
# collect
x = sc.parallelize([1, 2, 3])
y = x.collect()  # bring the RDD's contents back as a plain Python list
print(x)  # distributed: printing the RDD shows only its description
print(y)  # not distributed

# Output (the RDD id in the first line will presumably differ per session):
# ParallelCollectionRDD[87] at parallelize at PythonRDD.scala:382
# [1, 2, 3]
reduce
# reduce
x = sc.parallelize([1, 2, 3])


def add(acc, el):
    """Combine two values; reduce applies this pairwise across the RDD."""
    return acc + el


y = x.reduce(add)  # sum of all elements
print(x.collect())
print(y)

# Output:
# [1, 2, 3]
# 6
fold
# fold
x = sc.parallelize([1, 2, 3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
# Like reduce, but starting from a neutral "zero" value.
y = x.fold(neutral_zero_value, lambda acc, el: acc + el)  # sum of all elements
print(x.collect())
print(y)

# Output:
# [1, 2, 3]
# 6
aggregate
# aggregate
x = sc.parallelize([2, 3, 4])
neutral_zero_value = (0, 1)  # sum: x+0 = x, product: 1*x = x


def seqOp(acc, el):
    """Fold one element into a (running sum, running product) pair."""
    total, product = acc
    return (total + el, product * el)


def combOp(left, right):
    """Merge two partial (sum, product) pairs."""
    return (left[0] + right[0], left[1] * right[1])


# computes (cumulative sum, cumulative product)
y = x.aggregate(neutral_zero_value, seqOp, combOp)
print(x.collect())
print(y)

# Output:
# [2, 3, 4]
# (9, 24)
max
# max
x = sc.parallelize([1, 3, 2])
y = x.max()  # largest element
for shown in (x.collect(), y):
    print(shown)

# Output:
# [1, 3, 2]
# 3
min
# min
x = sc.parallelize([1, 3, 2])
y = x.min()  # smallest element
for shown in (x.collect(), y):
    print(shown)

# Output:
# [1, 3, 2]
# 1
sum
# sum
x = sc.parallelize([1, 3, 2])
y = x.sum()  # total of all elements
for shown in (x.collect(), y):
    print(shown)

# Output:
# [1, 3, 2]
# 6
count
# count
x = sc.parallelize([1, 3, 2])
y = x.count()  # number of elements
for shown in (x.collect(), y):
    print(shown)

# Output:
# [1, 3, 2]
# 3