Spark中map與flatMap
阿新 • • 發佈:2018-10-06
必須 ret iter ext within serve 函數 range fail
map將函數作用到數據集的每一個元素上,生成一個新的分布式的數據集(RDD)返回
map函數的源碼:
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[(‘a‘, 1), (‘b‘, 1), (‘c‘, 1)]
"""
def func(_, iterator):
return map(fail_on_stopiteration(f), iterator)
return self.mapPartitionsWithIndex(func, preservesPartitioning)
map將每一條輸入執行func操作並對應返回一個對象,形成一個新的rdd,如源碼中的rdd.map(lambda x: (x, 1) --> [(‘a‘, 1), (‘b‘, 1), (‘c‘, 1)]
flatMap會先執行map的操作,再將所有對象合並為一個對象, 返回值是一個Sequence
flatMap源碼:
def flatMap(self, f, preservesPartitioning=False):
"""
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(s, iterator):
return chain.from_iterable(map(fail_on_stopiteration(f), iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
註意:flatMap將輸入執行func操作時,對象必須是可叠代的
map與flatMap的區別:
1 from pyspark import SparkConf, SparkContext
2
3 conf = SparkConf()
4 sc = SparkContext(conf=conf)
5
6
7 def func_map():
8 data = ["hello world", "hello fly"]
9 data_rdd = sc.parallelize(data)
10 map_rdd = data_rdd.map(lambda s: s.split(" "))
11 print("map print:{}".format(map_rdd.collect()))
12
13
14 def func_flat_map():
15 data = ["hello world", "hello fly"]
16 data_rdd = sc.parallelize(data)
17 flat_rdd = data_rdd.flatMap(lambda s: s.split(" "))
18 print("flatMap print:{}".format(flat_rdd.collect()))
19
20
21 func_map()
22 func_flat_map()
23 sc.stop()
執行結果:
map print:[[‘hello‘, ‘world‘], [‘hello‘, ‘fly‘]]
flatMap print:[‘hello‘, ‘world‘, ‘hello‘, ‘fly‘]
可以看出,map對 "hello world", "hello fly"這兩個對象分別映射為[‘hello‘, ‘world‘], [‘hello‘, ‘fly‘],而flatMap在map的基礎上做了一個合並操作,將這兩個對象合並為一個[‘hello‘, ‘world‘, ‘hello‘, ‘fly‘],這就造就了flatMap在詞頻統計方面的優勢。
Spark中map與flatMap