RDD彈性分布式數據集的基本操作
RDD的中文解釋是彈性分布式數據集。
構造的數據集的時候用的是List(鏈表)或者Array數組類型
/* 使用makeRDD創建RDD */ /* List */ val rdd01 = sc.makeRDD(List(1,2,3,4,5,6)) val r01 = rdd01.map { x => x * x } println(r01.collect().mkString(",")) /* Array */ val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6)) val r02 = rdd02.filter { x => x < 5} println(r02.collect().mkString(",")) val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1) val r03 = rdd03.map { x => x + 1 } println(r03.collect().mkString(",")) /* Array */ val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1) val r04 = rdd04.filter { x => x > 3 } println(r04.collect().mkString(","))
也可以直接用文件系統來構造
1 val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1) 2 val r:RDD[String] = rdd.flatMap { x => x.split(",") } 3 println(r.collect().mkString(","))
RDD的操作分為轉化操作(transformation)和行為操作(action),
轉化操作和行為操作的本質區別
轉化操作使一個RDD轉化為另一個RDD而行動操作就是進行實際的計算
1 val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))2 val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1) 3 val rddFile:RDD[String] = sc.textFile(path, 1) 4 5 val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3)) 6 val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1)) 7 8 /* map操作 */參數是函數,函數應用於RDD每一個元素,返回值是新的RDD 9 println("======map操作======") 10 println(rddInt.map(x => x + 1).collect().mkString(",")) 11 println("======map操作======") 12 /* filter操作 */參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD 13 println("======filter操作======") 14 println(rddInt.filter(x => x > 4).collect().mkString(",")) 15 println("======filter操作======") 16 /* flatMap操作 */參數是函數,函數應用於RDD每一個元素,將元素數據進行拆分,變成叠代器,返回值是新的RDD 17 println("======flatMap操作======") 18 println(rddFile.flatMap { x => x.split(",") }.first()) 19 println("======flatMap操作======") 20 /* distinct去重操作 */沒有參數,將RDD裏的元素進行去重操作方法轉換操作生成一個只包含不同元素的一個新的RDD。開銷很大。 21 println("======distinct去重======") 22 println(rddInt.distinct().collect().mkString(",")) 23 println(rddStr.distinct().collect().mkString(",")) 24 println("======distinct去重======") 25 /* union操作 */會返回一個包含兩個RDD中所有元素的RDD,包含重復數據。 26 println("======union操作======") 27 println(rdd01.union(rdd02).collect().mkString(",")) 28 println("======union操作======") 29 /* intersection操作 */只返回兩個RDD中都有的元素。可能會去掉所有的重復元素。通過網絡混洗來發現共有元素 30 println("======intersection操作======") 31 println(rdd01.intersection(rdd02).collect().mkString(",")) 32 println("======intersection操作======") 33 /* subtract操作 */返回只存在第一個RDD中而不存在第二個RDD中的所有的元素組成的RDD。也需要網絡混洗 34 println("======subtract操作======") 35 println(rdd01.subtract(rdd02).collect().mkString(",")) 36 println("======subtract操作======") 37 /* cartesian操作 */計算兩個RDD的笛卡爾積,轉化操作會返回所有可能的(a,b)對,其中a是源RDD中的元素,而b則來自於另一個RDD。 38 println("======cartesian操作======") 39 println(rdd01.cartesian(rdd02).collect().mkString(",")) 40 println("======cartesian操作======")
以下是行動操作代碼
1 val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1)) 2 val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1) 3 4 /* count操作 */返回RDD所有元素的個數 5 println("======count操作======") 6 println(rddInt.count()) 7 println("======count操作======") 8 /* countByValue操作 */各元素在RDD中出現次數 9 println("======countByValue操作======") 10 println(rddInt.countByValue()) 11 println("======countByValue操作======") 12 /* reduce操作 */並行整合所有RDD數據,例如求和操作 13 println("======reduce操作======") 14 println(rddInt.reduce((x ,y) => x + y)) 15 println("======reduce操作======") 16 /* fold操作 */和reduce功能一樣,不過fold帶有初始值 17 println("======fold操作======") 18 println(rddInt.fold(0)((x ,y) => x + y)) 19 println("======fold操作======") 20 /* aggregate操作 */和reduce功能一樣,不過fold帶有初始值 21 println("======aggregate操作======") 22 val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2)) 23 println(res._1 + "," + res._2) 24 println("======aggregate操作======") 25 /* foeach操作 */對RDD每個元素都是使用特定函數就是遍歷 26 println("======foeach操作======") 27 println(rddStr.foreach { x => println(x) }) 28 println("======foeach操作======")
.mapValues(x=>(x,1)).//mapValues是對值的操作,不操作key使數據變成(Tom,(26,1))
map()指的是對key進行操作
mapValues()指的是對Values進行操作
first()返回的是dataset中的第一個元素
take(n)返回前n個elements,這個是driverprogram返回的
takeSample(withReplacementnum,seed)抽樣返回一個dataset中的num個元素,隨機種子seed
saveAsTextFile(path)把dataset寫到一個textfile中,或者HDFS支持的文件系統中,spark把每條記錄都轉換為一行記錄,然後寫到file中
saveAsTextFile(path)只能用在key-value對上,然後生成SequenceFile寫到本地或者hadoop文件系統
saveAsObjectFile(path)把dataset寫到一個java序列化的文件中,用sparkContext,objectFile()裝載
countByKey()返回的是key對應的個數的一個map.,作用與一個RDD
參考https://www.cnblogs.com/sharpxiajun/p/5506822.html加上自己的理解
transformation和action的主要區別
接口定義方式不同
1.Transformation:RDD[X]->RDD[Y]
2.Action:RDD[X]->Z(Z不是一個RDD,可能是基本類型,數組等)
執行方式也不同
Transformation只會記錄RDD轉化關系,並不會產生計算(惰性執行,LazyExecution)
Action是觸發程序執行(分布式)的算子
RDD彈性分布式數據集的基本操作