Spark RDD Operators
Spark Course Notes
The Spark ecosystem:
    Spark Core: RDD (Resilient Distributed Dataset)
    Spark SQL
    Spark Streaming
    Spark MLlib: collaborative filtering, ALS, logistic regression, etc. --> machine learning
    Spark GraphX: graph computation
The focus is on the first three.
-----------------Spark Core------------------------
I. What is Spark? What are its features?
    https://spark.apache.org/
    Apache Spark™ is a unified analytics engine for large-scale data processing.
    Features: fast, easy to use, general-purpose, compatible (fully compatible with Hadoop)
        Fast: up to 100x faster than Hadoop MapReduce for in-memory workloads (a claim that predates Hadoop 3)
        Easy to use: supports development in multiple languages
        General-purpose: a complete ecosystem
        Compatible: works with Hadoop
    Spark replaces Hadoop MapReduce as the compute engine; it still commonly runs on top of HDFS and YARN.
II. Installing and deploying Spark; Spark HA
1. Spark architecture
    How Spark runs:
        YARN
        Standalone: Spark's built-in cluster manager (also handy for local debugging and demos)
    Worker: slave node. On each server it manages that node's resources and tasks; a Worker manages only one node.
    Execution:
        A Worker runs multiple Executors. An Executor executes tasks, and tasks are divided by stage. --> RDD
    Client: the Driver Program submits jobs to the cluster, via
        1. spark-submit
        2. spark-shell
2. Setting up Spark
(1) Prerequisites: JDK, configure hostnames, passwordless SSH login
(2) Pseudo-distributed mode
    Simulate a distributed environment on a single VM (Master and Worker on the same node).
    In spark-env.sh:
        export JAVA_HOME=/usr/java/jdk1.8.0_201
        export SPARK_MASTER_HOST=node3
        export SPARK_MASTER_PORT=7077
(3) Fully distributed mode
    Edit the slaves file, copy the configuration to the other two servers, and start the cluster.
3. Spark HA
    HA recap:
    (*) HDFS, YARN, HBase, and Spark all use a master/slave architecture.
    (*) So the master is a single point of failure.
(1) Single-point recovery based on a file-system directory
    (*) Essence: there is still only one Master. A recovery directory is created to store cluster state and job information.
        When the Master dies and is restarted, it reads the state from the recovery directory and restores the previous state.
    Use: development and testing only; use ZooKeeper in production.
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM
        -Dspark.deploy.recoveryDirectory=/usr/local/spark-2.1.0-bin-hadoop2.7/recovery"
(2) ZooKeeper-based HA: similar to Hadoop
    (*) ZooKeeper recap:
        Roughly a small database: it stores information such as cluster state.
        Features: data synchronization, leader election, distributed locks.
        Data synchronization: data written to one node is replicated to the other nodes.
        Election: ZooKeeper has different roles, Leader and Follower; if the Leader dies, a new Leader is elected.
        Distributed locks: e.g. flash-sale scenarios. Data is stored as a tree of directory-like nodes (znodes).
    Edit spark-env.sh:
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
        -Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181
        -Dspark.deploy.zookeeper.dir=/spark"
    Copy the configuration to the other two servers.
    On node3 run start-all:    node3 Master, node4 Worker, node5 Worker
    On node4 run start-master: node3 Master, node4 Master (standby), node4 Worker, node5 Worker
    Kill the Master on node3:
        node4 Master (active), node4 Worker, node5 Worker
    The corresponding information can be seen at http://192.168.109.134:8080/.
III. Running Spark jobs: two tools
1. spark-submit: submits a Spark job
    Job: a jar file.
    Example: estimating PI with the Monte Carlo method.
    ./spark-submit --master spark://node3:7077 --class ...
        --class specifies the name of the main class.
    /usr/local/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://node3:7077
        --class org.apache.spark.examples.SparkPi
        /usr/local/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 100
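    For intuition, a hedged sketch of the Monte Carlo idea behind SparkPi (an approximation of what the bundled example does, not its exact source; assumes a running spark-shell with sc defined):

        val slices = 100                        // matches the "100" argument passed above
        val n = 100000 * slices                 // number of random points to sample
        val inside = sc.parallelize(1 to n, slices).map { _ =>
          val x = math.random * 2 - 1           // random point in the square [-1, 1] x [-1, 1]
          val y = math.random * 2 - 1
          if (x * x + y * y <= 1) 1 else 0      // 1 if the point falls inside the unit circle
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * inside / n)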
2. spark-shell: essentially a REPL
    Runs as a standalone Application.
    Two modes:
    (1) Local mode
        Running spark-shell with no arguments starts local mode.
        Spark context available as 'sc' (master = local[*], app id = local-1554038459298).
        sc is the SparkContext object. local[*] means local mode; nothing is submitted to the cluster.
    (2) Cluster mode
        ./spark-shell --master spark://node3:7077 runs against the cluster.
        Spark context available as 'sc' (master = spark://node3:7077, app id = app-20190331212447-0000).
        master = spark://node3:7077
        Spark session available as 'spark'
        SparkSession was introduced in Spark 2.0; through it you can access all of Spark's components.
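        A minimal illustration (assumes spark-shell, where both sc and spark are already defined; the read call is just one example of going through the session):

            spark.sparkContext              // the SparkContext wrapped by the session (the same object as sc)
            val ds = spark.read.textFile("/usr/local/tmp_files/test_WordCount.txt")   // Dataset[String] via the session
            ds.count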
Example: the WordCount program
(*) Process a local file and print the result to the screen
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.collect
res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
(*) Process an HDFS file and save the result to HDFS
sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")
-rw-r--r-- 3 root supergroup 0 2019-03-31 21:43 /output/0331/test_WordCount/_SUCCESS
-rw-r--r-- 3 root supergroup 40 2019-03-31 21:43 /output/0331/test_WordCount/part-00000
-rw-r--r-- 3 root supergroup 31 2019-03-31 21:43 /output/0331/test_WordCount/part-00001
_SUCCESS indicates that the job completed successfully.
part-00000 and part-00001 are the result files, one per partition; their contents do not overlap.
(*) Running WordCount step by step ----> RDD
scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> 1+1
res2: Int = 2
scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26
scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28
scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30
scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
RDD: Resilient Distributed Dataset
(1) Dependencies: wide dependencies and narrow dependencies
(2) Operators (functions):
    Transformation: lazily evaluated, e.g. map, flatMap, textFile
    Action: triggers computation immediately, e.g. collect
Note: a quick Scala review
(*) flatten: flattens nested collections
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
(*) flatMap: equivalent to map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)
    Execution of myList.flatMap(x=>x.map(_*2)):
        1. map(_*2) is applied to each inner List (x stands for one List).
        2. The results are then flattened.
3. Developing the Scala and Java versions of WordCount in an IDE
(1) Scala version of WordCount
    Create a new project and add the Spark jars to it.
    Export the jar (click through the export wizard; there is no need to set a main class).
    Upload the jar to the server and submit it (a sketch of what the program itself might look like follows the command):
spark-submit --master spark://node3:7077
--class day1025.MyWordCount
/usr/local/tmp_files/Demo1.jar
hdfs://node2:8020/tmp_files/test_WordCount.txt
hdfs://node2:8020/output/1025/demo1
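    A minimal sketch of what day1025.MyWordCount could look like (the class name and the two path arguments come from the submit command above; the body is an assumed reconstruction, not the original source):

        package day1025

        import org.apache.spark.{SparkConf, SparkContext}

        object MyWordCount {
          def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("MyWordCount")
            val sc = new SparkContext(conf)

            sc.textFile(args(0))            // input path, e.g. the HDFS file passed above
              .flatMap(_.split(" "))        // split each line into words
              .map((_, 1))                  // pair each word with a count of 1
              .reduceByKey(_ + _)           // sum the counts per word
              .saveAsTextFile(args(1))      // output directory

            sc.stop()
          }
        }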
(2) Java version of WordCount
./spark-submit --master spark://node3:7077 --class day0330.JavaWordCount /usr/local/tmp_files/Demo2.jar
IV. Analyzing the Spark job flow
1. How the WordCount program is processed
    See the diagram.
2. How Spark schedules a job
    When a job is submitted to the cluster, Spark performs task scheduling.
    See the diagram.
V. RDDs, RDD properties, and RDD operators
1. RDD: Resilient Distributed Dataset
    (*) The most basic data abstraction in Spark.
    (*) RDD properties (from the comment in the RDD source code):
    * Internally, each RDD is characterized by five main properties:
    *
    * - A list of partitions
        An RDD is made up of partitions; the partitions are processed on different Workers, which is how distributed computation is achieved.
    * - A function for computing each split
        The RDD provides operators that process the data in each partition.
    * - A list of dependencies on other RDDs
        RDDs have dependencies on one another: wide dependencies and narrow dependencies.
    * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
        A custom partitioning rule can be supplied when creating a key-value RDD.
    * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    *   an HDFS file)
        Computation is preferentially scheduled on the nodes closest to the data.
How do you create an RDD?
(1) With the SparkContext.parallelize method
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29
scala> rdd1.partitions.length
res36: Int = 2
(2) From an external data source
sc.textFile()
scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29
2. Operators
(1) Transformations
    map(func): like a for loop over the elements; returns a new RDD
    filter(func): filters the elements
    flatMap(func): flat + map, i.e. map and then flatten
    mapPartitions(func): operates on each partition of the RDD
    mapPartitionsWithIndex(func): operates on each partition and also receives the partition index (see the sketch after this list)
    sample(withReplacement, fraction, seed): sampling
    Set operations:
        union(otherDataset)
        intersection(otherDataset)
        distinct([numTasks]): deduplication
    Aggregation (group by):
        groupByKey([numTasks])
        reduceByKey(func, [numTasks])
        aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) (see the sketch after the numbered examples below)
    Sorting:
        sortByKey([ascending], [numTasks])
        sortBy(func, [ascending], [numTasks])
    join(otherDataset, [numTasks])
    cogroup(otherDataset, [numTasks])
    cartesian(otherDataset)
    pipe(command, [envVars])
    coalesce(numPartitions)
    Repartitioning:
        repartition(numPartitions)
        repartitionAndSortWithinPartitions(partitioner)
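    Before the numbered examples, a quick sketch of mapPartitionsWithIndex, since it is not covered by them (a minimal example, assuming a running spark-shell; the data is illustrative):

        // Label every element with the index of the partition it lives in.
        val rdd = sc.parallelize(1 to 8, 3)
        val labelled = rdd.mapPartitionsWithIndex { (index, iter) =>
          iter.map(x => s"partition $index -> $x")
        }
        labelled.collect.foreach(println)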
Examples:
1. Create an RDD, multiply each element by 2, then sort it
scala> val rdd1 = sc.parallelize(Array(3,4,5,100,79,81,6,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:29
scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at map at <console>:31
scala> rdd2.collect
res37: Array[Int] = Array(6, 8, 10, 200, 158, 162, 12, 16)
scala> rdd2.sortBy(x=>x,true).collect
res39: Array[Int] = Array(6, 8, 10, 12, 16, 158, 162, 200)
scala> rdd2.sortBy(x=>x,false).collect
res40: Array[Int] = Array(200, 162, 158, 16, 12, 10, 8, 6)
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true)
Keep only the elements greater than 20:
scala> val rdd3 = rdd2.filter(_>20)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[53] at filter at <console>:33
scala> rdd3.collect
res41: Array[Int] = Array(200, 158, 162)
2. An RDD of strings
scala> val rdd4 = sc.parallelize(Array("a b c","d e f","g h i"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[54] at parallelize at <console>:29
scala> rdd4.flatMap(_.split(" ")).collect
res42: Array[String] = Array(a, b, c, d, e, f, g, h, i)
3. RDD set operations:
scala> val rdd6 = sc.parallelize(List(1,2,3,6,7,8,9,100))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:29
scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at parallelize at <console>:29
scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[58] at union at <console>:33
scala> rdd8.collect
res43: Array[Int] = Array(1, 2, 3, 6, 7, 8, 9, 100, 1, 2, 3, 4)
scala> rdd8.distinct.collect
res44: Array[Int] = Array(100, 4, 8, 1, 9, 6, 2, 3, 7)
4. Grouping: reduceByKey (operates on <key, value> pairs)
scala> val rdd1 = sc.parallelize(List(("Tom",1000),("Andy",2000),("Lily",1500)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[62] at parallelize at <console>:29
scala> val rdd2 = sc.parallelize(List(("Andy",1000),("Tom",2000),("Mike",500)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at parallelize at <console>:29
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[64] at union at <console>:33
scala> rdd3.collect
res45: Array[(String, Int)] = Array((Tom,1000), (Andy,2000), (Lily,1500), (Andy,1000), (Tom,2000), (Mike,500))
scala> val rdd4= rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[65] at groupByKey at <console>:35
scala> rdd4.collect
res46: Array[(String, Iterable[Int])] = Array(
    (Tom,CompactBuffer(1000, 2000)),
    (Andy,CompactBuffer(2000, 1000)),
    (Mike,CompactBuffer(500)),
    (Lily,CompactBuffer(1500)))
scala> rdd3.reduceByKey(_+_).collect
res47: Array[(String, Int)] = Array((Tom,3000), (Andy,3000), (Mike,500), (Lily,1500))
reduceByKey will provide much better performance: it combines values locally within each partition before the shuffle, so far less data crosses the network.
The official documentation recommends reduceByKey over groupByKey.
5、cogroup
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at parallelize at <console>:29
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[68] at parallelize at <console>:29
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[70] at cogroup at <console>:33
scala> rdd3.collect
res48: Array[(String, (Iterable[Int], Iterable[Int]))] = Array(
(tom,(CompactBuffer(1, 2),CompactBuffer(1))),
(jerry,(CompactBuffer(3),CompactBuffer(2))),
(shuke,(CompactBuffer(),CompactBuffer(2))),
(kitty,(CompactBuffer(2),CompactBuffer())))
6. reduce (an Action)
    An aggregation operation.
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:29
scala> rdd1.reduce(_+_)
res49: Int = 15
7. Requirement: sort by value.
    Approach:
    1. Swap key and value, then call sortByKey.
    2. Swap back.
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",3),("ketty",2),("shuke",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[72] at parallelize at <console>:29
scala> val rdd2 = sc.parallelize(List(("jerry",1),("tom",3),("shuke",5),("ketty",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[73] at parallelize at <console>:29
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[74] at union at <console>:33
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[75] at reduceByKey at <console>:35
scala> rdd4.collect
res50: Array[(String, Int)] = Array((tom,4), (jerry,4), (shuke,7), (ketty,3))
scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[80] at map at <console>:37
scala> rdd5.collect
res51: Array[(String, Int)] = Array((shuke,7), (tom,4), (jerry,4), (ketty,3))
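One more transformation worth a sketch is aggregateByKey from the list above (a hedged example; the data is made up). Within each partition it folds the values of a key with seqOp, starting from zeroValue, then merges the per-partition results with combOp:

    val pairs = sc.parallelize(List(("tom", 1), ("jerry", 3), ("tom", 5), ("jerry", 2)), 2)
    // per-key maximum inside each partition (seqOp), then the partition maxima summed up (combOp)
    pairs.aggregateByKey(0)((acc, v) => math.max(acc, v), _ + _).collect

As a side note, example 7 can also be written without the double swap: rdd4.sortBy(_._2, false).collect sorts the pairs by value directly.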
(2) Actions (a few of them are sketched after this list)
reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement,num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func): like map, but returns nothing; it is executed for its side effects.
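A short sketch of a few of these actions (a minimal example assuming spark-shell; the data is illustrative):

    val nums = sc.parallelize(List(5, 3, 8, 1, 9, 2))
    nums.count                          // 6 - number of elements
    nums.first                          // 5 - the first element
    nums.take(3)                        // the first 3 elements
    nums.takeOrdered(3)                 // the 3 smallest elements under the natural ordering
    sc.parallelize(List(("a", 1), ("b", 2), ("a", 3))).countByKey   // elements per key: a -> 2, b -> 1
    nums.foreach(println)               // runs on the executors; in local mode the output appears in the shell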
3. RDD features:
(1) RDD caching
    (*) Purpose: improve performance.
    (*) Usage: mark an RDD as cacheable with persist or cache.
    (*) Available storage levels (from object StorageLevel):
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
Example: test data, about 920,000 lines
scala> val rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[82] at textFile at <console>:29
scala> rdd1.count          --> triggers the computation directly
res52: Long = 923452
scala> rdd1.cache          --> marks the RDD as cacheable; does not trigger computation
res53: rdd1.type = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[82] at textFile at <console>:29
scala> rdd1.count          --> triggers computation as before, but this time the data is cached
res54: Long = 923452
scala> rdd1.count          --> reads the cached data directly; no recomputation
res55: Long = 923452
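To use a storage level other than the MEMORY_ONLY default, pass one of the levels listed above to persist (a hedged sketch; the path reuses the test file from this example):

    import org.apache.spark.storage.StorageLevel

    val rdd2 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
    rdd2.persist(StorageLevel.MEMORY_AND_DISK)    // partitions that do not fit in memory spill to disk
    rdd2.count                                    // the first action materializes and caches the data
    rdd2.unpersist()                              // releases the cached blocks when they are no longer needed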
(2) RDD fault tolerance