Spark運算元:RDD行動Action操作(4)–countByKey、foreach、foreachPartition、sortBy
阿新 • • 發佈:2019-01-01
關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、countByKey、foreach、foreachPartition、sortBy
countByKey
def countByKey(): Map[K, Long]
countByKey用於統計RDD[K,V]中每個K的數量。
- scala>var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
- rdd1: org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[7
- scala> rdd1.countByKey
- res5: scala.collection.Map[String,Long]=Map(A ->2, B ->3)
foreach
def foreach(f: (T) ⇒ Unit): Unit
foreach用於遍歷RDD,將函式f應用於每一個元素。
但要注意,如果對RDD執行foreach,只會在Executor端有效,而並不是Driver端。
比如:rdd.foreach(println),只會在Executor的stdout中打印出來,Driver端是看不到的。
我在Spark1.4中是這樣,不知道是否真如此。
這時候,使用accumulator共享變數與foreach結合,倒是個不錯的選擇。
- scala>var cnt = sc.accumulator(0)
- cnt: org.apache.spark.Accumulator[Int]=0
- scala>var rdd1 = sc.makeRDD(1 to 10,2)
- rdd1: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[5] at makeRDD at :21
- scala> rdd1.foreach(x => cnt += x)
- scala> cnt.value
- res51
- scala> rdd1.collect.foreach(println)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
foreachPartition
def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition和foreach類似,只不過是對每一個分割槽使用f。
- scala>var rdd1 = sc.makeRDD(1 to 10,2)
- rdd1: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[5] at makeRDD at :21
- scala>var allsize = sc.accumulator(0)
- size: org.apache.spark.Accumulator[Int]=0
- scala>var rdd1 = sc.makeRDD(1 to 10,2)
- rdd1: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[6] at makeRDD at :21
- scala> rdd1.foreachPartition { x =>{
- | allsize += x.size
- |}}
- scala> println(allsize.value)
- 10
sortBy
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy根據給定的排序k函式將RDD中的元素進行排序。
- scala>var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
- scala> rdd1.sortBy(x => x).collect
- res1:Array[Int]=Array(0,1,2,3,6,7)//預設升序
- scala> rdd1.sortBy(x => x,false).collect
- res2:Array[Int]=Array(7,6,3,2,1,0)//降序
- //RDD[K,V]型別
- scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
- scala> rdd1.sortBy(x => x).collect
- res3:Array[(String,Int)]=Array((A,1),(A,2),(B,3),(B,6),(B,7))
- //按照V進行降序排序
- scala> rdd1.sortBy(x => x._2,false).collect
- res4:Array[(String,Int)]=Array((B,7),(B,6),(B,3),(A,2),(A,1))
更多關於Spark運算元的介紹,可參考spark運算元系列文章: