大話Spark(1)-Spark概述與核心概念
說到Spark就不得不提MapReduce/Hadoop, 當前越來越多的公司已經把大資料計算引擎從MapReduce升級到了Spark. 至於原因當然是MapReduce的一些侷限性了, 我們一起先來看下Mapreduce的侷限性和Spark如何做的改進.
Spark概述
MapReduce侷限性
1 僅支援Map和Reduce兩種操作
2 處理效率極低
- Map中間結果寫磁碟,Reduce寫HDFS,多個MR之間通過HDFS交換資料;
- 任務排程和啟動開銷大
- 無法充分利用記憶體
- Map端和Reduce端均需要排序
3 不適合迭代計算(如機器學習,圖計算等),互動處理(資料探勘)和流式處理(實時日誌分析)
Spark的特點
1 高效(比MapReduce快10~100倍)
- 記憶體計算引擎,提供Cache機制來支援需要反覆迭代計算或多次資料共享,減少資料讀取的IO開銷
- DAG引擎,減少多次計算之間中間結果寫到HDFS的開銷
- 使用多執行緒模型來減少task啟動開銷,shuffle過程中避免不必要的sort操作以及減少磁碟IO操作
2 易用
提供了豐富的API,支援Java, Scala, Python和R四中語言
程式碼量比MapReduce少2~5倍
3 與Hadoop整合
- 讀寫HDFS/Hbase
- 與YARN整合
小結
IO和記憶體上: MapReduce資料從Map產出會寫本地磁碟,並且排序, Reduce讀取Map產出的資料計算後再產出到HDFS. 所以MapReduce的IO需要的多,並且資料來回在記憶體中載入釋放. 而Spark把資料載入到記憶體中之後(DAG計算引擎)直到計算出結果才產出到HDFS(如果資料量超過記憶體量,也會溢寫到磁碟).
排程上: Spark的每個Executor都有一個執行緒池(有一個執行緒公用的cache,省去程序頻繁啟停的開銷),每一個task佔用其中一個執行緒.
API上:MapReduce只有Map和Reduce操作, Spark有豐富的API使程式設計非常方便靈活.
Spark核心概念
RDD(Resilient Distributed Datasets)
彈性分散式資料集
- 分佈在叢集中的只讀物件集合(由多個Partition構成)
- 可以儲存在磁碟或記憶體中(多種儲存級別)
- 通過並行“轉換”操作構造
- 失效後自動重構 
RDD可以理解為一份資料在叢集上的抽象, 被分為多個分割槽, 每個分割槽分佈在叢集不同的節點上(如上圖), 從而讓RDD中的資料可以被並行操作(分散式資料集).
RDD有一個重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition, 因為節點故障 導致資料丟了, 那麼RDD會自動通過自己的資料來源重新計算該partition.
RDD的基本操作
RDD有兩種基本操作:Transformation 和 Action
Transformation
- 通過Scala集合或者Hadoop資料集構造一個新的RDD
- 通過已有的RDD產生新的RDD(RDD不可修改)
比如:
構造資料集:
val rdd1 = SparkContext.textFile("hdfs://xxx") val rdd2 = sc.parallelize( Array(1,2,3,4,5))
Transformation:
// map(輸入一行,產出一行) val a = sc.parallelize(1 to 9, 3) val b = a.map(x => x*2) a.collect = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) b.collect = Array(2, 4, 6, 8, 10, 12, 14, 16, 18) //上述例子中把原RDD中每個元素都乘以2來產生一個新的RDD //filter(過濾條件) val c = a.filter(x => x>5) c.collect = Array(6, 7, 8, 9) //上述例子中過濾支取了a中>5的值 //flatMap(輸入一行,產出多行) val d = a.flatMap(x=> Array(x, x*10)) d.collect = Array(1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 6, 60, 7, 70, 8, 80, 9, 90) //上述例子中,把a中的一個元素變成了 a 和 a的10倍 2個元素.
Action
- 通過RDD計算得到一個或者一組值
比如:
//collect(把結果拿到driver端) //比如transformation中的collect用法 //count(計算行數) scala> a.count res5:Long = 9 //reduce(reduce將RDD中元素兩兩傳遞給輸入函式,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函式直到最後只有一個值為止) a.collect res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) a.reduce((x, y) => x + y) res7: Int = 45
小結
介面定義方式不同
- Transformation: RDD[x] -> RDD[y]
- Action: RDD[x] -> Z (Z不是一個RDD,可能是基本型別,陣列等)
惰性執行(Lazy Exception)
- Transformation 只會記錄RDD轉換關係,並不會觸發計算
- Action是出發程式分散式執行的運算元
SparkRDD cache/persist
允許將RDD快取到記憶體或者磁碟上,以便於重用
Spark提供了多種快取級別,以便於使用者根據實際需求進行調整

RDD cache的使用
val data = sc.textFile(“hdfs://nn:8020/input”) data.cache() //實際上是data.persist(StorageLevel.MEMORY_ONLY) //data.persist(StorageLevel.DISK_ONLY_2)
&n