Spark | 關於Spark常用31個transform運算元程式碼總結以及使用方法介紹
一.Transform型別運算元
1.1 Value 型別
1.1.1 map 運算元
介紹 :
返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成,也就是說RDD中每個元素都會執行一次這個方法
程式碼 :
// 建立SparkConf 設定本地執行模式
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("MapOperator")
// 建立SparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR" )
// 建立資料
val rdd = sc.parallelize(List("李白","韓信","張飛")).cache()
// 使用 Map 運算元
val result = rdd.map((x) => (x,1))
// 列印結果
result.foreach((x) => println(x.toString()))
// 關閉SparkContext
sc.stop()
複製程式碼
1.1.2 mapParatition 運算元
介紹 :
類似於 map,但獨立地在 RDD 的每一個分片上執行,因此在型別為 T 的 RDD 上 執行時,func 的函式型別必須是 Iterator[T] => Iterator[U]。假設有 N 個元素,有 M 個分割槽, 那麼 map 的函式的將被呼叫 N 次,而 mapPartitions 被呼叫 M 次,一個函式一次處理所有分割槽。
程式碼 :
// 建立SparkConf 設定本地執行模式
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("MapPartitionsOperator")
// 建立SparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
/**
* 類似於 map,但獨立地在 RDD 的每一個分片上執行,而 Map 是在
* 每一個元素上執行一次
*/
// 建立資料
val rdd = sc.parallelize(List("李白" ,"張飛")).cache()
// 使用 MapParatitions 運算元
// 補充 : { 一般是 寫程式碼塊的時候用 ( 是 單行程式碼可以直接用
def fun(x : Iterator[String]): Iterator[Tuple2[String,Int]] ={
// 建立Tuple型別的集合 用於儲存資料
// List 需要建立成 var 型別的因為拼接時需要指向新的List的物件
var list = List[Tuple2[String,Int]]()
while (x.hasNext) {
// x
var elem = x.next()
// 將資料存入List中 再使用拼接集合的方式將資料新增到
list = list.:::(List(new Tuple2[String,Int](elem,1)))
// 備註 : 這裡 ::: 和 :: 的區別是 ::: 的引數是 List,而 :: 的引數是元素
}
list.iterator
}
// 可以直接使用匿名函式或者直接定義函式傳入有
// rdd.mapPartitions(fun)
val result = rdd.mapPartitions { x =>
// 建立Tuple型別的集合 用於儲存資料
// List 需要建立成 var 型別的因為拼接時需要指向新的List的物件
var list = List[Tuple2[String,而 :: 的引數是元素
}
list.iterator
}
// 遍歷結果並列印
result.foreach(println(_))
// 關閉SparkContext
sc.stop()
複製程式碼
1.1.3 mapPartitionsWithIndex 運算元
- 作用:mapPartitionsWithIndex(func) 類似於 mapPartitions,但 func 帶有一個整數引數表示分片的索引值,相當於帶索引的
- 因此在型別為 T 的 RDD 上執行時,func 的函式型別必須是(Int,Interator[T]) => Iterator[U];
- 需求:建立一個 RDD,使每個元素跟所在分割槽形成一個元組組成一個新的 RDD
程式碼 :
// 帶索引的 MapPartitions
// 建立SparkConf 設定本地執行模式
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("MapPartitionsWithIndexOperator")
// 建立SparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 建立資料
val rdd = sc.parallelize(List("李白","張飛")).cache()
def fun(index: Int,x: Iterator[String]): Iterator[Tuple2[Int,String]] = {
// List 需要建立成 var 型別的因為拼接時需要指向新的List的物件
var list = List[Tuple2[Int,String]]()
while(x.hasNext){
// 獲取迭代器中的元素
var elem = x.next()
// 注意 : .::() 的引數是元素,而 .:::() 的引數是 List
list = list.::(new Tuple2[Int,String](index,elem))
}
list.iterator
}
val result = rdd.mapPartitionsWithIndex(fun)
// 遍歷結果並列印
result.foreach(println(_))
// 關閉SparkContext
sc.stop()
複製程式碼
1.1.4 flatMap 運算元
類似於 map,但是每一個輸入元素可以被對映為 0 或多個輸出元素(所以 func 應 該返回一個序列,而不是單一元素)
比如 : 如果正常情況返回的是 :
List(1,2,3)
List(1,3,4)
List(1,4,5)
FlatMap 會將資料壓平,都放到一個List裡面
123 1234 12345
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("FlatMapOperator")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
/**
* 建立RDD :
* 1. 使用 paralize : seq => rdd 的本地集合 : List ...,numSlices : 分割槽數
* 2. 適用 makeRDD
*/
val rdd = sc.makeRDD(1 to 5)
val flatMapResult = rdd.flatMap(1 to _)
val mapResult = rdd.map(1 to _)
// Map 和 Flat Map 的區別
mapResult.foreach(println(_))
flatMapResult.foreach(println(_))
// 關閉 SparkContext
sc.stop()
複製程式碼
1.1.5 map 和 mapParatition的區別
- map():每次處理一條資料。
- mapPartition():每次處理一個分割槽的資料,這個分割槽的資料處理完後,原 RDD 中分割槽的 資料才能釋放,可能導致 OOM。
- 開發指導:當記憶體空間較大的時候建議使用 mapPartition(),以提高處理效率。
1.1.6 glom 運算元
- 作用:將每一個分割槽形成一個陣列,形成新的 RDD 型別時 RDD[Array[T]]
- 需求:建立一個 4 個分割槽的 RDD,並將每個分割槽的資料放到一個陣列
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
//1. 作用:將每一個分割槽形成一個陣列,形成新的 RDD 型別時 RDD[Array[T]]
//2. 需求:建立一個 4 個分割槽的 RDD,並將每個分割槽的資料放到一個陣列
// 生成rdd資料,設定分割槽為 4
val rdd = sc.parallelize(1 to 10,4)
// 注意返回值
val result : RDD[Array[Int]] = rdd.glom()
result.foreach(arr => {
// arr 是 Array 型別
for (i <- 0 until arr.length) {
// 列印結果
println(arr(i))
}
println("==========================================")
})
// 關閉SparkContext
sc.stop()
複製程式碼
1.1.7 groupBy 運算元
- 作用:分組,按照傳入函式的返回值進行分組。將相同的 key 對應的值放入一個迭代器。
- 需求:建立一個 RDD,按照元素模以 2 的值進行分組
- 補充: 這個運算元的效率並不高,不推薦使用,詳情可以檢視原始碼中對應方法的Note部分
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 10)
val result : RDD[(String,Iterable[Int])]= rdd.groupBy(x => {
var key = ""
x match {
case _ if (x < 3) => {
key = "small"
}
case _ if (x > 3 && x < 5) => {
key = "big"
}
case _ if (x > 5) => {
key = "very big"
}
case _ => {
key = "void"
}
}
key
})
// 遍歷結果
result.foreach(x => {
println("key : " + x._1 + " \t" + x._2)
})
sc.stop()
複製程式碼
1.1.8 filter 運算元
- 作用:過濾。返回一個新的 RDD,該 RDD 由經過 func 函式計算後返回值為 true 的輸入 元素組成。
- 需求:建立一個 RDD(由字串組成),過濾出一個新 RDD(包含”xiao”子串)
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
val rdd = sc.parallelize(1 to 10)
// 過濾。返回一個新的 RDD,該 RDD 由經過 func 函式計算後返回值為 true 的輸入元素組成。
val result : RDD[Int] = rdd.filter(x => {
if (x % 2 == 0){
true
}
false
})
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.1.9 sample 運算元
- 作用:以指定的隨機種子隨機抽樣出數量為 fraction 的資料,
fraction 大小在 [0,1] 代表抽出百分之 多少的資料 比如 fraction = 0.3 表示抽出 30% 的資料
withReplacement 表示是抽出的資料是否放回,true 為有放回的抽樣,false 為無放回的抽樣,seed 用於指定隨機數生成器種子。
- 需求:建立一個 RDD(1-10),從中選擇放回和不放回抽樣
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 200)
// 抽樣
/**
* 作用:以指定的隨機種子隨機抽樣出數量為 fraction * 總資料量的的資料,
* fraction 大小在 [0,1] 代表抽出百分之 多少的資料 比如 fraction = 0.3 表示抽出 30% 的資料
* withReplacement 表示是抽出的資料是否放回,true 為有放回的抽樣,false 為無放回的抽樣,seed 用於指定隨機數生成
* 器種子。
*/
val result = rdd.sample(true,0.2,1234L)
// 遍歷
result.foreach(println(_))
// 關閉SparkContext
sc.stop()
複製程式碼
1.1.10 distinct 運算元
- 作用:對源 RDD 進行去重後返回一個新的 RDD。預設情況下,只有 8 個並行任務來操 作,但是可以傳入一個可選的 numTasks 引數改變它。
- 需求:建立一個 RDD,使用 distinct() 對其去重
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(List("張","韓","李","王","王"))
// 使用去重
val result = rdd.distinct()
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.1.11 coalesce 運算元
- 作用:縮減分割槽數,用於大資料集過濾後,提高小資料集的執行效率。
如果資料較少的情況下,分割槽數太多其實並不好,
- 需求:建立一個 4 個分割槽的 RDD,對其縮減分割槽
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 產生資料
val rdd = sc.parallelize(1 to 10,4)
// 縮小分割槽
// numPartitions 是重新的分割槽資料,shuffle true 表示重新對資料進行shuffle
// 預設為 false
val result = rdd.coalesce(2,true)
// 重新分割槽後的分割槽數
val numPartitions = result.partitions.size
println(numPartitions)
sc.stop()
複製程式碼
1.1.12 repartition 運算元
- 作用:根據分割槽數,重新通過網路隨機洗牌所有資料。使用 coalesce 運算元也可以達到相同的效果
shuffle 後可以讓資料分佈更均勻
- 需求:建立一個 4 個分割槽的 RDD,對其重新分割槽
程式碼 :
def printPartition(rdd : RDD[Int]): Unit ={
rdd.foreachPartition(f => {
// 遍歷 Iterator 型別的資料的方法
while(f.hasNext){
var element = f.next()
print(element)
}
println()
})
}
def main(args: Array[String]): Unit = {
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 15,4)
// 列印各個分割槽的資料
printPartition(rdd)
// 重新分割槽 並shuffle 資料
// shuffle 後可以讓資料分佈更均勻
val result = rdd.repartition(3)
println("重新分割槽 並shuffle 資料.....")
printPartition(result)
sc.stop()
}
複製程式碼
1.1.13 coalesce 和 repartition 的區別
- coalesce 重新分割槽,可以選擇是否進行 shuffle 過程。由引數 shuffle: Boolean = false/true 決定。
- repartition 實際上是呼叫的 coalesce,預設是進行 shuffle 的。原始碼如下:
程式碼 :
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions,shuffle = true)
}
複製程式碼
1.1.14 sortBy 運算元
- 作用; sortBy(func,[ascending],[numTasks])使用 func 先對資料進行處理,按照處理後的資料比較結果排序,預設為正序。注意 : 最終返回的結果是排序後的原資料!
- 需求:建立一個 RDD,按照不同的規則進行排序
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
val rdd = sc.parallelize(List(1,4,2,5,8,3))
// 使用 函式先對資料進行處理,然後在對處理過的資料進行排序
// 注意 : 最終返回的仍然是 原資料
// 比如 : 如果是 f % 2,4 % 2 => 0,1 % 2 => 1
// 所以排名是 1 在 4前面 => 1,4
val result = rdd.sortBy(f => {
f % 2
})
// 遍歷結構
result.foreach(println(_))
sc.stop()
複製程式碼
1.1.15 pipe 運算元
- 作用:管道,針對每個分割槽,都執行一個 shell 指令碼,返回輸出的 RDD。 注意:指令碼需要放在 Worker 節點可以訪問到的位置
- 需求:編寫一個指令碼,使用管道將指令碼作用於 RDD 上。
- 注意: 這個在windows上暫時是無法執行的,因為需要執行shell指令碼
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
val rdd = sc.parallelize(List("韓信","白龍","青帝"),1)
val result = rdd.pipe("pipe.sh")
// 列印結果
result.foreach(print(_))
sc.stop()
複製程式碼
1.2 雙 Value 型別互動
1.2.1 union 運算元
- 作用:對源 RDD 和引數 RDD 求並集後返回一個新的 RDD
- 需求:建立兩個 RDD,求並集
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 5)
val otherRdd = sc.parallelize(3 to 6)
// 求兩個RDD 的並集
val result = rdd.union(otherRdd)
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.2.2 subtract 運算元
- 作用:計算差的一種函式,去除兩個 RDD 中相同的元素,不同的資料將保留下來
比如 : rdd : 1,5,6 otherRdd : 4,6,7,8
它會把 rdd 和 otherRdd 中都有的資料從 rdd 中清除然後把rdd剩餘的資料返回來
結果是 : result : 1,3
- 需求:建立兩個 RDD,求第一個 RDD 與第二個 RDD 的差集
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 6)
val otherRdd = sc.parallelize(4 to 8)
// 計算兩個rdd 的差集,去除兩個rdd的相同的資料,保留不同的資料
// 特別注意 !!!
// 比如 : rdd : 1,8
// 它會把 rdd 和 otherRdd 中都有的資料從 rdd 中清除然後把rdd剩餘的資料返回來
val result = rdd.subtract(otherRdd)
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.2.3 intersection 運算元
- 作用:對源 RDD 和引數 RDD 求交集後返回一個新的 RDD
- 需求:建立兩個 RDD,求兩個 RDD 的交集
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 6)
val otherRdd = sc.parallelize(4 to 8)
// 對源 RDD 和引數 RDD 求交集後返回一個新的 RDD
val result = rdd.intersection(otherRdd)
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.2.4 cartesian 運算元
- 作用:笛卡爾積(儘量避免使用)
- 需求:建立兩個 RDD,計算兩個 RDD 的笛卡爾積
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(1 to 6)
val otherRdd = sc.parallelize(4 to 8)
// 計算笛卡爾積
// 謹慎使用,因為計算結果的數量級會很大
// 比如 : 兩個10萬資料的rdd計算笛卡爾 結果是 10萬 * 10萬 => 100億
val result = rdd.cartesian(otherRdd)
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.2.5 zip 運算元
- 作用:將兩個 RDD 組合成 Key/Value 形式的 RDD,這裡預設兩個 RDD 的 partition 數量以 及元素數量都相同,否則會丟擲異常。
- 需求:建立兩個 RDD,並將兩個 RDD 組合到一起形成一個(k,v)RDD
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(List(1,3),1)
val otherRdd = sc.parallelize(List("A","B","C"),1)
//將兩個 RDD 組合成 Key/Value 形式的 RDD,這裡預設兩個 RDD 的 partition 數量以
//及元素數量都相同,否則會丟擲異常 !
val result = rdd.zip(otherRdd)
result.foreach(println(_))
sc.stop()
複製程式碼
1.3 Key-Value 型別
1.3.1 partitionBy 運算元
- 作用:對 pairRDD 進行分割槽操作,如果原有的 partionRDD 和現有的 partionRDD 是一致 的話就不進行分割槽, 否則會生成 ShuffleRDD,即會產生 shuffle 過程。
- 需求:建立一個 4 個分割槽的 RDD,對其重新分割槽
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(Array("AA","BB","CC"),4).map(x => (x,1))
//對 pairRDD 進行分割槽操作,如果原有的 partionRDD 和現有的 partionRDD 是一致
//的話就不進行分割槽, 否則會生成 ShuffleRDD,即會產生 shuffle 過程。
// 檢視分割槽數
println("分割槽數 : ====> " + rdd.partitions.size)
// 對rdd 重新分割槽
val result = rdd.partitionBy(new HashPartitioner(2))
println("重新分割槽數 : ====> " + result.partitions.size)
sc.stop()
複製程式碼
1.3.2 groupByKey 運算元
- 作用:groupByKey 也是對每個 key 進行操作,但只生成一個 sequence。
- 需求:建立一個 pairRDD,將相同 key 對應值聚合到一個 sequence 中,並計算相同 key 對應值的相加結果。
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val pairRdd = sc.parallelize(List("hello","word","hello"))
.map(x => (x,1))
// 將key相同的資料聚合在一起
val result = pairRdd.groupByKey(2).map(x => (x._1,x._2.sum))
// 列印資料
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.3 reduceByKey 運算元
- 在一個(K,V)的 RDD 上呼叫,返回一個(K,V)的 RDD,使用指定的 reduce 函式,將相同 key 的值聚合到一起,reduce 任務的個數可以通過第二個可選的引數來設定。
- 需求:建立一個 pairRDD,計算相同 key 對應值的相加結果
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val pairRdd = sc.parallelize(List("hello",1))
// 注意 : 這兩個傳入的都是 value的值
val result = pairRdd.reduceByKey((v1,v2) => {
v1 + v2
})
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.4 reduceByKey和groupByKey 的區別
- reduceByKey:按照 key 進行聚合,在 shuffle 之前有 combine(預聚合)操作,返回結果 是 RDD[k,v].
- groupByKey:按照 key 進行分組,直接進行 shuffle。
- 開發指導:reduceByKey 比 groupByKey,建議使用。但是需要注意是否會影響業務邏輯
1.3.5 aggregateByKey 運算元
引數:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U,V) => U,combOp: (U,U) => U)
作用:在 kv 對的 RDD 中,,按 key 將 value 進行分組合並,合併時,將每個 value 和初 始值作為 seq 函式的引數,進行計算,返回的結果作為一個新的 kv 對,然後再將結果按照 key 進行合併,最後將每個分組的 value 傳遞給 combine 函式進行計算(先將前兩個 value 進行計算,將返回結果和下一個 value 傳給 combine 函式,以此類推),將 key 與計算結果作 為一個新的 kv 對輸出。
引數描述: (1)zeroValue:給每一個分割槽中的每一個 key 一個初始值; (2)seqOp:函式用於在每一個分割槽中用初始值逐步迭代 value; (3)combOp:函式用於合併每個分割槽中的結果。
需求:建立一個 pairRDD,取出每個分割槽相同 key 對應值的最大值,然後相加
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
val rdd = sc.parallelize(List("A",2).map((x) => (x,1))
// 可以直接轉換pairRdd
val pairRdd = sc.parallelize(List(("a",("a",2),("c",4),("b",6),8)),2)
// 取出每個分割槽相同key對應的最大值,然後相加
//(1)zeroValue:給每一個分割槽中的每一個 key 一個初始值;
//(2)seqOp:函式用於在每一個分割槽中用初始值逐步迭代 value;
//(3)combOp:函式用於合併每個分割槽中的結果。
// 注意 : 整個過程中 ! pairRdd 的key是不參與運算的
val result = pairRdd.aggregateByKey(0)((k,v) => {
// k是 zeroValue,v就是 rdd的value值
math.max(k,v)
},(u1,u2) => {
// 用於合併value的值
u1 + u2
})
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.6 foldByKey 運算元
引數:(zeroValue: V)(func: (V,V) => V): RDD[(K,V)]
- 作用:aggregateByKey 的簡化操作,seqop 和 combop 相同
- 需求:建立一個 pairRDD,計算相同 key 對應值的相加結果
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 可以直接轉換pairRdd
val pairRdd = sc.parallelize(List(("a",2)
//1. 作用:aggregateByKey 的簡化操作,seqop 和 combop 相同
//2. 需求:建立一個 pairRDD,計算相同 key 對應值的相加結果
val result = pairRdd.foldByKey(0)((v1,v2) => {
v1 + v2
})
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.7 combineByKey 運算元
引數:(createCombiner: V => C,mergeValue: (C,V) => C,mergeCombiners: (C,C) => C)
作用:對相同 K,把 V 合併成一個集合。
引數描述:[結合分析圖和程式碼註釋理解] (1)createCombiner: combineByKey() 會遍歷分割槽中的所有元素,因此每個元素的鍵要麼還沒有遇到過,要麼就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫作 createCombiner()的函式來建立那個鍵對應的累加器的初始值 (2)mergeValue: 如果這是一個在處理當前分割槽之前已經遇到的鍵,它會使用 mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併 (3)mergeCombiners: 由於每個分割槽都是獨立處理的, 因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分割槽都有對應同一個鍵的累加器, 就需要使用使用者提供的 mergeCombiners() 方法將各個分割槽的結果進行合併。
需求:建立一個 pairRDD,根據 key 計算每種 key 的均值。(先計算每個 key 出現的次數 以及可以對應值的總和,再相除得到結果
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(
Array(("a",88),95),91),93),98)),2)
val result = rdd.combineByKey(
// createCombiner
// ("a",88) ("a",91) 每種 key("a") 相同的只會生成一個 (88,1) 91,95 會在merge階段合併
(x) => {(x,1)},// mergeValue 這個和階段是合併 key(key是指rdd的key 比如"a" / "b")相同的資料,
// 比如 a 的資料經過 createCombiner 得到的結果是 : (88,95
// 開始 merge acc : (88,1) v: 91,=> (179,2)
(acc:(Int,Int),v) => {(acc._1 + v,acc._2 + 1)},// 這個階段是各個分割槽中相同key的資料進行合併
// 比如 : 分割槽1的是 (179,2) 分割槽2的是 (95,1) => 合併以後就是 (274,3)
// (274,3) 274是
(acc1 : (Int,acc2 : (Int,Int)) =>{(acc1._1 + acc2._1,acc1._2 + acc2._2)}
)
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.8 sortByKey 運算元
- 作用:在一個(K,V)的 RDD 上呼叫,K 必須實現 Ordered 介面,返回一個按照 key 進行排 序的(K,V)的 RDD
- 需求:建立一個 pairRDD,按照 key 的正序和倒序進行排序
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
// 根據key排序 true 為正序,false 為倒序
val result = rdd.sortByKey(true)
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.9 mapValues 運算元
- 針對於(K,V)形式的型別只對 V 進行操作
- 需求:建立一個 pairRDD,並將 value 新增字串"|||"
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(Array((1,"a"),"d"),"b"),(3,"c")))
// 只針對value進行操作
val result = rdd.mapValues(v => {
v + "|||"
})
// 列印結果
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.10 join 運算元
- 作用:在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個相同 key 對應的所有元素對在一 起的(K,(V,W))的 RDD
- 需求:建立兩個 pairRDD,並將 key 相同的資料聚合到一個元組。
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
// 建立資料
val rdd = sc.parallelize(Array((1,"c")))
val otherRdd = sc.parallelize(Array((1,"A"),"B"),"C")))
// 在型別為(K,W)的 RDD 上呼叫,返回一個相同 key 對應的所有元素對在一
// 起的(K,W))的 RDD
val result = rdd.join(otherRdd)
result.foreach(println(_))
sc.stop()
複製程式碼
1.3.11 cogroup 運算元
- 作用:在型別為(K,W)的 RDD 上呼叫,返回一個(K,(Iterable,Iterable))類 型的 RDD
- 需求:建立兩個 pairRDD,並將 key 相同的資料聚合到一個迭代器。
程式碼 :
// 建立 SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// 設定SparkContext的列印日誌的級別
sc.setLogLevel("WARN")
//建立資料
val rdd = sc.parallelize(Array((1,(Iterable<V>,Iterable<W>))類
//型的 RDD
val result = rdd.cogroup(otherRdd)
result.foreach(value => {
val key = value._1
val v1 : Iterable[String] = value._2._1
val v2 : Iterable[String] = value._2._2
print(key + " ")
print(v1 + " ")
print(v2 + " ")
println()
})
sc.stop()
複製程式碼