groupByKey, reduceByKey, and sortByKey in Spark
groupByKey gathers all values that share the same key into a single collection:
[("hello",1), ("world",1), ("hello",1), ("fly",1), ("hello",1), ("world",1)] --> [("hello",(1,1,1)),("world",(1,1)),("fly",(1,))]
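To make the semantics concrete, here is a plain-Python model of what groupByKey computes. It is an illustration, not Spark code, and the function name `group_by_key` is hypothetical. In PySpark the grouped values come back as a `ResultIterable` rather than a list, and neither key order nor value order is guaranteed after the shuffle. This model keeps first-seen order for readability.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Model of RDD.groupByKey: collect every value that shares a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)  # every single value is kept, none are combined
    return list(groups.items())


pairs = [("hello", 1), ("world", 1), ("hello", 1),
         ("fly", 1), ("hello", 1), ("world", 1)]
print(group_by_key(pairs))
# [('hello', [1, 1, 1]), ('world', [1, 1]), ('fly', [1])]
```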
reduceByKey merges all values that share the same key using a given function (here `add`):
[("hello",1), ("world",1), ("hello",1), ("fly",1), ("hello",1), ("world",1)] -- add --> [("hello",3),("world",2),("fly",1)]
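A matching plain-Python model of reduceByKey follows. It is not Spark's implementation, and `reduce_by_key` is a hypothetical name. Unlike the groupByKey model, it keeps only one running value per key. The function passed in should be associative and commutative, because Spark may apply it in any order within and across partitions.

```python
from operator import add


def reduce_by_key(func, pairs):
    """Model of RDD.reduceByKey: fold the values of each key with func."""
    acc = {}
    for key, value in pairs:
        # first value for a key is stored as-is; later ones are folded in
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())


pairs = [("hello", 1), ("world", 1), ("hello", 1),
         ("fly", 1), ("hello", 1), ("world", 1)]
print(reduce_by_key(add, pairs))
# [('hello', 3), ('world', 2), ('fly', 1)]
```

Any associative, commutative function works the same way. For example, `reduce_by_key(max, pairs)` would keep the largest value per key.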
sortByKey sorts records by key, in ascending order by default:
[(3,"hello"),(2,"world"),(1,"fly")] --> [(1,"fly"),(2,"world"),(3,"hello")]
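The ordering rule can be modeled in plain Python as a sort on the first tuple element only. This is a sketch, not Spark's range-partitioned sort, and `sort_by_key` is a hypothetical name. The `ascending` flag mirrors the parameter of the same name on PySpark's `RDD.sortByKey`.

```python
from operator import itemgetter


def sort_by_key(pairs, ascending=True):
    """Model of RDD.sortByKey: order records by their key only."""
    return sorted(pairs, key=itemgetter(0), reverse=not ascending)


counts = [(3, "hello"), (2, "world"), (1, "fly")]
print(sort_by_key(counts))                   # [(1, 'fly'), (2, 'world'), (3, 'hello')]
print(sort_by_key(counts, ascending=False))  # [(3, 'hello'), (2, 'world'), (1, 'fly')]
```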
Comparing groupByKey, reduceByKey, and sortByKey:
from pyspark import SparkConf, SparkContext
from operator import add

conf = SparkConf()
sc = SparkContext(conf=conf)

def func_by_key():
    data = [
        "hello world", "hello fly", "hello world",
        "hello fly", "hello fly", "hello fly"
    ]
    data_rdd = sc.parallelize(data)
    # split each line into words, then map every word to (word, 1)
    word_rdd = data_rdd.flatMap(lambda s: s.split(" ")).map(lambda x: (x, 1))

    # groupByKey yields (key, ResultIterable); mapValues(list) makes it printable
    group_by_key_rdd = word_rdd.groupByKey()
    print("groupByKey:{}".format(group_by_key_rdd.mapValues(list).collect()))
    # len counts how many values each key has
    print("groupByKey mapValues(len):{}".format(
        group_by_key_rdd.mapValues(len).collect()
    ))

    # reduceByKey sums the 1s for each word
    reduce_by_key_rdd = word_rdd.reduceByKey(add)
    print("reduceByKey:{}".format(reduce_by_key_rdd.collect()))

    # swap to (count, word) so that sortByKey orders the records by count
    print("sortByKey:{}".format(reduce_by_key_rdd.map(
        lambda x: (x[1], x[0])
    ).sortByKey().collect()))

func_by_key()
sc.stop()
"""
result:
groupByKey:[('fly', [1, 1, 1, 1]), ('world', [1, 1]), ('hello', [1, 1, 1, 1, 1, 1])]
groupByKey mapValues(len):[('fly', 4), ('world', 2), ('hello', 6)]
reduceByKey:[('fly', 4), ('world', 2), ('hello', 6)]
sortByKey:[(2, 'world'), (4, 'fly'), (6, 'hello')]
"""
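The results show that applying mapValues(len) to the output of groupByKey gives the same counts as reduceByKey(add). That match holds here only because every value is 1: len counts the values, while add sums them. The practical lesson is that if you group by key only to compute an aggregate over each key's values, you should call reduceByKey directly. reduceByKey combines values within each partition before the shuffle, whereas groupByKey sends every record across the network. sortByKey sorts by key. To sort by value instead, swap each key and value first and then sort, as the last step of the example does.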
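The shuffle-size difference can be illustrated with a small plain-Python model under simplifying assumptions. This is not Spark's shuffle code. The two "partitions" and the helper names `shuffled_records_group`, `shuffled_records_reduce`, and `merge` are all hypothetical. groupByKey is modeled as shipping every record unchanged. reduceByKey is modeled as first combining values inside each partition (map-side combine), so it ships at most one record per key per partition. The final lines sort by count by swapping key and value, as described above.

```python
from operator import add, itemgetter


def shuffled_records_group(partitions):
    """groupByKey model: every (key, value) record crosses the shuffle as-is."""
    return [pair for part in partitions for pair in part]


def shuffled_records_reduce(partitions, func):
    """reduceByKey model: combine within each partition, then shuffle."""
    out = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        out.extend(local.items())  # one record per key per partition
    return out


def merge(records, func):
    """Reduce side: merge the shuffled records for each key."""
    acc = {}
    for key, value in records:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


data = ["hello world", "hello fly", "hello world",
        "hello fly", "hello fly", "hello fly"]
pairs = [(w, 1) for line in data for w in line.split(" ")]
partitions = [pairs[:6], pairs[6:]]  # hypothetical split into two partitions

grouped = shuffled_records_group(partitions)
combined = shuffled_records_reduce(partitions, add)
print(len(grouped), len(combined))  # 12 5

counts = merge(combined, add)
# swap to (count, word) and sort on the new key, i.e. by count
by_count = sorted(((c, w) for w, c in counts.items()), key=itemgetter(0))
print(by_count)  # [(2, 'world'), (4, 'fly'), (6, 'hello')]
```

Both paths produce the same final counts. The combined path moves 5 records instead of 12, and the gap grows with the number of repeated keys per partition.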