
Spark mapPartitions and mapPartitionsWithIndex Operators


mapPartitions

Similar to map, but where map applies its function to each element individually, mapPartitions passes the entire contents of a partition to its input function, so each partition is processed as a whole. When the per-element work involves an expensive setup step, such as opening a database connection, mapPartitions is the better choice: the function is invoked only once per partition, and both its input and output are of type Iterator.
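As an illustration of the database case, here is a minimal sketch of the connection-per-partition pattern. The createConnection() and lookup() helpers are hypothetical stand-ins for whatever driver you actually use:

val enriched = rdd.mapPartitions { iter =>
  // One connection per partition instead of one per element.
  val conn = createConnection()   // hypothetical helper
  // Materialize the results before closing the connection; returning
  // a lazy iterator here would read from an already-closed connection.
  val results = iter.map(row => conn.lookup(row)).toList
  conn.close()
  results.iterator
}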

Example:

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
     |   var res = List[(T, T)]()
     |   var pre = iter.next
     |   while (iter.hasNext) {
     |     val cur = iter.next
     |     res ::= (pre, cur)
     |     pre = cur
     |   }
     |   res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

scala> rdd1.mapPartitions(myfunc)
res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <console>:28

scala> res2.collect()
res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
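myfunc pairs each element with its successor within a single partition. Because the nine elements are split across three partitions ([1,2,3], [4,5,6], [7,8,9]), no pair crosses a partition boundary, which is why (3,4) and (6,7) never appear in the output; and because each pair is prepended to the result list, the pairs within a partition come out in reverse order.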

mapPartitionsWithIndex

Similar to mapPartitions, except the input function receives one extra parameter: the index of the partition being processed.
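For reference, the two signatures in the RDD API differ only in that extra Int passed to the function:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]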

Example:

scala> val mapReslut = rdd1.mapPartitionsWithIndex {
     |   (index, iterator) => {
     |     val list = iterator.toList
     |     list.map(x => x + "->" + index).iterator
     |   }
     | }
mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:25

scala> mapReslut.collect
res6: Array[String] = Array(1->0, 2->0, 3->0, 4->1, 5->1, 6->1, 7->2, 8->2, 9->2)
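Tagging each element with its partition index makes the layout produced by sc.parallelize(1 to 9, 3) visible: elements 1-3 live in partition 0, 4-6 in partition 1, and 7-9 in partition 2. This is a handy trick for checking how evenly your data is distributed across partitions.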
