spark RDD常用運算元(一)
阿新 • • 發佈:2018-11-01
- filter
- 演算法解釋
filter 函式功能是對元素進行過濾,對每個 元 素 應 用 f 函 數, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 FilteredRDD(this,sc.clean(f))。- 原始檔
- 過濾檔案中的INFO日誌 scala程式碼
var rdd = sc.textFile("D:\\logs\\system.log") var line = rdd.filter(lines => lines.contains("ERROR")).foreach(line => println(line))
- 過濾結果
- map
- 演算法解釋
將原來 RDD 的每個資料項通過 map 中的使用者自定義函式 f 對映轉變為一個新的元素。原始碼中 map 運算元相當於初始化一個 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。- 原始檔
| 同上 |- scala程式碼
var rdd = sc.textFile("D:\\logs\\system.log").cache() var line = rdd.filter(lines => lines.contains("ERROR")) var mspLine = line.map(line => (line.split(" ")(0),line)).foreach(l=>println(l))
- 過濾結果
返回以時間為key,以內容為內容的的元組(tuple)
- flatMap
- 演算法解釋
有時候,我們希望對某個元素生成多個元素,實現該功能的操作叫作 flatMap()
faltMap的函式應用於每一個元素,對於每一個元素返回的是多個元素組成的迭代器(想要了解更多,請參考scala的flatMap和map用法)- 原始檔
| 同上 |- scala程式碼
var flatMapLine = mspLine.flatMap(line => { for (i <- 0 until line._2.length) yield (line._1, line._2, i) }).first()
- 過濾結果
(2018-05-04,2018-05-04 09:39:27.286 [main] 14 ERROR - com.test.dao.Test - XmlFileLoader: 111,0)
- distinct
- 演算法解釋
distinct將RDD中的元素進行去重操作。圖9中的每個方框代表一個RDD分割槽,通過distinct函式,將資料去重。 例如,重複資料V1、 V1去重後只保留一份V1。- 原始檔
var array = List(1, 2, 3, 4, 5, 6, 1, 1)
- scala程式碼
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var rdd = sc.parallelize(array) var rddDistinct = rdd.distinct() println(rddDistinct.collect().mkString(","))
- 過濾結果
4,6,2,1,3,5
- union
- 演算法解釋
使用 union 函式時需要保證兩個 RDD 元素的資料型別相同,返回的 RDD 資料型別和被合併的 RDD 元素資料型別相同,並不進行去重操作,儲存所有元素。如果想去重可以使用 distinct()。同時 Spark 還提供更為簡潔的使用 union 的 API,通過 ++ 符號相當於 union 函式操作- 原始檔
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
- scala程式碼
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var rdd = sc.parallelize(array) var array2 = List(1, 2, 3, 4, 5, 6, 1, 1) var rdd2 = sc.parallelize(array) var rddDistinct = rdd ++ rdd2 // or rdd.union(rdd2) println(rddDistinct.collect().mkString(","))
- 過濾結果
1,2,3,4,5,6,1,1,1,2,3,4,5,6,1,1
- intersection
- 演算法解釋
該函式返回兩個RDD的交集,並且去重。intersection 需要混洗資料,比較浪費效能- 原始檔
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
- scala程式碼
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var rdd = sc.parallelize(array) var array2 = List(1, 2, 3, 4, 5, 6, 1, 1) var rdd2 = sc.parallelize(array) var rddIntersection = rdd.intersection(rdd2) println(rddIntersection .collect().mkString(","))
- 過濾結果
4,6,2,1,3,5
- subtract
- 演算法解釋
該函式類似於intersection,但返回在RDD中出現,並且不在otherRDD中出現的元素,不去重- 原始檔
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
- scala程式碼
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Array(1, 2, 3)) var substractRDD = rdd .subtract(rdd2) println(substractRDD.collect().mkString(","))
- 過濾結果
4,5
- cartesian
- 演算法解釋
對 兩 個 RDD 內 的 所 有 元 素 進 行 笛 卡 爾 積 操 作。 操 作 後, 內 部 實 現 返 回CartesianRDD。開銷大
例 如: V1 和 另 一 個 RDD 中 的 W1、 W2、 Q5 進 行 笛 卡 爾 積 運 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)- 原始檔
var array = List(1, 2, 3, 4, 5, 6, 1, 1) var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
- scala程式碼
val rdd = sc.parallelize(Array("a", "b", "c", "d", "e")) val rdd2 = sc.parallelize(Array(1, 2, 3)) var cartesianRDD = rdd.cartesian(rdd2) println(substractRDD.collect().mkString(","))
- 過濾結果
(a,1),(b,1),(a,2),(a,3),(b,2),(b,3),(c,1),(d,1),(e,1),(c,2),(c,3),(d,2),(d,3),(e,2),(e,3)
- mapToPair
- 演算法解釋
scala是沒有mapToPair函式的,scala版本map。- 原始檔
val rdd = sc.parallelize(Array("a", "b", "c", "d", "e"))
- scala程式碼
val rdd = sc.parallelize(Array("a", "b", "c", "d", "e")) var mapToPairRDD = rdd.map(a => (a(0), 1)) println(mapToPairRDD.collect().mkString(","))
- 過濾結果
(a,1),(b,1),(c,1),(d,1),(e,1)
- flatMapToPair
- 演算法解釋
類似於xxx連線 mapToPair是一對一,一個元素返回一個元素,而flatMapToPair可以一個元素返回多個,相當於先flatMap,在mapToPair
例子: 將每一個單詞都分成鍵為- 原始檔
val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee"))
- scala程式碼
val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee")) val flatRDD = rdd.flatMap(f => f).map(l => (l, 1)) println(flatRDD.collect().mkString(","))
- 過濾結果
(a,1),(a,1),(b,1),(b,1),(c,1),(c,1),(d,1),(d,1),(e,1),(e,1)
- flatMapToPair
- 演算法解釋
類似於xxx連線 mapToPair是一對一,一個元素返回一個元素,而flatMapToPair可以一個元素返回多個,相當於先flatMap,在mapToPair
例子: 將每一個單詞都分成鍵為- 原始檔
val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee"))
- scala程式碼
val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee")) val flatRDD = rdd.flatMap(f => f).map(l => (l, 1)) println(flatRDD.collect().mkString(","))
- 過濾結果
(a,1),(a,1),(b,1),(b,1),(c,1),(c,1),(d,1),(d,1),(e,1),(e,1)