
Spark Python API Functions: pyspark API (1)

Table of Contents

•    1 pyspark version
•    2 map
•    3 flatMap
•    4 mapPartitions
•    5 mapPartitionsWithIndex
•    6 getNumPartitions
•    7 filter
•    8 distinct
•    9 sample
•    10 takeSample
•    11 union
•    12 intersection
•    13 sortByKey

pyspark version

# print Spark version

print("pyspark version:" + str(sc.version))

 

pyspark version:1.2.2
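
All of the following examples assume a live SparkContext named sc, which the pyspark shell creates automatically. As a minimal sketch (the master URL and app name here are arbitrary assumptions), a standalone script would create one itself:

# create a SparkContext when not running inside the pyspark shell
from pyspark import SparkContext

sc = SparkContext(master="local[2]", appName="pyspark-api-examples")
print("pyspark version:" + str(sc.version))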

map


# map

# sc = spark context, parallelize creates an RDD from the passed object

x = sc.parallelize([1,2,3])

y = x.map(lambda x: (x,x**2))

 

# collect copies RDD elements to a list on the driver

print(x.collect())

print(y.collect())

 

[1, 2, 3]

[(1, 1), (2, 4), (3, 9)]
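
map applies the function to each element one-to-one, so the output always has exactly as many elements as the input. It is also the usual way to build key-value pairs; a small additional sketch (the word data below is an assumption for illustration):

# build (word, length) pairs -- exactly one output element per input element
words = sc.parallelize(['apple', 'fig', 'cherry'])
pairs = words.map(lambda w: (w, len(w)))
print(pairs.collect())  # [('apple', 5), ('fig', 3), ('cherry', 6)]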

flatMap


# flatMap

x = sc.parallelize([1,2,3])

y = x.flatMap(lambda x: (x, 100*x, x**2))

print(x.collect())

print(y.collect())

 

[1, 2, 3]

[1, 100, 1, 2, 200, 4, 3, 300, 9]
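
Unlike map, flatMap may emit zero, one, or many output elements per input element, and the per-element results are concatenated into one flat RDD. A common use, sketched here with assumed whitespace tokenization, is splitting lines of text into words:

# split each line into words; the empty line contributes no output at all
lines = sc.parallelize(['a b c', '', 'd e'])
words = lines.flatMap(lambda line: line.split())
print(words.collect())  # ['a', 'b', 'c', 'd', 'e']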

mapPartitions


# mapPartitions

x = sc.parallelize([1,2,3], 2)

def f(iterator): yield sum(iterator)

y = x.mapPartitions(f)

# glom() collects the elements of each partition into a list

print(x.glom().collect()) 

print(y.glom().collect())

 

[[1], [2, 3]]

[[1], [5]]
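
mapPartitions passes the function an iterator over a whole partition, so any per-partition setup cost (opening a connection, loading a model, and so on) is paid once per partition rather than once per element. A minimal sketch, with the counting example being an illustrative assumption:

# count the elements of each partition; the generator consumes the iterator once
x = sc.parallelize(range(10), 3)
def count_partition(iterator): yield sum(1 for _ in iterator)
print(x.mapPartitions(count_partition).collect())  # e.g. [3, 3, 4]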

mapPartitionsWithIndex


# mapPartitionsWithIndex

x = sc.parallelize([1,2,3], 2)

def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator))

y = x.mapPartitionsWithIndex(f)

 

# glom() collects the elements of each partition into a list

print(x.glom().collect()) 

print(y.glom().collect())

 

[[1], [2, 3]]

[[(0, 1)], [(1, 5)]]
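
The partition index is handy when one partition needs special treatment, such as dropping a header line that sits at the start of partition 0. A hedged sketch (the header data is assumed for illustration):

from itertools import islice

# skip the first element of partition 0 only; pass other partitions through
def drop_header(idx, iterator):
    return islice(iterator, 1, None) if idx == 0 else iterator

x = sc.parallelize(['header', 'row1', 'row2'], 2)
print(x.mapPartitionsWithIndex(drop_header).collect())  # ['row1', 'row2']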

getNumPartitions


# getNumPartitions

x = sc.parallelize([1,2,3], 2)

y = x.getNumPartitions()

print(x.glom().collect())

print(y)

 

[[1], [2, 3]]

2
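
The partition count is fixed when the RDD is created, but a new RDD with a different count can be derived from it. A short sketch using the standard repartition (full shuffle) and coalesce (merges partitions, avoiding a shuffle when reducing the count):

x = sc.parallelize([1,2,3,4], 4)
print(x.getNumPartitions())                  # 4
print(x.coalesce(2).getNumPartitions())      # 2, merged without a shuffle
print(x.repartition(8).getNumPartitions())   # 8, requires a full shuffle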

filter


# filter

x = sc.parallelize([1,2,3])

y = x.filter(lambda x: x % 2 == 1)  # filters out even elements

print(x.collect())

print(y.collect())

 

[1, 2, 3]

[1, 3]
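
The predicate runs once per element, and elements for which it returns a falsy value are dropped. filter works the same way on key-value pairs; a small sketch with assumed data:

# keep only pairs whose value is non-negative
kv = sc.parallelize([('a', 1), ('b', -2), ('c', 3)])
print(kv.filter(lambda pair: pair[1] >= 0).collect())  # [('a', 1), ('c', 3)]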

distinct


# distinct

x = sc.parallelize(['A','A','B'])

y = x.distinct()

print(x.collect())

print(y.collect())

 

['A', 'A', 'B']

['A', 'B']
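
distinct deduplicates by shuffling so that equal elements meet in the same partition, which means the output order is not guaranteed. It also accepts an optional numPartitions argument controlling that shuffle; a sketch with assumed data:

x = sc.parallelize([1, 1, 2, 3, 3, 3])
y = x.distinct(numPartitions=2)   # control the shuffle's partition count
print(sorted(y.collect()))        # [1, 2, 3]; sorted because order varies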

sample


# sample

x = sc.parallelize(range(7))

# call 'sample' 5 times

ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)]

print('x = ' + str(x.collect()))

for cnt,y in zip(range(len(ylist)), ylist):

    print('sample:' + str(cnt) + ' y = ' +  str(y.collect()))

 

x = [0, 1, 2, 3, 4, 5, 6]

sample:0 y = [0, 2, 5, 6]

sample:1 y = [2, 6]

sample:2 y = [0, 4, 5, 6]

sample:3 y = [0, 2, 6]

sample:4 y = [0, 3, 4]
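
As the five runs show, fraction is only the expected proportion of elements kept, so the sample size varies from run to run. sample also accepts an optional seed parameter for reproducibility; a sketch:

x = sc.parallelize(range(7))
# the same seed yields the same sample; its exact contents depend on the RNG
a = x.sample(withReplacement=False, fraction=0.5, seed=42)
b = x.sample(withReplacement=False, fraction=0.5, seed=42)
print(a.collect() == b.collect())  # True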

takeSample


# takeSample

x = sc.parallelize(range(7))

# call 'takeSample' 5 times

ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)] 

print('x = ' + str(x.collect()))

for cnt,y in zip(range(len(ylist)), ylist):

    print('sample:' + str(cnt) + ' y = ' +  str(y))  # no collect on y

 

x = [0, 1, 2, 3, 4, 5, 6]

sample:0 y = [0, 2, 6]

sample:1 y = [6, 4, 2]

sample:2 y = [2, 0, 4]

sample:3 y = [5, 4, 1]

sample:4 y = [3, 1, 4]
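
Unlike sample, takeSample is an action: it returns a plain Python list of exactly num elements to the driver (hence no collect above), and it may pass over the RDD more than once to reach that exact count. It too takes an optional seed; a sketch:

x = sc.parallelize(range(7))
s = x.takeSample(withReplacement=False, num=3, seed=42)
print(len(s))  # 3, always exactly num elements when the RDD is large enough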

union


# union

x = sc.parallelize(['A','A','B'])

y = sc.parallelize(['D','C','A'])

z = x.union(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

['A', 'A', 'B']

['D', 'C', 'A']

['A', 'A', 'B', 'D', 'C', 'A']
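
union simply concatenates the two RDDs, so duplicates survive (note the two 'A's from x plus the one from y). Chaining distinct gives a set-style union; a sketch:

x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y).distinct()
print(sorted(z.collect()))  # ['A', 'B', 'C', 'D']; sorted because order varies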

intersection


# intersection

x = sc.parallelize(['A','A','B'])

y = sc.parallelize(['A','C','D'])

z = x.intersection(y)

print(x.collect())

print(y.collect())

print(z.collect())

 

['A', 'A', 'B']

['A', 'C', 'D']

['A']
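
Note that intersection also deduplicates: 'A' appears twice in x but only once in the result. A sketch with assumed numeric data making this explicit:

x = sc.parallelize([1, 1, 2, 3])
y = sc.parallelize([1, 1, 3, 4])
print(sorted(x.intersection(y).collect()))  # [1, 3]; duplicates are removed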

sortByKey


# sortByKey

x = sc.parallelize([('B',1),('A',2),('C',3)])

y = x.sortByKey()

print(x.collect())

print(y.collect())

 

[('B', 1), ('A', 2), ('C', 3)]

[('A', 2), ('B', 1), ('C', 3)]
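
sortByKey also takes ascending and keyfunc parameters (keyfunc transforms keys for comparison only, leaving the data unchanged). A sketch reversing the order and sorting case-insensitively:

x = sc.parallelize([('B',1),('a',2),('C',3)])
print(x.sortByKey(ascending=False).collect())              # [('a', 2), ('C', 3), ('B', 1)]
print(x.sortByKey(keyfunc=lambda k: k.lower()).collect())  # [('a', 2), ('B', 1), ('C', 3)]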