1. 程式人生 > >大話Spark(1)-Spark概述與核心概念

大話Spark(1)-Spark概述與核心概念

說到Spark就不得不提MapReduce/Hadoop, 當前越來越多的公司已經把大資料計算引擎從MapReduce升級到了Spark. 至於原因當然是MapReduce的一些侷限性了, 我們一起先來看下Mapreduce的侷限性和Spark如何做的改進.

Spark概述

MapReduce侷限性

1 僅支援Map和Reduce兩種操作
2 處理效率極低

  • Map中間結果寫磁碟,Reduce寫HDFS,多個MR之間通過HDFS交換資料;
  • 任務排程和啟動開銷大
  • 無法充分利用記憶體
  • Map端和Reduce端均需要排序

3 不適合迭代計算(如機器學習,圖計算等),互動處理(資料探勘)和流式處理(實時日誌分析)

4 MapReduce程式設計不夠靈活

Spark的特點

1 高效(比MapReduce快10~100倍)

  • 記憶體計算引擎,提供Cache機制來支援需要反覆迭代計算或多次資料共享,減少資料讀取的IO開銷
  • DAG引擎,減少多次計算之間中間結果寫到HDFS的開銷
  • 使用多執行緒模型來減少task啟動開銷,shuffle過程中避免不必要的sort操作以及減少磁碟IO操作

2 易用
提供了豐富的API,支援Java, Scala, Python和R四中語言
程式碼量比MapReduce少2~5倍
3 與Hadoop整合

  • 讀寫HDFS/Hbase
  • 與YARN整合

小結

IO和記憶體上: MapReduce資料從Map產出會寫本地磁碟,並且排序, Reduce讀取Map產出的資料計算後再產出到HDFS. 所以MapReduce的IO需要的多,並且資料來回在記憶體中載入釋放. 而Spark把資料載入到記憶體中之後(DAG計算引擎)直到計算出結果才產出到HDFS(如果資料量超過記憶體量,也會溢寫到磁碟).
排程上: Spark的每個Executor都有一個執行緒池(有一個執行緒公用的cache,省去程序頻繁啟停的開銷),每一個task佔用其中一個執行緒.
API上:MapReduce只有Map和Reduce操作, Spark有豐富的API使程式設計非常方便靈活.

Spark核心概念

RDD(Resilient Distributed Datasets)

彈性分散式資料集

  • 分佈在叢集中的只讀物件集合(由多個Partition構成)
  • 可以儲存在磁碟或記憶體中(多種儲存級別)
  • 通過並行“轉換”操作構造
  • 失效後自動重構 

RDD可以理解為一份資料在叢集上的抽象, 被分為多個分割槽, 每個分割槽分佈在叢集不同的節點上(如上圖), 從而讓RDD中的資料可以被並行操作(分散式資料集).
RDD有一個重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition, 因為節點故障 導致資料丟了, 那麼RDD會自動通過自己的資料來源重新計算該partition.

RDD的基本操作

RDD有兩種基本操作:Transformation 和 Action

Transformation

  • 通過Scala集合或者Hadoop資料集構造一個新的RDD
  • 通過已有的RDD產生新的RDD(RDD不可修改)

比如:
構造資料集:

val rdd1 = SparkContext.textFile("hdfs://xxx")
val rdd2 = sc.parallelize( Array(1,2,3,4,5))

 

Transformation:

// map(輸入一行,產出一行)
val a = sc.parallelize(1 to 9, 3)
val b = a.map(x => x*2)
a.collect = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
b.collect = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
//上述例子中把原RDD中每個元素都乘以2來產生一個新的RDD

//filter(過濾條件)
val c = a.filter(x => x>5)
c.collect = Array(6, 7, 8, 9)
//上述例子中過濾支取了a中>5的值

//flatMap(輸入一行,產出多行)
val d = a.flatMap(x=> Array(x, x*10))
d.collect = Array(1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 6, 60, 7, 70, 8, 80, 9, 90)
//上述例子中,把a中的一個元素變成了 a 和 a的10倍 2個元素.

 

Action

  • 通過RDD計算得到一個或者一組值
    比如:
//collect(把結果拿到driver端)
//比如transformation中的collect用法

//count(計算行數)
scala> a.count
res5:Long = 9

//reduce(reduce將RDD中元素兩兩傳遞給輸入函式,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函式直到最後只有一個值為止)
a.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
a.reduce((x, y) => x + y)
res7: Int = 45

 

小結

介面定義方式不同
  • Transformation: RDD[x] -> RDD[y]
  • Action: RDD[x] -> Z (Z不是一個RDD,可能是基本型別,陣列等)
惰性執行(Lazy Exception)
  • Transformation 只會記錄RDD轉換關係,並不會觸發計算
  • Action是出發程式分散式執行的運算元

SparkRDD cache/persist

允許將RDD快取到記憶體或者磁碟上,以便於重用
Spark提供了多種快取級別,以便於使用者根據實際需求進行調整

RDD cache的使用

 val data = sc.textFile(“hdfs://nn:8020/input”)
data.cache() //實際上是data.persist(StorageLevel.MEMORY_ONLY) 
//data.persist(StorageLevel.DISK_ONLY_2)

&n