Spark從入門到精通六------RDD的運算元
-
- RDD程式設計API
- RDD運算元
- RDD程式設計API
運算元是RDD中定義的方法,分為轉換(transformantion)和動作(action)。Tranformation運算元並不會觸發Spark提交作業,直至Action運算元才提交任務執行,這是一個延遲計算的設計技巧,可以避免記憶體過快被中間計算佔滿,從而提高記憶體的利用率。
RDD擁有的操作比MR豐富的多,不僅僅包括Map、Reduce操作,還包括filter、sort、join、save、count等操作,並且中間結果不需要儲存,所以Spark比MR更容易方便完成更復雜的任務。
RDD支援兩種型別的操作:
轉換(Transformation)
轉換函式包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce等等。
動作(Action) 在RDD上執行計算,並返回結果給驅動程式(Driver)或寫入檔案系統。
動作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
collect 該方法把資料收集到driver端 Array陣列型別
所有的transformation只有遇到action才能被執行。
當觸發執行action之後,資料型別不再是rdd了,資料就會儲存到指定檔案系統中,或者直接列印結果或者收集起來。
RDD操作流程示意:
RDD的轉換與操作
wordcount示例,檢視lazy特性。
只有在執行action時,才會真正開始運算,並得到結果或存入檔案中。
-
-
- Transformation
-
RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。
對RDD中的元素執行的操作,實際上就是對RDD中的每一個分割槽的資料進行操作,不需要關注資料在哪個分割槽中。
常用的Transformation:
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成 |
filter(func) |
返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成 |
flatMap(func) |
先map,再flatten壓平 |
union(otherDataset) |
對源RDD和引數RDD求並集後返回一個新的RDD |
intersection(otherDataset) |
對源RDD和引數RDD求交集後返回一個新的RDD |
subtract(otherDataset) |
求差集後返回新的RDD,出現在源rdd中,不在otherrdd中 |
distinct([numTasks])) |
對源RDD進行去重後返回一個新的RDD |
mapPartitions(func) |
類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是 (Int, Interator[T]) => Iterator[U] |
sortBy(func,[ascending], [numTasks]) |
與sortByKey類似,但是更靈活 |
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD |
join(otherDataset, [numTasks]) |
在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K,(Iterable<V>,Iterable<W>))型別的RDD |
cartesian(otherDataset) |
笛卡爾積 |
mapValues(func) |
在一個(K,V)的RDD上呼叫 |
groupBy (func, [numTasks]) |
根據自定義條件進行分組 |
groupByKey([numTasks]) |
在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
針對分割槽內部使用seqOp方法,針對最後的結果使用combOp方法。 |
coalesce(numPartitions) |
用於對RDD進行重新分割槽,第一個引數是分割槽的數量,第二個引數是是否進行shuffle,可不傳,預設不shuffle |
repartition(numPartitions) |
用於對RDD進行重新分割槽,相當於shuffle版的calesce |
groupBy的返回值型別:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
T: 元素的型別 K : 指定的key
groupByKey
defgroupByKey(): RDD[(K, Iterable[V])]
reduceByKey
優先選擇reduceByKey, 語法更簡潔
效能優越
reduceByKey會進行分割槽內聚合,再經過網路傳輸,傳送到相對應的分割槽中。
sortBy既可以作用於RDD[K] ,還可以作用於RDD[(k,v)]
sortByKey 只能作用於 RDD[K,V] 型別上。
-
-
- Action
-
動作 |
含義 |
reduce(func) |
通過func函式聚集RDD中的所有元素 |
collect() |
在驅動程式中,以陣列的形式返回資料集的所有元素 |
collectAsMap |
類似於collect。該函式用於Pair RDD,最終返回Map型別的結果。 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(類似於take(1)) |
take(n) |
返回一個由資料集的前n個元素組成的陣列 |
saveAsTextFile(path) |
將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統 |
top(n) |
按照預設排序(降序) 取資料 |
takeOrdered(n, [ordering]) |
與top類似,順序相反 預設是升序 |
countByKey() |
針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) |
在資料集的每一個元素上,執行函式func進行更新。foreach,任務在executor中執行,列印資訊也會在executor中顯示 |
foreachPartition |
對分割槽進行操作 |
foreach和foreachPartition
foreachParititon 每次迭代一個分割槽,foreach每次迭代一個元素。
該方法沒有返回值,或者Unit
主要作用於,沒有返回值型別的操作(列印結果,寫入到mysql資料庫中)
在寫入到mysql資料庫的時候,優先使用foreachPartititon* 結果 存入到 mysql中* foreachPartition * 1,map mapPartition 轉換類的運算元, 返回值* 2, 寫mysql 資料庫的連線* 100萬 100萬次的連線* 200 個分割槽 200次連線 一個分割槽中的資料,共用一個連線
foreach和map的區別:
map有返回值,foreach沒有返回值(Unit型別)
map是transformation,lazy執行,foreach是action運算元,觸發任務執行
處理的都是每一條資料。
rdd1.foreach(println) rdd1.foreachPartition(it=>println(it.mkString(""))) |
coalesce和repartition
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
coalesce(n) 原來的分割槽中的資料,不會被分配到多個分割槽中,
將RDD分割槽的數量修改為numPartitions,常用於減少分割槽
第一個引數為重分割槽的數目,第二個為是否進行shuffle,預設為false
當需要調大分割槽時,必須設定shuffle為true,才能有效,否則分割槽數不變
隨機重新shuffle RDD中的資料,並建立numPartitions個分割槽。這個操作總會通過網路來shuffle全部資料。常用於擴大分割槽
分割槽數調大調小,都會shuffle全部資料,是重量級運算元
常用用法?
coalesce(10,true) = reparititon(10)
如果不需要資料的shuffle,減少或者合併分割槽,就使用coalesce(num)
如果需要資料的shuffle,或者需要擴大分割槽數量,優先使用repartition(num)
擴大分割槽,作用:提升並行度(業務邏輯比較複雜,需要提升並行度)
// 重分割槽的api rdd1.coalesce(2) // coalesce(numPartitions: Int, shuffle: Boolean = false rdd1.repartition(2) // coalesce(numPartitions, shuffle = true) // repartition 就是coalesce,第二個引數為true /** repartition 要進行資料的shuffle 不管是擴大分割槽,還是減少分割槽,都進行shuffle * coalesce預設沒有進行資料的shuffle 減少分割槽,直接是分割槽合併。 * coalesce 擴大分割槽, */ val f = (i:Int,it:Iterator[Int]) => it.map(t=> s"part:$i,values:$t") |
另外還有一類可以修改分割槽的方式:
在呼叫shuffle類的運算元時,可以在引數中設定分割槽的數量:
def reduceByKey(func: (V, V) => V, numPartitions: Int): |
mapPartitions和mapPartitionsWithIndex
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
該函式和map函式類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。
如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效。
該方法,看上去是操作的每一條資料,實際上是對RDD中的每一個分割槽進行iterator,
mapPartitions( it: Iterator => {it.map(x => x * 10)})
mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
類似於mapPartitions, 不過提供了兩個引數,第一個引數為分割槽的索引。
mapPartitionsWithIndex的func接受兩個引數,第一個引數是分割槽的索引,第二個是一個數據集分割槽的迭代器。而輸出的是一個包含經過該函式轉換的迭代器。
val func = (index: Int, iter: Iterator[Int]) => {
iter.map(x => "[partID:" + index + ", val: " + x + "]")
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect
/** * map * mapValues * mapPartition 操作的是每一個分割槽 函式的輸入引數型別是Iterator[元素型別] * mapPartitionWithIndex * */ // 建立rdd 同時指定分割槽的數量為2個 資料會被打散,然後平均分配 val rdd = sc.makeRDD(List(1, 3, 5, 7, 9), 2) // part 0 1 rdd.map({ i => // 具體的元素 i * 10 }) // 該方法每次操作的物件是一個迭代器,對應的就是一個分割槽的資料 rdd.mapPartitions({ it => // 分割槽 it.map(_ * 10) }) // 函式 val f=(index:Int,it:Iterator[Int])=>{ it.map({ t=> s"part:$index,value=$t" }) } val index1: RDD[String] = rdd.mapPartitionsWithIndex({ // 第一個引數,是分割槽的索引 分割槽編號 // 第二個引數: 分割槽的資料 Iterator[Int] case (index, it) => it.map({ t => s"part:$index,value=$t" }) })// ArrayBuffer(part:0,value=1, part:0,value=3, part:1,value=5, part:1,value=7, part:1,value=9) // 收集資料並列印 println(index1.collect().toBuffer) |
collect方法:
不能直接把資料收集到driver段,然後再執行入庫操作。
效率太低,容易引起dirver端崩潰了。OOM