spark小技巧-mapPartitions
阿新 • • 發佈:2019-01-26
與map方法類似,map是對rdd中的每一個元素進行操作,而mapPartitions(foreachPartition)則是對rdd中的每個分割槽的迭代器進行操作。如果在map過程中需要頻繁建立額外的物件(例如將rdd中的資料通過jdbc寫入資料庫,map需要為每個元素建立一個連結而mapPartition為每個partition建立一個連結),則mapPartitions效率比map高的多。
SparkSql或DataFrame預設會對程式進行mapPartition的優化。
Demo
實現將每個數字變成原來的2倍的功能
比如:輸入2,結果(2,4)
使用map
val a = sc.parallelize(1 to 9, 3)
def mapDoubleFunc(a : Int) : (Int,Int) = {
(a,a*2)
}
val mapResult = a.map(mapDoubleFunc)
println(mapResult.collect().mkString)
結果
(1,2)(2,4)(3,6)(4,8)(5,10)(6,12)(7,14)(8,16)(9,18)
使用mapPartitions
val a = sc.parallelize(1 to 9, 3)
def doubleFunc(iter: Iterator[Int]) : Iterator[(Int,Int)] = {
var res = List[(Int,Int)]()
while (iter.hasNext)
{
val cur = iter.next;
res .::= (cur,cur*2)
}
res.iterator
}
val result = a.mapPartitions(doubleFunc)
println(result.collect().mkString)
結果
(3,6)(2,4)(1,2)(6,12)(5,10)(4,8)(9,18)(8,16)(7,14)