Lesson 15: RDD Creation Internals Revealed
Contents:
1. The ways an RDD can be created
2. Hands-on RDD creation
3. RDD internals
The first RDD represents the source of a Spark application's input data.
Transformations are then applied to the RDD, chaining operators to implement the algorithm.
The three basic ways to create an RDD:
1. From a collection inside the program;
2. From the local file system;
3. From HDFS.
Other ways:
4. From a database (DB);
5. From a NoSQL store, e.g. HBase;
6. From S3;
7. From a data stream.
1. The practical purpose of creating an RDD from a collection: testing.
2. The purpose of creating an RDD from the local file system: testing against large data files.
3. Creating an RDD from HDFS: the most common way to create an RDD in production (a short sketch follows below).
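A minimal sketch of the production-style HDFS case (the namenode address and file path are illustrative assumptions, not taken from the original text):
val hdfsRdd = sc.textFile("hdfs://namenode:9000/data/README.md") // read a file stored on HDFS
val wordCounts = hdfsRdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // classic word count
wordCounts.take(10).foreach(println)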
Hadoop provides the infrastructure; Spark is the compute engine.
The following code demonstrates creating an RDD from a collection:
import org.apache.spark.{SparkConf, SparkContext}

object RDDBasedOnCollections {
  def main(args: Array[String]) {
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("RDDBasedOnCollections") // set the application name, which is shown in the monitoring UI while the program runs
    conf.setMaster("local")
    val sc = new SparkContext(conf) // create the SparkContext object; the SparkConf instance supplies Spark's runtime parameters and configuration
    val numbers = 1 to 100 // create a Scala collection
    val rdd = sc.parallelize(numbers) // create a ParallelCollectionRDD
    val sum = rdd.reduce(_ + _) // 1+2=3, 3+3=6, 6+4=10, ...
    println("1+2+......+99+100=" + sum)
    sc.stop()
  }
}
You can use Spark on smart devices such as phones, tablets, and TVs, as well as on PCs and servers. Spark can run on any device, as long as it has a JVM.
On a single machine, a distributed setup can be simulated with multiple threads.
In local mode, by default, a failed task is simply a failure; it is not retried.
Below is the source of SparkContext's createTaskScheduler method:
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
logWarning(
"\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
}
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
case "yarn-client" =>
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
case MESOS_REGEX(mesosUrl) =>
MesosNativeLibrary.load()
val scheduler = new TaskSchedulerImpl(sc)
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
} else {
new MesosSchedulerBackend(scheduler, sc, mesosUrl)
}
scheduler.initialize(backend)
(backend, scheduler)
case SIMR_REGEX(simrUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
scheduler.initialize(backend)
(backend, scheduler)
case zkUrl if zkUrl.startsWith("zk://") =>
logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
createTaskScheduler(sc, "mesos://" + zkUrl)
case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
}
}
From the source we can see that the LOCAL_N_FAILURES_REGEX form lets you specify both the thread count and the maximum number of task failures, so failed tasks can be retried. Even as a single-machine tool, Spark is therefore quite capable.
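A minimal sketch of how these local master URLs look in user code (the thread and failure counts are illustrative assumptions):
conf.setMaster("local")       // single thread, no task retries (MAX_LOCAL_TASK_FAILURES = 1)
conf.setMaster("local[*]")    // one worker thread per available core
conf.setMaster("local[4, 3]") // 4 worker threads, a task may fail up to 3 times before the job is aborted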
If no parallelism is specified, Spark uses as many cores as the cluster offers (parallelism = number of cores).
Spark maximizes the use of compute resources, which makes it very efficient; but if it is not managed carefully, it can also consume more resources than necessary.
The RDDBasedOnCollections object above produces only one stage when it runs, because the code contains only a single reduce. reduce is an action: it does not produce a new RDD and triggers no shuffle, so there is no stage boundary.
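As a hedged contrast sketch (reusing the same sc as above): a shuffle operator such as reduceByKey introduces a stage boundary, whereas reduce alone keeps everything in one stage.
val pairs = sc.parallelize(1 to 100).map(n => (n % 10, n)) // narrow transformations, same stage
val sums = pairs.reduceByKey(_ + _) // wide dependency => shuffle => a second stage
sums.collect()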
Hadoop MapReduce itself has hardly any remaining use cases.
The source of ParallelCollectionRDD's slice method (in its companion object) is as follows:
private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
* is an inclusive Range, we use inclusive range for the last slice.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map(i => {
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
})
}
seq match {
case r: Range => {
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
}
case _ => {
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map({
case (start, end) =>
array.slice(start, end).toSeq
}).toSeq
}
}
}
}
We can see that slice takes two parameters, seq: Seq[T] and numSlices: Int. If numSlices is not specified when calling parallelize, it defaults to the scheduler's default parallelism (all available cores), giving the highest parallelism; if numSlices is specified, the Spark program runs with exactly that number of partitions (that degree of parallelism).
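A small standalone sketch that mirrors the positions logic above, showing how 100 elements are cut into 4 slices (the function is reproduced here purely for illustration):
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }
positions(100, 4).foreach(println) // prints (0,25), (25,50), (50,75), (75,100)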
So what should Spark's parallelism actually be set to?
Best practice: each core can carry 2-4 partitions;
for example, with 32 cores you can set it to 64-128.
It is unrelated to the size of the data; it depends only on the memory footprint and CPU time each task needs to process one partition.
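A hedged sketch of the two usual ways to control this, with illustrative values following the 2-4 partitions per core rule for a 32-core cluster:
conf.set("spark.default.parallelism", "128") // cluster-wide default used by parallelize and shuffles
val data = sc.parallelize(1 to 1000000, 128) // or pass numSlices explicitly for one RDD
println(data.partitions.length) // 128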
The BlockManager manages the preferred locations of data, and this is settled when the program starts: by the time SparkContext has built the DAGScheduler and the DAG has been split into stages, the preferred location of every data partition has already been decided.
Whether the data lives in memory, on disk, or on Tachyon, it is managed by the BlockManager.
Next, look at the source of ParallelCollectionRDD itself (its partitions are instances of ParallelCollectionPartition):
private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.
// UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
// ParallelCollectionRDD.slice splits the data, and the result is converted to an array.
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
// Each slice is turned into a ParallelCollectionPartition, indexed by its position in the array.
}
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
override def getPreferredLocations(s: Partition): Seq[String] = {
locationPrefs.getOrElse(s.index, Nil)
// Return the preferred locations of this partition's data.
}
}
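A hedged usage sketch tying this to locationPrefs above: SparkContext.makeRDD accepts per-element location preferences, which end up in the locationPrefs map consulted by getPreferredLocations (the host names are assumptions for illustration):
val rddWithLocations = sc.makeRDD(Seq(
  (1, Seq("host1")),
  (2, Seq("host2"))
))
println(rddWithLocations.preferredLocations(rddWithLocations.partitions(0))) // expected: List(host1)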
Next, create an RDD by reading a local file:
val rdd = sc.textFile("D://README.txt") // note the double slash in the Windows path
// sum the lengths of all the lines
val lineLength = rdd.map(line => line.length)
val sum = lineLength.reduce(_ + _)
println("The total number of characters in the file is " + sum)
Below is the source of textFile:
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
We can see that textFile reads a text file from HDFS, the local file system, or any other Hadoop-supported file system, and returns it as an RDD of strings. Internally, textFile calls the hadoopFile function.
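A hedged usage sketch of the optional minPartitions parameter from the signature above (the path is an illustrative assumption):
val logs = sc.textFile("hdfs://namenode:9000/logs/access.log", minPartitions = 8)
println(logs.partitions.length) // at least 8 partitions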
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
We can see that hadoopFile creates a HadoopRDD, and creating a HadoopRDD relies on Hadoop's own underlying input formats. The related SparkContext.hadoopRDD method does the same for an existing JobConf:
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}
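A hedged sketch of calling hadoopFile directly, mirroring what textFile does internally (the HDFS path is an illustrative assumption):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
val pairs = sc.hadoopFile("hdfs://namenode:9000/data/input.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
val lines = pairs.map(pair => pair._2.toString) // drop the byte-offset key, keep the line text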
FileInputFormat is written in Java and lives in the org.apache.hadoop.mapred package, so what we are using here is Spark driving Hadoop's own implementation.
Some people say Spark's weakness is that it has no file system of its own, but that is actually one of its strengths: precisely because it has no file system, it can work across every file system.
When working against HBase / MySQL / Oracle you must consider data locality and implement getPreferredLocations carefully; getPreferredLocations decides where the computation takes place. When the DAGScheduler splits the DAG into stages, the tasks inside each stage already have their data's preferred locations determined. This is why Spark should be installed on the MySQL / Oracle database machines, and likewise on the HBase nodes (a sketch of a locality-aware custom RDD follows at the end of this section).
In a real production environment it is feasible to install HBase and Spark on the same nodes, but installing Spark on the MySQL / Oracle nodes is less likely. In that case you need Tachyon as a middle layer to stage the database data, or you can import the database data into Hive and run Spark on the Hive nodes.
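A minimal sketch (the class names and host strings are hypothetical) of how a custom RDD can expose data locality through getPreferredLocations, in the spirit of the discussion above:
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
// Hypothetical partition that remembers which host holds its shard of data.
class HostPartition(val index: Int, val host: String) extends Partition
// A sketch of a locality-aware RDD; a real HBase/JDBC RDD would read its shard from the store in compute().
class LocalityAwareRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {
  override def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(i => new HostPartition(i, hosts(i)))
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[HostPartition]
    Iterator(s"data stored on ${p.host}") // placeholder for reading the shard that lives on p.host
  }
  // Tell the DAGScheduler where each partition's data lives, so tasks are preferentially scheduled there.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)
}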