Spark-Core應用詳解之高階篇
三、RDD高階應用
1.RDD的分片數量
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
numSlices: 分片數,一個分片就是一個任務,所以defaultParallelisms是分片數也是相當於並行度。
2.RDD的函式傳遞問題
傳遞RDD函式的時候,要繼承java的Serializable介面,也就是序列化。
序列化
而產生序列化的原因是因為需要分散式讀取,在兩臺節點相互合作的時候,就需要把命令轉換成為二進位制碼,令另一臺機器的jvm接收轉換為可以使用的物件,這也就是反序列化。
所以在我們打成jar包的時候就會產生序列化問題,當我們在spark上執行jar的時候,需要把它傳給很多worker,也就是我們要運用Serializable的原因。
而這個過程也就叫做RDD的傳遞操作。
import org.apache.spark.rdd
class SearchFunctions (val query: String) extends java.io.Serializable{
def isMatch(s: String): Boolean = {
s.contains(query) }
def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 問題:"isMatch"表示"this.isMatch",因此我們要傳遞整個"this"
rdd.filter(isMatch)
}
def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 問題:"query"表示"this.query",因此我們要傳遞整個"this"
rdd.filter(x => x.contains(query))
}
def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 安全:只把我們需要的欄位拿出來放入區域性變數中
val query_ = this.query
rdd.filter(x => x.contains(query_))
}
}
解析一下這個SearchFunction類,首先傳入一個String型別的s,去匹配是否含有query,返回一個boolean值。
getMatchesFunctionReference方法主要用來過濾傳入進來的rdd,因為他在filter內寫進了isMatch方法做過濾的詳細指標。而isMatch又可以表示為this.isMatch,因為rdd.filter這個操作是要分散式執行到很多機器上去,所以這也就是我們要是用序列化操作的原因。
getMatchesFieldReference方法的query也可以表示為this.query,像上所述的一樣,也是要分散式執行到很多臺機器上去。
但是有一個方法,可以不採用序列化操作,那就是產生一個新的引數被this.所賦值,就可以不用序列化了。
小結:
<1>如果RDD的轉換操作中使用到了class中的方法或者變數,那麼該class需要支援例項化。
<2>如果通過區域性變數的方式將class中的變數賦值為區域性變數,那麼就不需要傳遞物件。
3.RDD的執行方式
(1)RDD的依賴關係
窄依賴
:父類的RDD的Partition最多被子RDD的一個Partition使用。
寬依賴
:指的是多個子RDD的Partition會依賴同一個父RDD的Partition會引起shuffle。
只要是xxbyKey基本都是存在shuffle過程的,因為存在混洗。
(2)DAG 有向無環圖
當程序互相矛盾,資源排程出現先後順序問題的時候,需要使用oozie進行資源排程。
(3)RDD的任務劃分
Application:一個執行的jar就是一個應用。
Job:一個Action操作就是一個Job,也就是Hadoop的MR
Stage:按照看窄依賴劃分,下面會詳講。
Task:一個程序就是一個Task。
Stage
以wordCount核心演算法為例:
val file = sc.textFile("hdfs://Master:8020/person.txt")
val words = file.flatMap(_.split(" "))
val word2count = words.map((_,1))
val result = word2count.reduceByKey(_+_)
result.saveAsTextFile("path")
執行的時候是從上往下執行的,但是劃分stage的時候,是從下往上去劃分,如圖
因為最後的saveAsTextFile是一個Action操作,所以被劃分在最外面,也就是藍色的背景部分。
往上倒,之後是reduceByKey,也就是一個寬依賴,混洗操作,所以劃分在Stage2。
在往上看,上面的textFile,flatMap,map操作都是一個窄依賴,所以可以被共同劃分在Stage1。
Stage的結構就像棧結構一樣,先進後出,stage2先被壓入棧底,然後再壓stage1。
4.RDD的持久化
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
RDD的持久化也就是RDD的快取操作,其中,RDD有兩個快取運算元,一個是cache,一個是persist,這兩個的關係就像makeRDD和 parallelize一樣,可以直接呼叫cache,這樣預設的persist引數為void,直接將StorageLevel的儲存級別設定為記憶體儲存(最好的一種儲存),而呼叫persist,填了引數的話,如圖介紹:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
2的意思也就是儲存兩份的意思
scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:26
scala> val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26
scala> cache.cache
res1: cache.type = MapPartitionsRDD[2] at map at <console>:26
scala> nocache.collect
res2: Array[String] = Array(1[1546694698266], 2[1546694698266], 3[1546694698266], 4[1546694698266], 5[1546694698266], 6[1546694698273], 7[1546694698273], 8[1546694698273], 9[1546694698273], 10[1546694698273])
scala> nocache.collect
res3: Array[String] = Array(1[1546694699746], 2[1546694699746], 3[1546694699746], 4[1546694699746], 5[1546694699746], 6[1546694699747], 7[1546694699747], 8[1546694699747], 9[1546694699747], 10[1546694699747])
scala> cache.collect
res4: Array[String] = Array(1[1546694705677], 2[1546694705677], 3[1546694705677], 4[1546694705677], 5[1546694705678], 6[1546694705678], 7[1546694705678], 8[1546694705678], 9[1546694705679], 10[1546694705679])
scala> cache.collect
res5: Array[String] = Array(1[1546694705677], 2[1546694705677], 3[1546694705677], 4[1546694705677], 5[1546694705678], 6[1546694705678], 7[1546694705678], 8[1546694705678], 9[1546694705679], 10[1546694705679])
從程式中,我們可以看出,使用了cache運算元進行快取的,時間不會改變,因為collect輸出的是快取的時間,是不經過計算的,而沒有經過cache進行快取的,所collect的時間是隨時都會變化的。
5.RDD的checkpoint機制
checkpoint和cache都是給RDD做快取作用的,但是他們還是有著顯著區別的,最明顯的區別就是cache把快取寫在了memory中,而checkpoint寫在了hdfs中。
我個人感覺,如果要是小專案的話,還可以,但是要是大專案的話,會導致記憶體超載,如果使用cache進行快取,當某個節點的executor宕機,RDD就會丟失,資料也會沒,而這時候,cache一種自帶的容錯機制,也就是依賴鏈就會起作用,重新把記憶體還原繼續計算,這倒是也可以,但是很浪費資源,浪費記憶體,相反,checkpoint一開始就把快取寫在了hdfs中,也就沒有依賴鏈一說,保證了高容錯性。
scala> sc.setCheckpointDir("hdfs://linux01:8020/checkpoint")
scala> val ch1=sc.parallelize(1 to 2)
ch1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:26
scala> val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at map at <console>:26
scala> ch3.checkpoint
scala> ch2.collect
res10: Array[String] = Array(1[1546695226909], 2[1546695226909])
scala> ch2.collect
res11: Array[String] = Array(1[1546695231735], 2[1546695231736])
scala> ch3.collect
res12: Array[String] = Array(1[1546695237730], 2[1546695237728])
scala> ch3.collect
res13: Array[String] = Array(1[1546695237805], 2[1546695237800])
scala> ch3.collect
res14: Array[String] = Array(1[1546695237805], 2[1546695237800])
scala> ch3.collect
res15: Array[String] = Array(1[1546695237805], 2[1546695237800])
scala>
被checkpoint的RDD第一次collect的時候我們發現時間還是變了,但是第二次就開始執行快取機制了,因為他內部有一個觸發器,並且根據hdfs的儲存目錄可知,最後快取的資料的確被存入了hdfs中。
6.鍵值對RDD資料分割槽
Spark目前可以使用HashPartition和RangePartition進行分割槽,使用者也可以自定義分割槽方法,Hash分割槽為當前的預設分割槽,Spark中分割槽器直接決定了RDD中分割槽的個數、RDD中的每條資料經過shuffle過程屬於哪個分割槽和Reduce的個數。
但是在這裡,HashPartition有一個弊端,就是會導致資料傾斜,因為Hash的本質是除留取餘法進行儲存,所以就會產生這種偶然性,導致大量偶然的資料進來之後會讓其中一個執行緒被擠爆,而其他執行緒佔用的很少。
所以,我們更傾向使用RangePartition,這種分割槽方法採用了水塘抽樣隨機演算法進行資料的儲存,可以讓資料平均的儲存到每一個分割槽中。
注意:
1.只有K-V型別的RDD才有分割槽,非K-V型別的RDD分割槽的值就是None
2.每個RDD 分割槽ID範圍:0~numPartitions-1,決定這個值是屬於哪個分割槽的。
3.當我們自己想製造一個分割槽方法的時候,只需要繼承Partitioner這個抽象類就可以了
具體程式碼:
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
class CustomerPartitioner(numPartition:Int) extends Partitioner{
// 返回分割槽的總數
override def numPartitions: Int = {
numPartition
}
// 根據傳入的key返回分割槽的索引
override def getPartition(key: Any): Int = {
key.toString.toInt % numPartition
}
}
object CustomerPartitioner{
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf().setAppName("Partition").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(0 to 10,1).zipWithIndex()//把下標拉到一起
print(rdd.mapPartitionsWithIndex((index,items)=>Iterator(index+":"+items.mkString(","))).collect())
val rdd2 = rdd.partitionBy(new CustomerPartitioner(5))
print(rdd2.mapPartitionsWithIndex((index,items)=>Iterator(index+":"+items.mkString(","))).collect())
sc.stop()
}
}
7.RDD的累加器和廣播變數
(1)RDD 的累加器
Spark內部提供了一個累加器,但是隻能用於求和
使用方法:
scala> val blank = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32
scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0
scala> val tmp = blank.flatMap(line => {
| if (line == "") {
| blanklines += 1
| }
| line.split(" ")
| })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36
scala> tmp.count()
res1: Long = 3213
scala> blanklines.value
res2: Int = 171
累加器也是懶執行,所以需要Action操作觸發出來
自定義累加器
程式碼:
package Mapreduce
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
class CustomerAcc extends AccumulatorV2[String,mutable.HashMap[String,Int]] {
private val _hash = new mutable.HashMap[String,Int]()
// 檢測是否為空
override def isZero: Boolean = {
_hash.isEmpty
}
// 拷貝一個新的累加器
override def copy(): AccumulatorV2[String,mutable.HashMap[String,Int]] = {
val copyHash = new CustomerAcc()
// 創造一個copy的累加器,然後用synchronized方法設定同步操作
_hash.synchronized{
copyHash._hash++=_hash
}
copyHash
}
// 重置累加器
override def reset(): Unit = {
_hash.clear()
}
// 每一個分割槽中用於新增資料的方法 小Sum
override def add(v: String) ={
_hash.get(v) match {
case None=>_hash+=((v,1))
case Some(x)=>_hash+=((v,x+1))
}
}
// 合併每一個分割槽的輸出 總Sum
override def merge(other: AccumulatorV2[String,mutable.HashMap[String,Int]]) = {
other match{
case o:AccumulatorV2[String,mutable.HashMap[String,Int]]=>{
for ((k,v)<- o.value){
_hash.get(k) match {
case None=>_hash+=((k,v))
case Some(x)=>_hash+=((k,v+x))
}
}
}
}
}
// 輸出值
override def value(): mutable.HashMap[String,Int] = {
_hash
}
}
object CustomerAcc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Partition1").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val hash = new CustomerAcc()
sc.register(hash)
val rdd = sc.makeRDD(Array("a","b","c","a","b","c","d"))
rdd.foreach(hash.add(_))
for((k,v)<-hash.value){
println("["+k+":"+v+"]")
}
sc.stop()
}
}
總結:
1.建立一個累加器的例項
2.通過sc.register()註冊一個累加器
3.通過累加器例項名.add新增資料
4.通過累加器例項名.value來獲取累加器的值
注:1.不要在轉換中訪問累加器,要在行動中訪問。
2.轉換或者行動中不能訪問累加器的值,只能.add
(2)廣播變數
1.當在定義的方法中定義了一個本地變數,想要和RDD中變數結合傳送給其他節點,那麼這個本地變數會在每一個分割槽中產生一個拷貝
2.但是在使用了廣播變數的情況下,每一個Executor中會有該變數的次copy,[大大節約在分割槽中佔有的快取]
使用方法
scala> val broadcaster = sc.broadcast(Array(1,2,3))
broadcaster: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(2)
scala> broadcaster.value
res2: Array[Int] = Array(1, 2, 3)
適用於高效分發較大的資料物件。