1. 程式人生 > >Spark運算元篇 --Spark運算元之aggregateByKey詳解

Spark運算元篇 --Spark運算元之aggregateByKey詳解

一。基本介紹

rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函式是初始值

3代表每次分完組之後的每個組的初始值。

seqFunc代表combine的聚合邏輯

每一個mapTask的結果的聚合成為combine

combFunc reduce端大聚合的邏輯

ps:aggregateByKey預設分組

二。程式碼

from pyspark import SparkConf,SparkContext
from __builtin__ import str
conf = SparkConf().setMaster("local").setAppName("
AggregateByKey") sc = SparkContext(conf = conf) rdd = sc.parallelize([(1,1),(1,2),(2,1),(2,3),(2,4),(1,7)],2) def f(index,items): print "partitionId:%d" %index for val in items: print val return items rdd.mapPartitionsWithIndex(f, False).count() def seqFunc(a,b): print
"seqFunc:%s,%s" %(a,b) return max(a,b) #取最大值 def combFunc(a,b): print "combFunc:%s,%s" %(a ,b) return a + b #累加起來 ''' aggregateByKey這個運算元內部肯定有分組 ''' aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc) rest = aggregateRDD.collectAsMap() for k,v in rest.items(): print k,v sc.stop()

三。詳細邏輯

PS:

seqFunc函式 combine篇。

3是每個分組的最大值,所以把3傳進來,在combine函式中也就是seqFunc中第一次呼叫 3代表a,b即1,max(a,b)即3 第二次再呼叫則max(3.1)中的最大值3即輸入值,2即b值 所以結果則為(1,3)

底下類似。combine函式呼叫的次數與分組內的資料個數一致。

combFunc函式 reduce聚合

在reduce端大聚合拉完資料後也是先分組,然後再呼叫combFunc函式

四。結果

持續更新中。。。。,歡迎大家關注我的公眾號LHWorld.