Spark之常用操作
阿新 • • 發佈:2018-01-09
兩個 lis lte div nta group tin 類型 park
-- 篩選 val rdd = sc.parallelize(List("ABC","BCD","DEF")) val filtered = rdd.filter(_.contains("C")) filtered.collect() Result: Array[String] = Array(ABC, BCD) -- 相乘 val rdd=sc.parallelize(List(1,2,3,4,5)) val times2 = rdd.map(_*2) times2.collect() Result: Array[Int] = Array(2, 4, 6, 8, 10) -- 分割val rdd=sc.parallelize(List("Spark is awesome","It is fun")) val fm=rdd.flatMap(str=>str.split(" ")) fm.collect() Result: Array[String] = Array(Spark, is, awesome, It, is, fun) -- 頻數 val word1=fm.map(word=>(word,1)) val wrdCnt=word1.reduceByKey(_+_) wrdCnt.collect() Result: Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1)) -- 交換 val cntWrd = wrdCnt.map{case (word, count) => (count, word)} cntWrd.groupByKey().collect() Result: Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is))) -- 排重 fm.distinct().collect() Result: Array[String] = Array(is, It, awesome, Spark, fun) -- 並集 val rdd1=sc.parallelize(List(‘A‘,‘B‘)) val rdd2=sc.parallelize(List(‘B‘,‘C‘)) rdd1.union(rdd2).collect() -- 交集 rdd1.intersection(rdd2).collect() -- 笛卡爾積 rdd1.cartesian(rdd2).collect() -- 相減 rdd1.subtract(rdd2).collect() -- 連接 val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot"))) val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista"))) personFruit.join(personSE).collect() Result: Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista))) -- 計數 val rdd = sc.parallelize(list(‘A‘,‘B‘,‘c‘)) rdd.count() Result: long = 3 -- 展示數組 val rdd = sc.parallelize(list(‘A‘,‘B‘,‘c‘)) rdd.collect() Result: Array[char] = Array(A, B, c) -- 求和 val rdd = sc.parallelize(list(1,2,3,4)) rdd.reduce(_+_) Result: Int = 10 -- 截取 val rdd = sc.parallelize(list(1,2,3,4)) rdd.take(2) Result: Array[Int] = Array(1, 2) -- 分別格式化 val rdd = sc.parallelize(list(1,2,3,4)) rdd.foreach(x=>println("%s*10=%s".format(x,x*10))) Result: 1*10=10 4*10=40 3*10=30 2*10=20 val rdd = sc.parallelize(list(1,2,3,4)) -- 首項 rdd.first() Result: Int = 1 -- 另存為 val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt")
-- 針對兩個pair RDD的轉化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3, 9)}) -- subtractByKey 刪掉RDD 中鍵與other RDD 中的鍵相同的元素 rdd.subtractByKey(other) {(1, 2)} -- join 對兩個RDD 進行內連接 rdd.join(other) {(3, (4, 9)), (3,(6, 9))} -- rightOuterJoin 對兩個RDD 進行連接操作,確保第一個RDD 的鍵必須存在(右外連接) rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))} -- leftOuterJoin 對兩個RDD 進行連接操作,確保第二個RDD 的鍵必須存在(左外連接) rdd.leftOuterJoin(other) {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))} -- cogroup 將兩個RDD 中擁有相同鍵的數據分組到一起 rdd.cogroup(other) {(1,([2],[])), (3,([4, 6],[9]))}
-- 返回RDD 中的所有元素
rdd.collect() {1, 2, 3, 3}
-- RDD 中的元素個數
rdd.count() 4
-- 各元素在RDD 中出現的次數
rdd.countByValue() {(1, 1),(2, 1),(3, 2)}
-- 從RDD 中返回num 個元素
rdd.take(2) {1, 2} top(num)
-- 從RDD 中返回最前面的num個元素
rdd.top(2) {3, 3}
-- 從RDD 中按照提供的順序返回最前面的num 個元素
rdd.takeOrdered(2)(myOrdering) {3, 3}
-- 從RDD 中返回任意一些元素
rdd.takeSample(false, 1)
-- 並行整合RDD 中所有數據(例如sum)
rdd.reduce((x, y) => x + y) 9
-- 和reduce() 一樣, 但是需要提供初始值
rdd.fold(0)((x, y) => x + y) 9
-- 和reduce() 相似, 但是通常返回不同類型的函數
rdd.aggregate((0, 0))
((x, y) =>(x._1 + y, x._2 + 1),
(x, y) =>(x._1 + y._1, x._2 + y._2))
(9,4)
-- 對RDD 中的每個元素使用給定的函數
rdd.foreach(func)
Spark之常用操作