Spark之RDD程式設計
RDD,全稱Resilient Distributed
Datasets(彈性分散式資料集),是Spark最為核心的概念,是Spark對資料的抽象。RDD是分散式的元素集合,每個RDD只支援讀操作,且每個RDD都被分為多個分割槽儲存到叢集的不同節點上。除此之外,RDD還允許使用者顯示的指定資料儲存到記憶體和磁碟中,掌握了RDD程式設計是SPARK開發的第一步。
1.RDD讀取資料
檔案讀取
scala> val lines = sc.textFile("README.md")
scala> lines.collect()
並行化讀取
scala> var lines = sc.parallelize(List("i love you"))
scala> lines.collect()
2.RDD操作之transformation
RDD的操作可分兩種:
1 轉化操作(transformation) : 由一個RDD生成一個新的RDD
2 行動操作(action) : 對RDD中的元素進行計算,並把結果返回
參考api: org.apache.spark.rdd.RDD
3.下劃線用法
//以下兩句等同
_.split(" ")
line => line.split(" ")
//等同
_ + _
(left, right ) => left + right
2.1.map()
返回:一個新的RDD
scala> val rdd = sc.parallelize(List(1,2,3,3))
scala> val rrr=rdd.map(x => x +1 )
scala> rrr.collect()
Array[Int] = Array(2, 3, 4, 4)
scala> rrr.foreach(println)
2
4
4
3
2.2.flatmap()
返回:迭代器的所有元素組成一個新的RDD
val rdd1 = sc.parallelize(List (1,2,3,3))
scala> rdd1.map(x=>x+1).collect
Array[Int] = Array(2, 3, 4, 4)
scala> rdd1.flatMap(x=>x.to(3)).collect
Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
從上面可以看出flatMap先map然後再flat
2.3.filter()
對每個元素進行篩選,返回符合條件的元素組成的一個新RDD
scala> val rdd = sc.parallelize(List(1,2,3,3))
scala> rdd.filter(x => x != 1).collect()
Array[Int] = Array(2, 3, 3)
兩變數過濾操作
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
對value做控制,key不加限制條件
scala> val result = rdd.filter{case(x,y)=>y%3==0}
scala> result.foreach(println)
(3,6)
scala> val result = rdd.filter{case(x,y)=>y<=4}
scala> result.foreach(println)
(1,2)
(3,4)
對key做控制,value不控制
scala> val result = rdd.filter{case(x,y)=>x<3}
scala> result.foreach(println)
(1,2)
2.4 distinct()
distinct() : 去掉重複元素
scala> rdd.distinct().collect()
Array[Int] = Array(1, 2, 3)
2.5 sample(withReplacement,fration,[seed])
sample():對RDD取樣
- 第一個引數如果為true,可能會有重複的元素,如果為false,不會有重複的元素;
- 第二個引數取值為[0,1],最後的資料個數大約等於第二個引數乘總數;
- 第三個引數為隨機因子。
scala> rdd.sample(false,0.5).collect()
Array[Int] = Array(3, 3)
scala> rdd.sample(false,0.5).collect()
Array[Int] = Array(1, 2)
scala> rdd.sample(false,0.5,10).collect()
Array[Int] = Array(2, 3)
2.6排序
rdd.sortByKey():返回一個根據鍵排序的RDD
資料排序,可以通過接受ascending的引數表示我們是否想要結果按升序排序(預設是true)
scala> val result = rdd.sortByKey().collect()
Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
2.7reduce
rdd.reduceByKey(func):合併具有相同key的value值
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val result = rdd.reduceByKey((x,y)=>x+y)
scala> result.foreach(println)
(1,2)
(3,10)
2.8group
rdd.groupByKey():對具有相同鍵的進行分組 [資料分組]
scala> val result = rdd.groupByKey()
scala> result.foreach(println)
(3,CompactBuffer(4, 6))
(1,CompactBuffer(2))
2.9 mapValues
rdd.mapValues(func):對pairRDD中的每個值應用func 鍵不改變
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val result = rdd.mapValues(x=>x+1)
scala> result.foreach(println)
(1,3)
(3,5)
(3,7)
3.針對兩個pair RDD 的轉化操作
3.1 rdd.subtractByKey( other )
刪除掉RDD中與other RDD中鍵相同的元素
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val other = sc.parallelize(List((3,9)))
scala> val result = rdd.subtractByKey(other)
scala> result.foreach(println)
(1,2)
3.2 join內連線
rdd,other是3.1的變數
scala> val result = rdd.join(other)
scala> result.foreach(println)
(3,(4,9))
(3,(6,9))
3.3 右外連線
rdd,other是3.1的變數
scala> val result = rdd.rightOuterJoin(other)
scala> result.foreach(println)
(3,(Some(4),9))
(3,(Some(6),9))
3.4 左外連線
scala> val result = rdd.leftOuterJoin(other)
scala> result.foreach(println)
(3,(4,Some(9)))
(3,(6,Some(9)))
(1,(2,None))
3.5按相同鍵分組
scala> val result = rdd.cogroup(other)
scala> result.foreach(println)
(1,(CompactBuffer(2),CompactBuffer()))
(3,(CompactBuffer(4, 6),CompactBuffer(9)))
3.6 使用reduceByKey()和mapValues()計算每個鍵對應累加值
x._1表示取元組的第一個值
scala> val rdd = sc.parallelize(List(Tuple2("panda",0),Tuple2("pink",3),Tuple2("pirate",3),Tuple2("panda",1),Tuple2("pink",4)))
scala> val result = rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
scala> result.foreach(println)
(pirate,(3,1))
(panda,(1,2))
(pink,(7,2))
3.7計單詞(wordcount)
用map reduce實現:
scala> val rdd = sc.parallelize(List("i am thinkgamer, i love cyan"))
scala> val words = rdd.flatMap(line => line.split(" "))
scala> val result = words.map(x=>(x,1)).reduceByKey((x,y) => x+y)
scala> result.foreach(println)
(cyan,1)
(love,1)
(thinkgamer,,1)
(am,1)
(i,2)
用countByValue實現:
scala> val rdd = sc.parallelize(List("i am thinkgamer, i love cyan"))
scala> val result = rdd.flatMap(x=>x.split(" ")).countByValue()
scala> result.foreach(println)
(am,1)
(thinkgamer,,1)
(i,2)
(love,1)
(cyan,1)
4.RDD操作之action
下面例子以 val rdd = sc.parallelize(List(1,2,3,3))為輸入
4.1 collect()返回所有元素
scala> rdd.collect()
Array[Int] = Array(1, 2, 3, 3)
4.2 count() :返回元素個數
scala> rdd.count()
Long = 4
4.3 countByValue() : 各個元素出現的次數
scala> rdd.countByValue()
scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
4.4 take(num) : 返回num個元素
scala> rdd.take(2)
Array[Int] = Array(1, 2)
4.5 top(num) : 返回前num個元素
scala> rdd.top(2)
Array[Int] = Array(3, 3)
4.6 foreach(func):對每個元素使用func
scala> rdd.foreach(x => println(x*2))
相關推薦
Spark之RDD程式設計
RDD,全稱Resilient Distributed Datasets(彈性分散式資料集),是Spark最為核心的概念,是Spark對資料的抽象。RDD是分散式的元素集合,每個RDD只支援讀操作,且每個RDD都被分為多個分割槽儲存到叢集的不同節點上
跟我一起學Spark之——RDD Join中寬依賴與窄依賴的判斷
1.規律 如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴 除此之外的,rdd 的join api是寬依賴 2.Join的理解
Spark之RDD運算元-轉換運算元
RDD-Transformation 轉換(Transformation)運算元就是對RDD進行操作的介面函式,其作用是將一個或多個RDD變換成新的RDD。 使用Spark進行資料計算,在利用建立運算元生成RDD後,資料處理的演算法設計和程式編寫的最關鍵部分,就是利用
Spark之RDD的屬性
1.一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。
【spark你媽喊你回家吃飯-05】RDD程式設計之旅基礎篇-01
1.RDD工作流程 1.1 RDD理解 RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分散式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數組,這樣的理解對我們學習RDD的API是非常有幫助的。本文所有示例程式
[2.3]Spark DataFrame操作(二)之通過程式設計動態完成RDD與DataFrame的轉換
參考 場景 一、上一篇部落格將待分析資料影射成JavaBean的欄位,然後通過def createDataFrame(data:java.util.List[_],beanClass:Class[_]):DataFrame完成了RDD與DataFra
Spark核心程式設計之RDD持久化詳解
RDD持久化原理 Spark非常重要的一個功能特性就是可以將RDD持久化在記憶體中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到記憶體中,並且在之後對該RDD的反覆使用中,直接使用記憶體快取的partition。這樣的
[1.2]Spark core程式設計(一)之RDD總論與建立RDD的三種方式
參考 場景 RDD的理解 一、RDD是基於工作集的應用抽象;是分散式、函數語言程式設計的抽象。 MapReduce:基於資料集的處理。兩者的共同特徵:位置感知(具體資料在哪裡)、容錯、負載均衡。 基於資料集的處理:從物理儲存裝置上載入資料,然
Spark快速大數據分析之RDD基礎
數學 ref 內存 相關 應用 級別 要求 分數 png Spark 中的RDD 就是一個不可變的分布式對象集合。每個RDD 都被分為多個分區,這些分區運行在集群中的不同節點上。RDD 可以包含Python、Java、Scala中任意類型的對象,甚至可以包含用
spark筆記之RDD的緩存
process color RoCE 就是 發現 mark 其他 動作 blog Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或者緩存數據集。當持久化某個RDD後,每一個節點都將把計算分區結果保存在內存中,對此RDD或衍生出的RDD進行的其他動作中重用
spark core之RDD編程
緩存 code 會有 核心 hdf 機器 end action rdd spark提供了對數據的核心抽象——彈性分布式數據集(Resilient Distributed Dataset,簡稱RDD)。RDD是一個分布式的數據集合,數據可以跨越集群中的
spark筆記之RDD容錯機制之checkpoint
原理 chain for 機制 方式 方法 相對 例如 contex 10.checkpoint是什麽(1)、Spark 在生產環境下經常會面臨transformation的RDD非常多(例如一個Job中包含1萬個RDD)或者具體transformation的RDD本身計算
11.spark sql之RDD轉換DataSet
Once lds nco ldd 方法 att context gin statement 簡介 ??Spark SQL提供了兩種方式用於將RDD轉換為Dataset。 使用反射機制推斷RDD的數據結構 ??當spark應用可以推斷RDD數據結構時,可使用這種方式。這種
Spark函數詳解系列之RDD基本轉換
9.png cal shuff reac 數組a water all conn data 摘要: RDD:彈性分布式數據集,是一種特殊集合 ? 支持多種來源 ? 有容錯機制 ? 可以被緩存 ? 支持並行操作,一個RDD代表一個分區裏的數據集 RDD有兩種操作算子: Tra
學習大資料課程 spark 基於記憶體的分散式計算框架(二)RDD 程式設計基礎使用
學習大資料課程 spark 基於記憶體的分散式計算框架(二)RDD 程式設計基礎使用 1.常用的轉換 假設rdd的元素是: {1,2,2,3} 很多初學者,對大資料的概念都是模糊不清的,大資料是什麼,能做什麼,學的時候,該按照什麼線路去學習,學完
spark RDD官網RDD程式設計指南
http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在較高的層次上,每個Spark應用程式都包含一個驅動程式,該程式執行使用者的主要功能並在叢集上執行各
零基礎入門大資料之spark中rdd部分運算元詳解
先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉
零基礎入門大資料探勘之spark的rdd
本節簡單介紹一下spark下的基本資料結構RDD,方便理解後續的更多操作。 那麼第一個問題,什麼是rdd。我們知道,大資料一般儲存在分散式叢集裡面,那麼你在對其進行處理的時候總得把它讀出來吧,讀出來後總得把它存成某種格式的檔案吧,就好比程式語言裡面的,這個資料是陣列,那麼你可以以陣列
Spark函式詳解系列之RDD基本轉換
摘要: RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集 RDD有兩種操作運算元: Transformation(轉換):Transformation屬於延遲計
# Apache Spark系列技術直播# 第五講【 Spark RDD程式設計入門 】
內容提要:本次講座主要涵蓋Spark RDD程式設計入門基礎,包括: Spark、RDD簡介 RDD API簡介 打包與spark-submit 效能分析與調優基礎 主講人:王道遠(健身) 阿里巴巴計算平臺EMR技術專家 直播時間:2018.12.13(本週四)19:00 - 20