
Spark Python API Functions: pyspark API (4)

Table of Contents

•    1 countByKey
•    2 join
•    3 leftOuterJoin
•    4 rightOuterJoin
•    5 partitionBy
•    6 combineByKey
•    7 aggregateByKey
•    8 foldByKey
•    9 groupByKey
•    10 flatMapValues
•    11 mapValues
•    12 groupWith
•    13 cogroup
•    14 sampleByKey
•    15 subtractByKey
•    16 subtract
•    17 keyBy
•    18 repartition
•    19 coalesce
•    20 zip
•    21 zipWithIndex
•    22 zipWithUniqueId

countByKey


# countByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])

y = x.countByKey()

print(x.collect())

print(y)

 

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]

defaultdict(<type 'int'>, {'A': 3, 'B': 2})
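countByKey collects its result onto the driver as a dict, so it is only safe when the number of distinct keys is small. For many distinct keys, a distributed equivalent keeps the counts in an RDD instead; a minimal sketch:

pairs = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
counts = pairs.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)  # per-key count, stays distributed
print(counts.collect())  # [('A', 3), ('B', 2)] (order may vary)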

join


# join

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])

y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])

z = x.join(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]

[('A', 8), ('B', 7), ('A', 6), ('D', 5)]

[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]

leftOuterJoin


# leftOuterJoin

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])

y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])

z = x.leftOuterJoin(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]

[('A', 8), ('B', 7), ('A', 6), ('D', 5)]

[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('C', (4, None)), ('B', (3, 7))]

rightOuterJoin


# rightOuterJoin

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])

y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])

z = x.rightOuterJoin(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]

[('A', 8), ('B', 7), ('A', 6), ('D', 5)]

[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7)), ('D', (None, 5))]
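Rounding out the join family, pyspark also provides fullOuterJoin, which keeps keys from both sides and pads the missing side with None; a minimal sketch on the same data:

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
print(x.fullOuterJoin(y).collect())
# the matched pairs plus ('C', (4, None)) and ('D', (None, 5)); ordering may vary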

partitionBy


# partitionBy

x = sc.parallelize([(0,1),(1,2),(2,3)],2)

y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x)  # only the key is passed to partitionFunc

print(x.glom().collect())

print(y.glom().collect())

 

[[(0, 1)], [(1, 2), (2, 3)]]

[[(0, 1)], [(1, 2)], [(2, 3)]]
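pyspark picks the target partition as partitionFunc(key) % numPartitions, so any integer-valued function of the key works. A minimal sketch with string keys, using ord as a deterministic stand-in for a hash:

pairs = sc.parallelize([('A',1),('B',2),('C',3),('D',4)])
print(pairs.partitionBy(2, partitionFunc=lambda k: ord(k)).glom().collect())
# even ord values land in partition 0, odd in partition 1: [[('B', 2), ('D', 4)], [('A', 1), ('C', 3)]]
# (order within a partition may vary)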

combineByKey


# combineByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])

createCombiner = (lambda el: [(el,el**2)])

mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated

mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2

y = x.combineByKey(createCombiner,mergeVal,mergeComb)

print(x.collect())

print(y.collect())

 

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]

[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
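A common combineByKey pattern is the per-key mean, carrying a (sum, count) pair as the accumulator; a minimal sketch:

nums = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
sumCount = nums.combineByKey(
    lambda v: (v, 1),                          # createCombiner: first value seen for a key
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeVal: fold one more value into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeComb: combine partial (sum, count)s
print(sumCount.mapValues(lambda t: t[0] / float(t[1])).collect())  # [('A', 4.0), ('B', 1.5)]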

aggregateByKey


# aggregateByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])

zeroValue = [] # empty list is 'zero value' for append operation

mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])

mergeComb = (lambda agg1,agg2: agg1 + agg2 )

y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)

print(x.collect())

print(y.collect())

 

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]

[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
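The zero value also makes aggregateByKey convenient for reductions such as a per-key maximum; a minimal sketch using float('-inf') as the identity for max:

nums = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
print(nums.aggregateByKey(float('-inf'), max, max).collect())  # [('A', 5), ('B', 2)] (order may vary)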

foldByKey


# foldByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])

zeroValue = 1 # one is 'zero value' for multiplication

y = x.foldByKey(zeroValue, lambda agg,x: agg*x)  # computes the product of the values for each key

print(x.collect())

print(y.collect())

 

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]

[('A', 60), ('B', 2)]
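With 0 as the zero value and addition as the function, foldByKey gives a per-key sum instead; a minimal sketch:

nums = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
print(nums.foldByKey(0, lambda a, b: a + b).collect())  # [('A', 12), ('B', 3)]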

groupByKey


# groupByKey

x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])

y = x.groupByKey()

print(x.collect())

print([(j[0],[i for i in j[1]]) for j in y.collect()])

 

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]

[('A', [3, 2, 1]), ('B', [5, 4])]
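The list comprehension above is needed because groupByKey yields an iterable (a ResultIterable) per key; chaining mapValues(list) is a tidier way to materialize it. Note that for plain aggregations, reduceByKey is usually preferable, since it combines values before the shuffle:

pairs = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
print(pairs.groupByKey().mapValues(list).collect())  # [('A', [3, 2, 1]), ('B', [5, 4])]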

flatMapValues


# flatMapValues

x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])

y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened

print(x.collect())

print(y.collect())

 

[('A', (1, 2, 3)), ('B', (4, 5))]

[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]

mapValues


# mapValues

x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])

y = x.mapValues(lambda x: [i**2 for i in x]) # function is applied to entire value

print(x.collect())

print(y.collect())

 

[('A', (1, 2, 3)), ('B', (4, 5))]

[('A', [1, 4, 9]), ('B', [16, 25])]

groupWith


# groupWith

x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])

y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])

z = sc.parallelize([('D',9),('B',(8,8))])

a = x.groupWith(y,z)

print(x.collect())

print(y.collect())

print(z.collect())

print("Result:")

for key,val in list(a.collect()):

    print(key, [list(i) for i in val])

 

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]

[('B', (7, 7)), ('A', 6), ('D', (5, 5))]

[('D', 9), ('B', (8, 8))]

Result:

D [[], [(5, 5)], [9]]

C [[4], [], []]

B [[(3, 3)], [(7, 7)], [(8, 8)]]

A [[2, (1, 1)], [6], []]

cogroup


# cogroup

x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])

y = sc.parallelize([('A',8),('B',7),('A',6),('D',(5,5))])

z = x.cogroup(y)

print(x.collect())

print(y.collect())

for key,val in list(z.collect()):

    print(key, [list(i) for i in val])

 

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]

[('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))]

A [[2, (1, 1)], [8, 6]]

C [[4], []]

B [[(3, 3)], [7]]

D [[], [(5, 5)]]

sampleByKey


# sampleByKey

x = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)])

y = x.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2})

print(x.collect())

print(y.collect())

 

[('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]

[('B', 2), ('C', 3), ('B', 4)]
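The sample is random, so the output differs between runs; passing a seed makes it reproducible. A minimal sketch (with fraction 1.0 and no replacement, every 'B' pair always survives):

pairs = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)])
print(pairs.sampleByKey(False, {'A': 0.5, 'B': 1.0, 'C': 0.2}, seed=42).collect())
# exact contents depend on the seed; both 'B' pairs are always kept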

subtractByKey


# subtractByKey

x = sc.parallelize([('C',1),('B',2),('A',3),('A',4)])

y = sc.parallelize([('A',5),('D',6),('A',7),('D',8)])

z = x.subtractByKey(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

[('C', 1), ('B', 2), ('A', 3), ('A', 4)]

[('A', 5), ('D', 6), ('A', 7), ('D', 8)]

[('C', 1), ('B', 2)]

subtract


# subtract

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])

y = sc.parallelize([('C',8),('A',2),('D',1)])

z = x.subtract(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]

[('C', 8), ('A', 2), ('D', 1)]

[('A', 1), ('C', 4), ('B', 3)]

keyBy


# keyBy

x = sc.parallelize([1,2,3])

y = x.keyBy(lambda x: x**2)

print(x.collect())

print(y.collect())

 

[1, 2, 3]

[(1, 1), (4, 2), (9, 3)]

repartition


# repartition

x = sc.parallelize([1,2,3,4,5],2)

y = x.repartition(numPartitions=3)

print(x.glom().collect())

print(y.glom().collect())

 

[[1, 2], [3, 4, 5]]

[[], [1, 2, 3, 4], [5]]

coalesce


# coalesce

x = sc.parallelize([1,2,3,4,5],2)

y = x.coalesce(numPartitions=1)

print(x.glom().collect())

print(y.glom().collect())

 

[[1, 2], [3, 4, 5]]

[[1, 2, 3, 4, 5]]
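Unlike repartition, coalesce avoids a shuffle, which also means it can only merge partitions; growing the partition count requires shuffle=True (which is what repartition calls internally). A minimal sketch:

x = sc.parallelize([1,2,3,4,5], 2)
print(x.coalesce(3).glom().collect())                # still 2 partitions: no shuffle, cannot grow
print(x.coalesce(3, shuffle=True).glom().collect())  # 3 partitions, data is shuffled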

zip


# zip

x = sc.parallelize(['B','A','A'])

# zip expects x and y to have same #partitions and #elements/partition

y = x.map(lambda x: ord(x)) 

z = x.zip(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

['B', 'A', 'A']

[66, 65, 65]

[('B', 66), ('A', 65), ('A', 65)]
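Deriving y from x, as above, guarantees the partitioning matches; two RDDs built the same way also zip cleanly, while mismatched partition counts raise an error. A minimal sketch:

a = sc.parallelize([1, 2, 3, 4], 2)
b = sc.parallelize(['a', 'b', 'c', 'd'], 2)
print(a.zip(b).collect())  # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]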

zipWithIndex


# zipWithIndex

x = sc.parallelize(['B','A','A'],2)

y = x.zipWithIndex()

print(x.glom().collect())

print(y.collect())

 

[['B'], ['A', 'A']]

[('B', 0), ('A', 1), ('A', 2)]

zipWithUniqueId


# zipWithUniqueId

x = sc.parallelize(['B','A','A'],2)

y = x.zipWithUniqueId()

print(x.glom().collect())

print(y.collect())

 

[['B'], ['A', 'A']]

[('B', 0), ('A', 1), ('A', 3)]
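Unlike zipWithIndex, zipWithUniqueId does not trigger a Spark job: items in the k-th of n partitions get ids k, n+k, 2n+k, ..., so the ids are unique but not consecutive. A breakdown of the output above:

# n = 2 partitions
# partition 0: 'B' -> 0            (k=0: ids 0, 2, 4, ...)
# partition 1: 'A' -> 1, 'A' -> 3  (k=1: ids 1, 3, 5, ...)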