Spark 系列(四)—— RDD常用運算元詳解
一、Transformation
spark 常用的 Transformation 運算元如下表:
Transformation 運算元 | Meaning(含義) |
---|---|
map(func) | 對原 RDD 中每個元素運用 func 函式,並生成新的 RDD |
filter(func) | 對原 RDD 中每個元素使用func 函式進行過濾,並生成新的 RDD |
flatMap(func) | 與 map 類似,但是每一個輸入的 item 被對映成 0 個或多個輸出的 items( func 返回型別需要為 Seq )。 |
mapPartitions(func) | 與 map 類似,但函式單獨在 RDD 的每個分割槽上執行, func |
mapPartitionsWithIndex(func) | 與 mapPartitions 類似,但 func 型別為 (Int,Iterator<T>) => Iterator<U> ,其中第一個引數為分割槽索引 |
sample(withReplacement,fraction,seed) | 資料取樣,有三個可選引數:設定是否放回(withReplacement)、取樣的百分比(fraction)、隨機數生成器的種子(seed); |
union(otherDataset) | 合併兩個 RDD |
intersection(otherDataset) | 求兩個 RDD 的交集 |
distinct([numTasks])) | 去重 |
groupByKey([numTasks]) | 按照 key 值進行分割槽,即在一個 (K,V) 對的 dataset 上呼叫時,返回一個 (K,Iterable<V>) Note: 如果分組是為了在每一個 key 上執行聚合操作(例如,sum 或 average),此時使用 reduceByKey 或 aggregateByKey 效能會更好Note: 預設情況下,並行度取決於父 RDD 的分割槽數。可以傳入 numTasks |
reduceByKey(func,[numTasks]) | 按照 key 值進行分組,並對分組後的資料執行歸約操作。 |
aggregateByKey(zeroValue,numPartitions)(seqOp,combOp,[numTasks]) | 當呼叫(K,V)對的資料集時,返回(K,U)對的資料集,其中使用給定的組合函式和 zeroValue 聚合每個鍵的值。與 groupByKey 類似,reduce 任務的數量可通過第二個引數進行配置。 |
sortByKey([ascending],[numTasks]) | 按照 key 進行排序,其中的 key 需要實現 Ordered 特質,即可比較 |
join(otherDataset,[numTasks]) | 在一個 (K,V) 和 (K,W) 型別的 dataset 上呼叫時,返回一個 (K,(V,W)) pairs 的 dataset,等價於內連線操作。如果想要執行外連線,可以使用 leftOuterJoin ,rightOuterJoin 和 fullOuterJoin 等運算元。 |
cogroup(otherDataset,(Iterable<V>,Iterable<W>)) tuples 的 dataset。 | |
cartesian(otherDataset) | 在一個 T 和 U 型別的 dataset 上呼叫時,返回一個 (T,U) 型別的 dataset(即笛卡爾積)。 |
coalesce(numPartitions) | 將 RDD 中的分割槽數減少為 numPartitions。 |
repartition(numPartitions) | 隨機重新調整 RDD 中的資料以建立更多或更少的分割槽,並在它們之間進行平衡。 |
repartitionAndSortWithinPartitions(partitioner) | 根據給定的 partitioner(分割槽器)對 RDD 進行重新分割槽,並對分割槽中的資料按照 key 值進行排序。這比呼叫 repartition 然後再 sorting(排序)效率更高,因為它可以將排序過程推送到 shuffle 操作所在的機器。 |
下面分別給出這些運算元的基本使用示例:
1.1 map
val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)
// 輸出結果: 10 20 30 (這裡為了節省篇幅去掉了換行,後文亦同)
複製程式碼
1.2 filter
val list = List(3,6,9,10,12,21)
sc.parallelize(list).filter(_ >= 10).foreach(println)
// 輸出: 10 12 21
複製程式碼
1.3 flatMap
flatMap(func)
與 map
類似,但每一個輸入的 item 會被對映成 0 個或多個輸出的 items( func 返回型別需要為 Seq
)。
val list = List(List(1,2),List(3),List(),List(4,5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
// 輸出結果 : 10 20 30 40 50
複製程式碼
flatMap 這個運算元在日誌分析中使用概率非常高,這裡進行一下演示:拆分輸入的每行資料為單個單詞,並賦值為 1,代表出現一次,之後按照單詞分組並統計其出現總次數,程式碼如下:
val lines = List("spark flume spark","hadoop flume hive")
sc.parallelize(lines).flatMap(line => line.split(" ")).
map(word=>(word,1)).reduceByKey(_+_).foreach(println)
// 輸出:
(spark,2)
(hive,1)
(hadoop,1)
(flume,2)
複製程式碼
1.4 mapPartitions
與 map 類似,但函式單獨在 RDD 的每個分割槽上執行, func函式的型別為 Iterator<T> => Iterator<U>
(其中 T 是 RDD 的型別),即輸入和輸出都必須是可迭代型別。
val list = List(1,3,4,5,6)
sc.parallelize(list,3).mapPartitions(iterator => {
val buffer = new ListBuffer[Int]
while (iterator.hasNext) {
buffer.append(iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//輸出結果
100 200 300 400 500 600
複製程式碼
1.5 mapPartitionsWithIndex
與 mapPartitions 類似,但 func 型別為 (Int,Iterator<T>) => Iterator<U>
,其中第一個引數為分割槽索引。
val list = List(1,3).mapPartitionsWithIndex((index,iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分割槽:" + iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//輸出
0 分割槽:100
0 分割槽:200
1 分割槽:300
1 分割槽:400
2 分割槽:500
2 分割槽:600
複製程式碼
1.6 sample
資料取樣。有三個可選引數:設定是否放回 (withReplacement)、取樣的百分比 (fraction)、隨機數生成器的種子 (seed) :
val list = List(1,6)
sc.parallelize(list).sample(withReplacement = false,fraction = 0.5).foreach(println)
複製程式碼
1.7 union
合併兩個 RDD:
val list1 = List(1,3)
val list2 = List(4,6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 輸出: 1 2 3 4 5 6
複製程式碼
1.8 intersection
求兩個 RDD 的交集:
val list1 = List(1,5)
val list2 = List(4,6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 輸出: 4 5
複製程式碼
1.9 distinct
去重:
val list = List(1,4)
sc.parallelize(list).distinct().foreach(println)
// 輸出: 4 1 2
複製程式碼
1.10 groupByKey
按照鍵進行分組:
val list = List(("hadoop",("spark",3),5),("storm",6),("hadoop",2))
sc.parallelize(list).groupByKey().map(x => (x._1,x._2.toList)).foreach(println)
//輸出:
(spark,List(3,5))
(hadoop,List(2,2))
(storm,List(6))
複製程式碼
1.11 reduceByKey
按照鍵進行歸約操作:
val list = List(("hadoop",2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)
//輸出
(spark,8)
(hadoop,4)
(storm,6)
複製程式碼
1.12 sortBy & sortByKey
按照鍵進行排序:
val list01 = List((100,"hadoop"),(90,"spark"),(120,"storm"))
sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
// 輸出
(120,storm)
(90,spark)
(100,hadoop)
複製程式碼
按照指定元素進行排序:
val list02 = List(("hadoop",100),90),120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
// 輸出
(storm,120)
(hadoop,100)
(spark,90)
複製程式碼
1.13 join
在一個 (K,W) 型別的 Dataset 上呼叫時,返回一個 (K,W)) 的 Dataset,等價於內連線操作。如果想要執行外連線,可以使用 leftOuterJoin
,rightOuterJoin
和 fullOuterJoin
等運算元。
val list01 = List((1,"student01"),(2,"student02"),(3,"student03"))
val list02 = List((1,"teacher01"),"teacher02"),"teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
// 輸出
(1,(student01,teacher01))
(3,(student03,teacher03))
(2,(student02,teacher02))
複製程式碼
1.14 cogroup
在一個 (K,V) 對的 Dataset 上呼叫時,返回多個型別為 (K,Iterable<W>)) 的元組所組成的 Dataset。
val list01 = List((1,"a"),(1,"b"),"e"))
val list02 = List((1,"A"),"B"),"E"))
val list03 = List((1,"[ab]"),"[bB]"),"eE"),"eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)
// 輸出: 同一個 RDD 中的元素先按照 key 進行分組,然後再對不同 RDD 中的元素按照 key 進行分組
(1,(CompactBuffer(a,a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE,eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))
複製程式碼
1.15 cartesian
計算笛卡爾積:
val list1 = List("A","B","C")
val list2 = List(1,3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
//輸出笛卡爾積
(A,1)
(A,2)
(A,3)
(B,1)
(B,2)
(B,3)
(C,1)
(C,2)
(C,3)
複製程式碼
1.16 aggregateByKey
當呼叫(K,V)對的資料集時,返回(K,U)對的資料集,其中使用給定的組合函式和 zeroValue 聚合每個鍵的值。與 groupByKey
類似,reduce 任務的數量可通過第二個引數 numPartitions
進行配置。示例如下:
// 為了清晰,以下所有引數均使用具名傳參
val list = List(("hadoop",4),8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_,_),combOp = _ + _
).collect.foreach(println)
//輸出結果:
(hadoop,3)
(storm,8)
(spark,7)
複製程式碼
這裡使用了 numSlices = 2
指定 aggregateByKey 父操作 parallelize 的分割槽數量為 2,其執行流程如下:
基於同樣的執行流程,如果 numSlices = 1
,則意味著只有輸入一個分割槽,則其最後一步 combOp 相當於是無效的,執行結果為:
(hadoop,3)
(storm,8)
(spark,4)
複製程式碼
同樣的,如果每個單詞對一個分割槽,即 numSlices = 6
,此時相當於求和操作,執行結果為:
(hadoop,5)
(storm,14)
(spark,7)
複製程式碼
aggregateByKey(zeroValue = 0,numPartitions = 3)
的第二個引數 numPartitions
決定的是輸出 RDD 的分割槽數量,想要驗證這個問題,可以對上面程式碼進行改寫,使用 getNumPartitions
方法獲取分割槽數量:
sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_,combOp = _ + _
).getNumPartitions
複製程式碼
二、Action
Spark 常用的 Action 運算元如下:
Action(動作) | Meaning(含義) |
---|---|
reduce(func) | 使用函式func執行歸約操作 |
collect() | 以一個 array 陣列的形式返回 dataset 的所有元素,適用於小結果集。 |
count() | 返回 dataset 中元素的個數。 |
first() | 返回 dataset 中的第一個元素,等價於 take(1)。 |
take(n) | 將資料集中的前 n 個元素作為一個 array 陣列返回。 |
takeSample(withReplacement,num,[seed]) | 對一個 dataset 進行隨機抽樣 |
takeOrdered(n,[ordering]) | 按自然順序(natural order)或自定義比較器(custom comparator)排序後返回前 n 個元素。只適用於小結果集,因為所有資料都會被載入到驅動程式的記憶體中進行排序。 |
saveAsTextFile(path) | 將 dataset 中的元素以文字檔案的形式寫入本地檔案系統、HDFS 或其它 Hadoop 支援的檔案系統中。Spark 將對每個元素呼叫 toString 方法,將元素轉換為文字檔案中的一行記錄。 |
saveAsSequenceFile(path) | 將 dataset 中的元素以 Hadoop SequenceFile 的形式寫入到本地檔案系統、HDFS 或其它 Hadoop 支援的檔案系統中。該操作要求 RDD 中的元素需要實現 Hadoop 的 Writable 介面。對於 Scala 語言而言,它可以將 Spark 中的基本資料型別自動隱式轉換為對應 Writable 型別。(目前僅支援 Java and Scala) |
saveAsObjectFile(path) | 使用 Java 序列化後儲存,可以使用 SparkContext.objectFile() 進行載入。(目前僅支援 Java and Scala) |
countByKey() | 計算每個鍵出現的次數。 |
foreach(func) | 遍歷 RDD 中每個元素,並對其執行fun函式 |
2.1 reduce
使用函式func執行歸約操作:
val list = List(1,5)
sc.parallelize(list).reduce((x,y) => x + y)
sc.parallelize(list).reduce(_ + _)
// 輸出 15
複製程式碼
2.2 takeOrdered
按自然順序(natural order)或自定義比較器(custom comparator)排序後返回前 n 個元素。需要注意的是 takeOrdered
使用隱式引數進行隱式轉換,以下為其原始碼。所以在使用自定義排序時,需要繼承 Ordering[T]
實現自定義比較器,然後將其作為隱式引數引入。
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
.........
}
複製程式碼
自定義規則排序:
// 繼承 Ordering[T],實現自定義比較器,按照 value 值的長度進行排序
class CustomOrdering extends Ordering[(Int,String)] {
override def compare(x: (Int,String),y: (Int,String)): Int
= if (x._2.length > y._2.length) 1 else -1
}
val list = List((1,"hadoop"),(1,"storm"),"azkaban"),"hive"))
// 引入隱式預設值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)
// 輸出: Array((1,hive),storm),hadoop),azkaban)
複製程式碼
2.3 countByKey
計算每個鍵出現的次數:
val list = List(("hadoop",10),("azkaban",1))
sc.parallelize(list).countByKey()
// 輸出: Map(hadoop -> 2,storm -> 2,azkaban -> 1)
複製程式碼
2.4 saveAsTextFile
將 dataset 中的元素以文字檔案的形式寫入本地檔案系統、HDFS 或其它 Hadoop 支援的檔案系統中。Spark 將對每個元素呼叫 toString 方法,將元素轉換為文字檔案中的一行記錄。
val list = List(("hadoop",1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")
複製程式碼
參考資料
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南