1. 程式人生 > >spark RDD 基本操作

spark RDD 基本操作

1. map:    對當前元素做一個對映    val array = Array(1,2,3)    val rdd = sc.parallelize(array).map(r => 2*r) 2. filter:    過濾出符合條件的元組  val array = Array(1,2,3)    val rdd = sc.parallelize(array).filter(r => r > 2) 3. flatMap:     壓縮行, 相當於拆分某一個列欄位     val array = Array(Array(1,2), Array(3,4), Array(5,6)) sc.parallelize(array).flatMap(r => r) 4. reduceByKey:    相當於group by 同一個key 的分組資料進行聚合, key 為第一個 val array = Array(Array(1,2), Array(3,4), Array(5,6))
sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).reduceByKey((a, b) => a + b)
5. reduce:    每個元組只有一個元素的時候,使用reduce val array = Array(Array(1,2), Array(3,4), Array(5,6))
sc.parallelize(array).flatMap(r => r).reduce((a, b) => a + b) 6.mapPartitions:  對於每一個partition 進行處理,外層基於partitions 進行處理, 內層基於每個patition進行處理 val array = Array(Array(1,2), Array(3,4), Array(5,6))
 val rdd = sc.parallelize(array).flatMap(r => r).mapPartitions(rows => rows.map(row => 3*row)) 7. mapPartitionsWithIndex:
不同的patitions可能有不同的處理的方式的時候使用 val array = Array(Array(1,2), Array(3,4), Array(5,6))
 sc.parallelize(array).flatMap(r => r).mapPartitionsWithIndex((index, rows) => rows.map(row => {if(index == 2) 3*row else 4*row})) 8.  sample: 抽樣操作,從RDD的資料集中出去多大比例的資料 val array = Array(Array(1,2), Array(3,4), Array(5,6))
rdd = sc.parallelize(array).flatMap(r => r).sample(false, 0.5, 10000) 9. union: 合併兩個RDD val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).union(sc.parallelize(Array(1, 2, 3, 10, 20, 30))) 10. intersection: 選擇兩個資料集的交集 val array = Array(Array(1,2), Array(3,4), Array(5,6))
sc.parallelize(array).flatMap(r => r).intersection(sc.parallelize(Array(1, 2, 3, 10, 20, 30))) 11. distinct: 元組去重 val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).distinct 12. groupByKey: 返回元組序列,其中一個是Iterable型別, 可以給予Iterable 進一步處理 val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd1 = sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).groupByKey() val rdd2 = sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).groupByKey().map(r => (r._1, r._2.toArray.sum)).collect 13. aggregateByKey: vardata=sc.parallelize(List((1,3),(1,2),(1,4),(2,3)))
defseq(a:Int, b:Int):Int={
  println("seq: "+ a +"\t "+ b)   math.max(a,b) } defcomb(a:Int, b:Int):Int={
    println("comb: "+ a +"\t "+ b)
        a + b } data.aggregateByKey(1)(seq, comb).collect 上訴程式碼的執行過程是這樣的: 每組的value值與1 進行比較, 取最大替換value; 然後執行reduceByKey操作,相同key的value 相加, seq對value進行對映, comb對value進行聚合 14.sortByKey: 按照key 重新排列RDD的元組, 可以選擇是升序還是降序 val array = Array(Array(1,2), Array(3,4), Array(5,6))
sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).sortByKey().collect() 15. join: 同一個key的value 合併 (K, V)和(K, W) => (K, (V, W)) val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).join(sc.parallelize(Array(1,2,3,4)).map(r => (r, 2*r))).collect 16. cogroup: val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).cogroup(sc.parallelize(Array(1,2,3,4)).map(r => (r, 2*r))).collect 返回: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(1),CompactBuffer(2))), (2,(CompactBuffer(1),CompactBuffer(4))), (3,(CompactBuffer(1, 1),CompactBuffer(6))), (4,(CompactBuffer(1, 1),CompactBuffer(8))), (10,(CompactBuffer(1),CompactBuffer()) 17.cartesian: 返回連個RDD的笛卡爾集 val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).map(r => (r, 1)).cartesian(sc.parallelize(Array(1,2,3,4)).map(r => (r, 2*r))).collect 18.pipe: 傳遞一個命令來處理RDD的每一個patition 19. coalesce: 削減分割槽,通常用在過濾完大資料集後,在進行其他操作 val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).map(r => (r, 2*r)).coalesce(1) 20. repartition: 重新balance 資料 val array = Array(Array(1,2), Array(3,4), Array(5,6))
val rdd = sc.parallelize(array).flatMap(r => r).map(r => (r, 2*r)).repartition(1) 21. repartitionAndSortWithinPartitions:
重新patition後,在每一個patition上,按key 排序 import org.apache.spark.HashPartitioner val array = Array(Array(1,2), Array(3,4), Array(5,6))
val c = new HashPartitioner(5) val rdd = sc.parallelize(array).flatMap(r => r).map(r => (r, 2*r)).repartitionAndSortWithinPartitions(c)