第二天 -- Spark叢集啟動流程 -- 任務提交流程 -- RDD依賴關係 -- RDD快取 -- 兩個案例
第二天 – Spark叢集啟動流程 – 任務提交流程 – RDD依賴關係 – RDD快取 – 兩個案例
文章目錄
一、Spark叢集啟動流程
- 呼叫start-all.sh指令碼,啟動Master服務,首先執行preStart,檢查超時的Worker
- 開始執行receive方法,不斷接受其它Actor向它傳送過來的請求
- 在呼叫start-all.sh指令碼的時候,會解析slaves配置檔案,決定了在哪幾個節點上啟動Worker服務,Worker服務在啟動的時候,會啟動preStart方法,該方法會向Master進行註冊
- Master收到Worker的註冊資訊後,開始持久化註冊資訊,並響應給Worker
- Worker收到Master傳送過來的響應資訊(MasterUrl),
- Worker開始向Master傳送心跳資訊
二、Spark任務提交流程:
- Driver端向Master端註冊任務
- Master收到Driver端傳送過來的資訊後,把資訊封裝為真正的任務資訊並把任務資訊進行儲存
- Master通知Worker拿取任務資訊並啟動Executor
- Worker向Master拉取任務資訊的同時啟動Executor
- Executor開始向Driver進行註冊
- Driver開始把任務傳送給相應的Executor
三、RDD的依賴關係
RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用(一對一、多對一)
寬依賴
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition(一對多、多對多)
Lineage
RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分割槽。RDD的Lineage會記錄RDD的元資料資訊和轉換行為,當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽。
四、RDD的快取
Spark速度非常快的原因之一,就是在不同操作中可以在記憶體中持久化或快取多個數據集。當持久化某個RDD後,每一個節點都將把計算的分片結果儲存在記憶體中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得後續的動作變得更加迅速。RDD相關的持久化和快取,是Spark最重要的特徵之一。可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵。
RDD快取方式、級別
RDD通過persist方法或cache方法可以將前面的計算結果快取,但是並不是這兩個方法被呼叫時立即快取,而是觸發後面的action時,該RDD將會被快取在計算節點的記憶體中,並供後面重用。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
通過檢視原始碼發現cache最終也是呼叫了persist方法,預設的儲存級別都是僅在記憶體儲存一份,Spark的儲存級別分為很多種,儲存級別在object StorageLevel中定義的。
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
class StorageLevel的主構造器引數如下:
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
快取有可能丟失,或者儲存於記憶體的資料由於記憶體不足而被刪除,RDD的快取容錯機制保證了即使快取丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的資料會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
五、案例一:基站訊號範圍
需求:求使用者在一定的時間範圍內停留的時間最長的top2的基站範圍
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:求使用者在一定的時間範圍內停留的時間最長的top2的基站範圍
* 思路:
* 1.求出在相同基站停留的總時長
* 2.把基站的經緯度join過來
* 3.按使用者分組,組內取top2
*/
object MobileLocation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 獲取使用者日誌資料檔案
val files: RDD[String] = sc.textFile("F:/scaladata/data/lacduration/log")
// 將使用者日誌資料進行劃分
val splitedLogs: RDD[((String, String), Long)] = files.map(line => {
val fields = line.split(",")
val phone = fields(0) // 手機號
val time = fields(1).toLong // 時間戳
val lac = fields(2) // 基站id
val eventType = fields(3).toInt // 事件型別
val time_long = if (eventType == 1) -time else time
((phone,lac), time_long)
}).cache()
// 使用者在相同的基站停留的總時長
val totalTimeLogs: RDD[((String, String), Long)] = splitedLogs.reduceByKey(_+_)
// 為了便於和基站資訊進行join,需要把資料進行重新整合
val lacAndPhoneAndTime: RDD[(String, (String, Long))] = totalTimeLogs.map(line => {
val phone = line._1._1
val lac = line._1._2
val time = line._2 // 停留的總時長
(lac, (phone, time))
})
// 獲取基站基礎資訊
val lacInfo = sc.textFile("F:/scaladata/data/lacduration/lac_info.txt")
.map(line => {
val fields = line.split(",")
val lac = fields(0) // 基站id
val x = fields(1) // 經度
val y = fields(2) // 維度
(lac,(x,y))
})
// 把經緯度資訊join到使用者訪問資訊中
val joinedLogs: RDD[(String, ((String, Long), (String, String)))] = lacAndPhoneAndTime.join(lacInfo)
// 為了方便將使用者進行分組,把資料進行重新整合
val phoneAndTimeAndXY: RDD[(String, Long, (String, String))] = joinedLogs.map(x => {
val phone = x._2._1._1
val lac = x._1
val time = x._2._1._2
val xy = x._2._2
(phone, time, xy)
})
// 按照使用者進行分組
val grouped: RDD[(String, Iterable[(String, Long, (String, String))])] = phoneAndTimeAndXY.groupBy(_._1)
// 按照使用者訪問基站的總時長進行降序排列
val res: RDD[(String, List[(String, Long, (String, String))])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))
res.foreach(f => println(f))
sc.stop()
}
}
兩個案例的資料檔案下載:點選下載
六、案例二:學科模組網站訪問排名
需求:求每個學科各個模組訪問量後取topN
普通實現
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:求每個學科各個模組訪問量後取topN
* 思路:
* 1.每個學科各個模組的訪問量
* 2.以學科進行分組並在組內排序取topN
*/
object SubjectCount_1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 獲取資料
val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
val fields = line.split("\t")
val url = fields(1) // 使用者請求的url
(url,1)
})
// 按照url進行聚合,得到每個學科各個模組的訪問量
val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
// 獲取學科資訊並返回所有資料
val subjectAndUrlAndCount: RDD[(String, String, Int)] = sumedLogInfo.map(tup => {
val url = tup._1
val count = tup._2
val subject = new URL(url).getHost
(subject, url, count)
})
// 按照學科資訊進行分組
val groupedLogInfo: RDD[(String, Iterable[(String, String, Int)])] = subjectAndUrlAndCount.groupBy(_._1)
// 在學科資訊組內進行降序排序並取top3
val res: RDD[(String, List[(String, String, Int)])] = groupedLogInfo.mapValues(_.toList.sortBy(_._3).reverse.take(3))
res.foreach(f => println(f))
sc.stop()
}
}
使用快取
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:求每個學科各個模組訪問量後取topN
* 快取應用
*/
object SubjectCount_2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 模擬從mysql中獲取的學科資訊
val subjects = Array("http://java.learn.com", "http://ui.learn.com", "http://bigdata.learn.com", "http://android.learn.com", "http://h5.learn.com")
// 獲取資料
val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
val fields = line.split("\t")
val url = fields(1) // 使用者請求的url
(url,1)
})
// 按照url進行聚合,得到每個學科各個模組的訪問量
val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
for(subject <- subjects){
// 過濾出屬於該學科的各個模組對應的訪問量
val filteredSubjectInfo: RDD[(String, Int)] = sumedLogInfo.filter(_._1.startsWith(subject))
// 開始降序排序並取top3
val res: Array[(String, Int)] = filteredSubjectInfo.sortBy(_._2,false).take(3)
res.foreach(f => println(f))
}
sc.stop()
}
}
自定義分割槽實現
能解決部分資料傾斜問題
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
* 需求:求每個學科各個模組訪問量後取topN
* 實現自定義分割槽器,按照學科把不同的學科資訊放到不同的分割槽中
*/
object SubjectCount_3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 獲取資料
val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
val fields = line.split("\t")
val url = fields(1) // 使用者請求的url
(url,1)
})
// 按照url進行聚合,得到每個學科各個模組的訪問量
val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
// 獲取學科資訊並返回所有資料
val subjectAndUrlAndCount: RDD[(String, (String, Int))] = sumedLogInfo.map(tup => {
val url = tup._1
val count = tup._2
val subject = new URL(url).getHost
(subject, (url, count))
}).cache()
// 呼叫預設的分割槽器進行分割槽,雜湊碰撞導致會出現資料傾斜問題,此時需要自定義分割槽
// val partitioned = subjectAndUrlAndCount.partitionBy(new HashPartitioner(3))
// partitioned.saveAsTextFile("f:/sparkdata/out-20181120-1")
// 獲取所有的學科資訊,需要去重
val subjects: Array[String] = subjectAndUrlAndCount.keys.distinct.collect
// 呼叫自定義分割槽器
val partitioner = new SubjectPartitioner(subjects)
val partitioned = subjectAndUrlAndCount.partitionBy(partitioner)
val res: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
it.toList.sortBy(_._2._2).reverse.take(3).iterator
})
res.saveAsTextFile("f:/sparkdata/out-20181120-2")
sc.stop()
}
}
/**
* 自定義分割槽器
*/
class SubjectPartitioner(subjects:Array[String]) extends Partitioner{
// 用於儲存學科資訊和對應的分割槽號
val subjectAndNum = new mutable.HashMap[String,Int]()
// 計數器,用於指定分割槽號
var i = 0
for(subject <- subjects){
subjectAndNum += (subject -> i)
i += 1
}
// 獲取分割槽數
override def numPartitions: Int = subjects.size
// 獲取分割槽號
override def getPartition(key: Any): Int = subjectAndNum.getOrElse(key.toString, 0)
}