Spark學習(六)常用運算元整理
阿新 • • 發佈:2018-12-04
常用運算元
- 1、MapPartition
- 2、Map
- 3、MapPartitionWithIndex
- 4、getNumPartitions
- 5、coalesce
- 6、union
- 7、zip
- 8、zipWitIndex
- 9、zipWithUniqueId
- 10、take(n)
- 11、first
- 12、combineByKey
1、MapPartition
遍歷的單位是每一個partition。
遍歷原理:將每一個partition的資料先載入到記憶體,然後再一條一條遍歷。
rdd.mapPartitions((elems:Iterator[Int]) => { println("建立連線") while(elems.hasNext){ println("拼接SQL語句 " + elems.next) } println("提交") elems })
2、Map
遍歷單位是每一條記錄。
3、MapPartitionWithIndex
在遍歷每一個partition的時候能夠拿到每一個分割槽的ID號,這個運算元一般用於測試環境。
rdd.mapPartitionsWithIndex((index,iterator) =>{
println("partitonId: " + index)
while(iterator.hasNext){
println(iterator.next)
}
}).count()
4、getNumPartitions
獲取RDD的分割槽數
val partitionNum1 = rdd. val partitionNum2 = rdd.partitions.length
5、coalesce
coalesce(…,true)若引數為true,說明分割槽的時候需要產生shuffle,若引數為false說明不需要產生shuffle。
增加RDD的分割槽數使用coalesce(…,true)或者repartition
val coalesceRDD1 = facePowerRDD.coalesce(6, true) println("coalesceRDD1.getNumPartitions:" + coalesceRDD1.getNumPartitions) coalesceRDD1.mapPartitionsWithIndex((index,iterator)=>{ println("partitionId" + index) while(iterator.hasNext){ println(iterator.next) } iterator }).count()
減少RDD的分割槽數,使用coalesce(…,false),也可以使用coalesce(…,true)但是效率會降低。
facePowerRDD
.coalesce(2, false)
.mapPartitionsWithIndex((index,iterator)=>{
println("partitionId" + index)
while(iterator.hasNext){
println(iterator.next)
}
iterator
}).count()
6、union
合併,他只是將rdd1與rdd2在邏輯上進行合併,並不會真正進行資料的合併以傳輸。
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = sc.makeRDD(11 to 20,3)
val unionRDD = rdd1.union(rdd2)
println(unionRDD.getNumPartitions)
7、zip
將兩個RDD進行橫向合併,但是zip是對應位置合併。
比如:非KV格式的RDD1、RDD2 zip KV格式的RDD
val zipRDD = rdd1.zip(rdd2)
zipRDD.foreach(println)
注意:
- 要進行zip的兩個RDD的元素數必須一致。
- 要進行zip的兩個RDD的分割槽數必須一致。
8、zipWitIndex
給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD。
val zipWithIndexRDD = rdd1.zipWithIndex()
zipWithIndexRDD.foreach(println)
zipWithIndexRDD.map(_.swap).lookup(2).foreach(println)
9、zipWithUniqueId
給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD。
每一個分割槽的第一個元素的索引號就是當前分割槽的分割槽號;
每一個分割槽的第二個元素的索引號就是第一個元素+分割槽數。
rdd1
.zipWithUniqueId()
.mapPartitions(iterator=>{
while(iterator.hasNext){
println(iterator.next)
}
iterator
}).count()
10、take(n)
取這個RDD中前n個元素,是action類運算元。
11、first
取這個RDD中第一個元素,與task(1)一樣,也是action類運算元。
rdd1.take(5).foreach(println)
//first = take(1)
println(rdd1.first())
12、combineByKey
rdd.combineByKey(初始化函式,combiner聚合函式,reduce大聚合函式)
combineByKey作用步驟:
- 分組完成後,初始化函式會作用到每組資料的第一個元素上。
- combiner聚合函式作用到每組資料上,得到最終的combiner小聚合結果。
- 將reduce大聚合函式作用在每組資料上。
總結:
val conf = new SparkConf().setMaster(“local”)
- local:使用1個執行緒來模擬。
- local[10]:程式碼在本機使用10個執行緒來模擬spark的執行。
- local[*]:電腦還剩下幾個core,那麼就啟動多少個執行緒來模擬。