Spark Operators: Listing and Counting the Elements in Each RDD Partition
阿新 · Published 2018-12-30
1. Partitioning logic
Spark RDDs are partitioned. When creating an RDD you can usually specify the number of partitions. If you do not, an RDD created from a collection defaults to the number of CPU cores allocated to the application, while an RDD created from an HDFS file defaults to the number of blocks in the file.
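For an RDD built from a collection, Spark slices the data into roughly equal index ranges. The sketch below is a plain-Scala rendering of that slicing rule (an assumption modeled on ParallelCollectionRDD's behavior, written without Spark so it runs standalone); it reproduces the 3/3/4 pattern seen in the 15-partition example in the next section.

```scala
// Sketch: a collection of `length` elements split into `numSlices` partitions;
// partition i covers indices [i*length/numSlices, (i+1)*length/numSlices).
def sliceSizes(length: Int, numSlices: Int): Seq[Int] =
  (0 until numSlices).map { i =>
    val start = (i.toLong * length / numSlices).toInt
    val end   = ((i + 1).toLong * length / numSlices).toInt
    end - start
  }

// 50 elements over 15 partitions: sizes repeat as 3, 3, 4
println(sliceSizes(50, 15).mkString(","))
```

Because the boundaries are computed by integer division, "remainder" elements are spread evenly across the partitions rather than piled onto the last one.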
2. Counting the elements in each partition
You can use the RDD's mapPartitionsWithIndex method to list the elements in each partition and count them.
Example 1:
Create an RDD; it gets 15 partitions by default, because my spark-shell was allocated a total of 15 CPU cores.
(1) Number of partitions
scala> var rdd1 = sc.makeRDD(1 to 50)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:21

scala> rdd1.partitions.size
res15: Int = 15
(2) Counting the elements in each partition
rdd1.mapPartitionsWithIndex{
  (partIdx, iter) => {
    var part_map = scala.collection.mutable.Map[String, Int]()
    while (iter.hasNext) {
      var part_name = "part_" + partIdx
      if (part_map.contains(part_name)) {
        var ele_cnt = part_map(part_name)
        part_map(part_name) = ele_cnt + 1
      } else {
        part_map(part_name) = 1
      }
      iter.next()
    }
    part_map.iterator
  }
}.collect

res16: Array[(String, Int)] = Array((part_0,3), (part_1,3), (part_2,4), (part_3,3), (part_4,3), (part_5,4), (part_6,3), (part_7,3), (part_8,4), (part_9,3), (part_10,3), (part_11,4), (part_12,3), (part_13,3), (part_14,4))
// the element count of each partition, part_0 through part_14
(3) Listing the elements in each partition
rdd1.mapPartitionsWithIndex{
  (partIdx, iter) => {
    var part_map = scala.collection.mutable.Map[String, List[Int]]()
    while (iter.hasNext) {
      var part_name = "part_" + partIdx
      var elem = iter.next()
      if (part_map.contains(part_name)) {
        var elems = part_map(part_name)
        elems ::= elem
        part_map(part_name) = elems
      } else {
        part_map(part_name) = List(elem)
      }
    }
    part_map.iterator
  }
}.collect

res17: Array[(String, List[Int])] = Array((part_0,List(3, 2, 1)), (part_1,List(6, 5, 4)), (part_2,List(10, 9, 8, 7)), (part_3,List(13, 12, 11)), (part_4,List(16, 15, 14)), (part_5,List(20, 19, 18, 17)), (part_6,List(23, 22, 21)), (part_7,List(26, 25, 24)), (part_8,List(30, 29, 28, 27)), (part_9,List(33, 32, 31)), (part_10,List(36, 35, 34)), (part_11,List(40, 39, 38, 37)), (part_12,List(43, 42, 41)), (part_13,List(46, 45, 44)), (part_14,List(50, 49, 48, 47)))
// the elements contained in each partition, part_0 through part_14
// (each list is in reverse order, because ::= prepends to the head of the list)
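The loops above can be written much more compactly: each invocation of the function sees exactly one partition's iterator, so emitting a single pair per partition is enough, e.g. `rdd1.mapPartitionsWithIndex((idx, it) => Iterator(("part_" + idx, it.size))).collect`. Here is a plain-Scala sketch of that idea, with the partitions simulated as local sequences (sliced the same way as rdd1 above) so it runs without Spark:

```scala
// Simulate 15 partitions holding 1..50, sliced like the article's rdd1
val parts: Seq[Seq[Int]] =
  (0 until 15).map(i => ((i * 50 / 15 + 1) to ((i + 1) * 50 / 15)).toSeq)

// Equivalent of mapPartitionsWithIndex((idx, it) => Iterator(("part_" + idx, it.size)))
val counts = parts.zipWithIndex.map { case (p, idx) => ("part_" + idx, p.size) }

println(counts.take(3).mkString(", "))
```

One caveat for the real Spark version: `it.size` exhausts the iterator, so if you also need the elements themselves, materialize them first (e.g. `val lst = it.toList`) and take the size from the materialized list.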
Example 2:
Create an RDD from an HDFS file; it has 65 partitions, because the file consists of 65 blocks.
(1) Number of partitions
scala> var rdd2 = sc.textFile("/logs/2015-07-05/lxw1234.com.log")
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at textFile at <console>:21
scala> rdd2.partitions.size
res18: Int = 65
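The partition count here comes directly from the file's HDFS block count. A rough sketch of that relationship (the 64 MB block size is an assumption; the actual default depends on the Hadoop version and cluster configuration, and newer versions default to 128 MB):

```scala
// Assumed HDFS block size of 64 MB (a config-dependent value, not a fixed constant)
val blockSize = 64L * 1024 * 1024

// Number of blocks a file of fileSize bytes occupies: ceiling division
def numBlocks(fileSize: Long): Long =
  (fileSize + blockSize - 1) / blockSize

// A file just over 64 blocks' worth of data spans 65 blocks, hence 65 partitions
println(numBlocks(64L * blockSize + 1))
```

Note that sc.textFile also accepts a minPartitions argument, so the resulting partition count can be higher than the block count if you ask for more splits.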
(2) Counting the elements in each partition
scala> rdd2.mapPartitionsWithIndex{
     |   (partIdx, iter) => {
     |     var part_map = scala.collection.mutable.Map[String, Int]()
     |     while (iter.hasNext) {
     |       var part_name = "part_" + partIdx
     |       if (part_map.contains(part_name)) {
     |         var ele_cnt = part_map(part_name)
     |         part_map(part_name) = ele_cnt + 1
     |       } else {
     |         part_map(part_name) = 1
     |       }
     |       iter.next()
     |     }
     |     part_map.iterator
     |   }
     | }.collect
res19: Array[(String, Int)] = Array((part_0,202496), (part_1,225503), (part_2,214375), (part_3,215909),
(part_4,208941), (part_5,205379), (part_6,207894), (part_7,209496), (part_8,213806), (part_9,216962),
(part_10,216091), (part_11,215820), (part_12,217043), (part_13,216556), (part_14,218702), (part_15,218625),
(part_16,218519), (part_17,221056), (part_18,221250), (part_19,222092), (part_20,222339), (part_21,222779),
(part_22,223578), (part_23,222869), (part_24,221543), (part_25,219671), (part_26,222871), (part_27,223200),
(part_28,223282), (part_29,228212), (part_30,223978), (part_31,223024), (part_32,222889), (part_33,222106),
(part_34,221563), (part_35,219208), (part_36,216928), (part_37,216733), (part_38,217214), (part_39,219978),
(part_40,218155), (part_41,219880), (part_42,215833...