1. 程式人生 > 其它 >Spark 入門詳解 α

Spark 入門詳解 α

什麼是Spark

spark是基於記憶體的用於大規模資料處理(離線計算、實時計算、快速查詢)的統一分析引擎。
也是一個生態系統。

Spark的特點

1、速度快
比MapReduce塊10-100倍
2、易用(演算法多)
MR只支援一種計算 演算法,Spark支援多種演算法。
3、通用
Spark可以支援離線計算、實時計算、快速查詢(互動式)、機器學習、圖計算
4、相容性強
支援大資料中現有的Yarn. Mesos等多種排程平臺,可以處理hadoop支援的資料。

Spark發展史

2009 年誕生於加州大學伯克利分校AMP 實驗室
2014年成為 Apache 的頂級專案

Spark為什麼會流行

原因1:優秀的資料模型和計算抽
支援多種計算模型,而且基於記憶體(記憶體比硬碟速度快)
RDD 是一個可以容錯且並行的資料結構
原因2:完善的生態圈(Spark生態圈)

Spark Core:實現Spark 基本功能(RDD)SparK SQL: 操作結構化資料Spark Streaming : 對實時資料進行流式計算Spark MLlib : 機器學習(ML)功能GraphX(圖計算) : 用於圖計算的API

Hadoop VS Spark

Hadoop(HDFS-MR-YARN)Spark
型別基礎平臺, 包含計算, 儲存, 排程分散式計算工具
場景大規模資料集上的批處理迭代計算, 互動式計算, 流計算
價格對機器要求低, 便宜對記憶體有要求, 相對較貴
程式設計正規化Map+Reduce, API 較為底層, 演算法適應性差RDD組成DAG有向無環圖, API 較為頂層, 方便使用
資料儲存結構MapReduce中間計算結果存在HDFS磁碟上, 延遲大RDD中間運算結果存在記憶體中 , 延遲小
執行方式Task以程序方式維護, 任務啟動慢Task以執行緒方式維護, 任務啟動快

Spark執行模式

1.local本地模式(單機)--開發測試使用
2.standalone獨立叢集模式--開發測試使用
3.standalone-HA高可用模式--生產環境使用
4.on yarn叢集模式--生產環境使用

5.on mesos叢集模式--國內使用較少
6.on cloud叢集模式--中小公司未來會更多的使用雲服務

Spark安裝部署

Local模式安裝部署

使用CDH5.14.0-Spark2.2版本

第一步:上傳解壓
第二步:開箱即用(local模式)
進入spark-shell方式
1、Spark-shell
2、Spark-shell --master local[*]
*表示使用當前機器上所有可用的資源
3、Spark-shell --master local[n]
數字N表示在本地模擬N個執行緒來運行當前任務

本地資料計算
val textFile = sc.textFile("file:opt/spark01/tt.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
叢集上的資料計算
val textFile = sc.textFile("hdfs://node01:8020/tt.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://node01:8020/ttt")

standalone叢集模式部署

第一步:上傳並解壓
第二步:修改配置

#配置java環境變數
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的埠
export SPARK_MASTER_PORT=7077

第三步:分發到其他節點
scp -r /export/servers/spark node02:/export/servers
scp -r /export/servers/spark node03:/export/servers

說明:Spark的環境變數可以新增到伺服器環境變數內,但是spark和hadoop有部分腳衝突,需要修改衝突的指令碼中的一個。
第四步:啟動
sbin/start-all.sh
sbin/stop-all.sh

standaloneHA叢集模式部署

第一步:上傳並解壓
第二步:修改配置

#配置java環境變數
export JAVA_HOME=/export/servers/jdk1.8
#指定zookeeper
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
#指定spark Master的埠
export SPARK_MASTER_PORT=7077

第三步同步到其他節點
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0 node02:$PWD
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0 node03:$PWD

第四步: 先啟動ZK ,再啟動spark

on yarn叢集模式 安裝部署

準備工作
Hadoop正常安裝、 單機版本的spark安裝成功
第一步:上傳解壓
第二步:修改配置

#配置java環境變數
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的埠
export SPARK_MASTER_PORT=7077
#設定hadoop配置路徑
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop

第三步:使用spark-submit提交任務(不需要開啟spark)

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master S \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10

引數說明

-deploy-mode 任務執行模式
cluster模式:生產環境中使用該模式
1.Driver程式在YARN叢集中
2.應用的執行結果不能在客戶端顯示
3.該模式下Driver執行ApplicattionMaster這個程序中,如果出現問題,yarn會重啟ApplicattionMaster(Driver)

●client模式:
1.Driver執行在Client上的Spark Submit程序中
2.應用程式執行結果會在客戶端顯示

spark-submit命令用來提交jar包給spark叢集/YARN

--master spark://node01:7077 指定 Master 的地址
--name "appName" 指定程式執行的名稱
--class 程式的main方法所在的類
--jars xx.jar 程式額外使用的 jar 包
--driver-memory 512m Driver執行所需要的記憶體, 預設1g
--executor-memory 2g 指定每個 executor 可用記憶體為 2g, 預設1g
--executor-cores 1 指定每一個 executor 可用的核數
--total-executor-cores 2 指定整個叢集執行任務使用的 cup 核數為 2 個
--queue default 指定任務的對列
--deploy-mode S 指定執行模式(client/cluster)

編寫spark程式碼的流程

前提:建立一個maven專案
編寫程式碼
1、建立spark conf
2、例項一個sparkcontext
3、讀物資料,對資料進行操作(業務邏輯)
4、儲存最終的結果

Jar包執行
講程式碼到匯出成為jar檔案,上傳到叢集,通過spark-submit提交任務

。。。。。。

SparkCore

什麼是RDD
彈性分散式資料集(儲存在記憶體中)
彈性的,RDD中的資料可以儲存在記憶體中或者磁盤裡面
分散式儲存的,可以用於分散式計算
集合,可以存放很多元素
代表一個不可變、可分割槽、裡面的元素可平行計算的集合。
rdd1 rdd2 rdd3 不能改變

RDD的主要屬性

1、資料集的基本組成單位,一組分片或多分割槽
每個分片(每個分割槽)都會被一個計算任務處理,分片數決定並行度(與kafka相同)
使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值(預設值是2)
2、Spark中RDD的計算是以分割槽為單位的,計算函式會被作用在每一個分割槽。
3、一個RDD會依賴於其他多個RDD。
RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算(Spark的容錯機制)

4、對於KV型別的RDD會有一個Partitioner函式, 即RDD的分割槽函式(可選項)
非key-value的RDD的Parititioner的值是None
Partitioner函式決定了RDD本身的分割槽數量,也決定了parent RDD Shuffle輸出時的分割槽數量。

5、一個列表,儲存每個Partition的位置(preferred location)。
計算程式通過列表找到資料

RDD-API

建立RDD
1、由外部儲存系統的資料集建立
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
2、通過已有的RDD經過運算元轉換生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
3、由一個已經存在的Scala集合建立
A: val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
B: val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

RDD的方法/運算元分類

RDD的運算元分為兩類:
1.Transformation轉換操作:返回一個新的RDD

2.Action動作操作:返回值不是RDD(無返回值或返回其他的)

如何判斷一個方法是Transformation?還是Action?
當經過轉換後返回 值是rdd表示此操作是個Transformation,反之就是一個Actions。

如何理解Spark的惰性計算?
RDD中的所有轉換都是惰性求值/延遲執行的,也就是說並不會直接計算
遇到Action動作時,這些轉換才會真正執行。沒有遇到不執行。

之所以使用惰性求值/延遲執行,是因為這樣可以在Action時對RDD操作形成DAG有向無環圖進行Stage的劃分和並行優化,這種設計讓Spark更加有效率地執行。

詳細的Transformation和Action

Transformation

轉換含義
map(func)返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成
filter(func)返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成
flatMap(func)類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func)類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)根據fraction指定的比例對資料進行取樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset)對源RDD和引數RDD求並集後返回一個新的RDD
intersection(otherDataset)對源RDD和引數RDD求交集後返回一個新的RDD
distinct([numTasks]))對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks])在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])與sortByKey類似,但是更靈活
join(otherDataset, [numTasks])在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K,(Iterable<V>,Iterable<W>))型別的RDD
cartesian(otherDataset)笛卡爾積
pipe(command, [envVars])對rdd進行管道操作
coalesce(numPartitions)減少 RDD 的分割槽數到指定值。在過濾大量資料之後,可以執行此操作
repartition(numPartitions)重新給 RDD 分割槽

Action

動作含義
reduce(func)通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的
collect()在驅動程式中,以陣列的形式返回資料集的所有元素
count()返回RDD的元素個數
first()返回RDD的第一個元素(類似於take(1))
take(n)返回一個由資料集的前n個元素組成的陣列
takeSample(withReplacement,num, [seed])返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])返回自然順序或者自定義順序的前 n 個元素
saveAsTextFile(path)將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字
saveAsSequenceFile(path)將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
saveAsObjectFile(path)將資料集的元素,以 Java 序列化的方式儲存到指定的目錄下
countByKey()針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func)在資料集的每一個元素上,執行函式func進行更新。
foreachPartition(func)在資料集的每一個分割槽上,執行函式func

Spark RDD分割槽原則

1.啟動的時候指定的CPU核數確定了一個引數值:
spark.default.parallelism=指定的CPU核數(叢集模式最小2)
2.對於Scala集合呼叫parallelize(集合,分割槽數)方法,
如果沒有指定分割槽數,就使用spark.default.parallelism,
如果指定了就使用指定的分割槽數(不要指定大於spark.default.parallelism)
3.對於textFile(檔案,分割槽數) defaultMinPartitions
如果沒有指定分割槽數sc.defaultMinPartitions=min(defaultParallelism,2)
如果指定了就使用指定的分割槽數sc.defaultMinPartitions=指定的分割槽數
rdd的分割槽數
對於本地檔案:
rdd的分割槽數 = max(本地file的分片數, sc.defaultMinPartitions)
對於HDFS檔案:
rdd的分割槽數 = max(hdfs檔案的block數目, sc.defaultMinPartitions)
所以如果分配的核數為多個,且從檔案中讀取資料建立RDD,即使hdfs檔案只有1個切片,最後的Spark的RDD的partition數也有可能是2