
Testing the element ordering of Spark RDDs

From the experiments below we can see:
foreach() visits the elements in an arbitrary order
but:
collect() returns its result in the original order
take() returns its result in the original order

Why is that?

Another thing we can see:
take() stops as soon as it has fetched the requested number of elements
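
A likely explanation (my reading, not stated in the original notes): collect() and take() run a job that returns one array per partition and then concatenate those arrays on the driver in partition-index order, so the original order survives; foreach() runs inside the executor tasks, so the printed order simply reflects whichever task gets to print first. The following sketch, assuming the same spark-shell SparkContext sc, uses glom() to make the per-partition layout visible:

// Sketch only; assumes the spark-shell SparkContext `sc`.
val rdd = sc.makeRDD(0 to 9, 4)

// glom() wraps each partition in an Array, so the layout becomes visible on the driver,
// here something like Array(Array(0, 1), Array(2, 3, 4), Array(5, 6), Array(7, 8, 9)).
val perPartition = rdd.glom().collect()

// Concatenating the partition arrays in index order reproduces the original sequence,
// which is essentially how collect() and take() assemble their results.
val reassembled = perPartition.flatten   // Array(0, 1, 2, ..., 9)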

scala> val rdd = sc.makeRDD((0 to 9), 4)
scala> rdd.collect
res27: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> rdd.partitions
res13: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@691, org.apache.spark.rdd.ParallelCollectionPartition@692, org.apache.spark.rdd.ParallelCollectionPartition@693, org.apache.spark.rdd.ParallelCollectionPartition@694)

scala> rdd.foreach(print(_))
0178923456

scala> rdd.foreach(print(_))
5623401789

scala> rdd.coalesce(1, false).foreach(print _)
0123456789

scala> rdd.coalesce(1, false).partitions
res28: Array[org.apache.spark.Partition] = Array(CoalescedRDDPartition(0,ParallelCollectionRDD[0] at makeRDD at <console>:21,[I@63a3554,None))

scala> rdd.foreachPartition((x:Iterator[Int])=>println(x.next))
2
0
5
7

scala> rdd.mapPartitions((x:Iterator[Int])=>Array(x.next()).iterator).collect
res4: Array[Int] = Array(0, 2, 5, 7)

scala> rdd.keyBy((x:Int)=>x/4).collect
res27: Array[(Int, Int)] = Array((0,0), (0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (1,7), (2,8), (2,9))

scala> rdd.groupBy(_/4).collect
res7: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(0, 1, 2, 3)), (1,CompactBuffer(4, 5, 6, 7)), (2,CompactBuffer(8, 9)))

scala> val jr = rdd.toJavaRDD
jr: org.apache.spark.api.java.JavaRDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21

scala> jr.collectPartitions(Array(0,1))
res20: Array[java.util.List[Int]] = Array([0, 1], [2, 3, 4])
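
As the coalesce(1, false) run above suggests, foreach() only prints in order when everything ends up in a single task. If what you actually want are ordered side effects on the driver, a simpler route (my addition, not part of the original session) is to bring the data back to the driver first:

// Both of these iterate on the driver and therefore print in the original order.
rdd.collect().foreach(print)        // materialises the whole RDD on the driver
rdd.toLocalIterator.foreach(print)  // pulls one partition at a time, in partition order
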
// A String AccumulatorParam, so that sc.accumulator("") below can accumulate Strings;
// we use it to record the order in which elements are actually evaluated.
implicit object StringAccumulator extends org.apache.spark.AccumulatorParam[String] {
  // merge two partial results by joining them with a comma
  def addInPlace(r1: String, r2: String) = r1 + "," + r2
  // the neutral starting value for the accumulator
  def zero(initialValue: String) = ""
}

scala> val a = sc.accumulator("")
a: org.apache.spark.Accumulator[String] = 

scala> sc.parallelize(0 to 1000, 99).flatMap((i:Int)=>{a+="f1-"+i; (i*2 to i*2 + 1)}).flatMap((i:Int)=>{a+="f2-"+i; (i*2 to i*2 + 1)}).take(10)
res2: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> a
res3: org.apache.spark.Accumulator[String] = ,,f1-0,f2-0,f2-1,f1-1,f2-2,f2-3,f1-2,f2-4
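
The accumulator value shows that only the first few source elements (0, 1 and 2) were pushed through the two flatMap stages: take(10) runs a job on just the first partition(s) and stops pulling from the iterators once it has 10 elements, so the rest of the 99-partition RDD is never computed. The same laziness can be shown by simply counting evaluations (a sketch using the same Spark 1.x accumulator API; the names are mine):

val evaluated = sc.accumulator(0)                                   // counts how many elements the map actually touches
val lazyRdd = sc.parallelize(0 to 1000, 99).map { i => evaluated += 1; i }
lazyRdd.take(10)                                                    // only needs elements from the first partition
println(evaluated.value)                                            // roughly 10, far less than the 1001 elements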