1. 程式人生 > >Spark之常用操作

Spark之常用操作

兩個 lis lte div nta group tin 類型 park

-- 篩選
val rdd = sc.parallelize(List("ABC","BCD","DEF")) 
val filtered = rdd.filter(_.contains("C")) 
filtered.collect() 
Result:
Array[String] = Array(ABC, BCD)
-- 相乘
val rdd=sc.parallelize(List(1,2,3,4,5)) 
val times2 = rdd.map(_*2) 
times2.collect() 
Result: 
Array[Int] = Array(2, 4, 6, 8, 10)
-- 分割
val rdd=sc.parallelize(List("Spark is awesome","It is fun")) val fm=rdd.flatMap(str=>str.split(" ")) fm.collect() Result: Array[String] = Array(Spark, is, awesome, It, is, fun) -- 頻數 val word1=fm.map(word=>(word,1)) val wrdCnt=word1.reduceByKey(_+_) wrdCnt.collect() Result: Array[(String, Int)
] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1)) -- 交換 val cntWrd = wrdCnt.map{case (word, count) => (count, word)} cntWrd.groupByKey().collect() Result: Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is))) -- 排重 fm.distinct().collect() Result: Array
[String] = Array(is, It, awesome, Spark, fun) -- 並集 val rdd1=sc.parallelize(List(A,B)) val rdd2=sc.parallelize(List(B,C)) rdd1.union(rdd2).collect() -- 交集 rdd1.intersection(rdd2).collect() -- 笛卡爾積 rdd1.cartesian(rdd2).collect() -- 相減 rdd1.subtract(rdd2).collect() -- 連接 val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot"))) val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista"))) personFruit.join(personSE).collect() Result: Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista))) -- 計數 val rdd = sc.parallelize(list(A,B,c)) rdd.count() Result: long = 3 -- 展示數組 val rdd = sc.parallelize(list(A,B,c)) rdd.collect() Result: Array[char] = Array(A, B, c) -- 求和 val rdd = sc.parallelize(list(1,2,3,4)) rdd.reduce(_+_) Result: Int = 10 -- 截取 val rdd = sc.parallelize(list(1,2,3,4)) rdd.take(2) Result: Array[Int] = Array(1, 2) -- 分別格式化 val rdd = sc.parallelize(list(1,2,3,4)) rdd.foreach(x=>println("%s*10=%s".format(x,x*10))) Result: 1*10=10 4*10=40 3*10=30 2*10=20 val rdd = sc.parallelize(list(1,2,3,4)) -- 首項 rdd.first() Result: Int = 1 -- 另存為 val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt")
-- 針對兩個pair RDD的轉化操作(rdd = {(1, 2), (3, 4), (3, 6)}  other = {(3, 9)})

-- subtractByKey 刪掉RDD 中鍵與other RDD 中的鍵相同的元素
rdd.subtractByKey(other) {(1, 2)}

-- join 對兩個RDD 進行內連接
rdd.join(other) {(3, (4, 9)), (3,(6, 9))}

-- rightOuterJoin 對兩個RDD 進行連接操作,確保第一個RDD 的鍵必須存在(右外連接)
rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}

-- leftOuterJoin 對兩個RDD 進行連接操作,確保第二個RDD 的鍵必須存在(左外連接)
rdd.leftOuterJoin(other) {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}

-- cogroup 將兩個RDD 中擁有相同鍵的數據分組到一起
rdd.cogroup(other) {(1,([2],[])), (3,([4, 6],[9]))}
-- 返回RDD 中的所有元素
rdd.collect() {1, 2, 3, 3}
-- RDD 中的元素個數
rdd.count() 4
-- 各元素在RDD 中出現的次數
rdd.countByValue() {(1, 1),(2, 1),(3, 2)}
-- 從RDD 中返回num 個元素
rdd.take(2) {1, 2}
top(num)

-- 從RDD 中返回最前面的num個元素
rdd.top(2) {3, 3}


-- 從RDD 中按照提供的順序返回最前面的num 個元素
rdd.takeOrdered(2)(myOrdering) {3, 3}


-- 從RDD 中返回任意一些元素
rdd.takeSample(false, 1)


-- 並行整合RDD 中所有數據(例如sum)
rdd.reduce((x, y) => x + y) 9


-- 和reduce() 一樣, 但是需要提供初始值
rdd.fold(0)((x, y) => x + y) 9


-- 和reduce() 相似, 但是通常返回不同類型的函數
rdd.aggregate((0, 0))
((x, y)
=>(x._1 + y, x._2 + 1),
(x, y)
=>(x._1 + y._1, x._2 + y._2))
(
9,4)
--
對RDD 中的每個元素使用給定的函數
rdd.foreach(func)

Spark之常用操作