Spark算子之aggregateByKey詳解
阿新 • • 發佈:2018-10-27
all item bubuko 最大 name rest map com class
一、基本介紹
rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函數是初始值
3代表每次分完組之後的每個組的初始值。
seqFunc代表combine的聚合邏輯
每一個mapTask的結果的聚合成為combine
combFunc reduce端大聚合的邏輯
ps:aggregateByKey默認分組
二、源碼
三、代碼
from pyspark import SparkConf,SparkContext
from __builtin__ import str
conf = SparkConf().setMaster("local").setAppName(" AggregateByKey")
sc = SparkContext(conf = conf)
rdd = sc.parallelize([(1,1),(1,2),(2,1),(2,3),(2,4),(1,7)],2)
def f(index,items):
print "partitionId:%d" %index
for val in items:
print val
return items
rdd.mapPartitionsWithIndex(f, False).count()
def seqFunc(a,b):
print "seqFunc:%s,%s" %(a,b)
return max(a,b) #取最大值
def combFunc(a,b):
print "combFunc:%s,%s" %(a ,b)
return a + b #累加起來
‘‘‘
aggregateByKey這個算子內部肯定有分組
‘‘‘
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
rest = aggregateRDD.collectAsMap()
for k,v in rest.items():
print k,v
sc.stop()
四、詳細邏輯
PS:seqFunc函數 combine篇。
3是每個分組的最大值,所以把3傳進來,在combine函數中也就是seqFunc中第一次調用 3代表a,b即1,max(a,b)即3 第二次再調用則max(3.1)中的最大值3即輸入值,2即b值 所以結果則為(1,3)
底下類似。combine函數調用的次數與分組內的數據個數一致。
combFunc函數 reduce聚合
在reduce端大聚合,拉完數據後也是先分組,然後再調用combFunc函數
五、結果
Spark算子之aggregateByKey詳解