How a Spark job is triggered
阿新 • Published 2018-12-04
A reliable sign that an RDD operation is an action is that its implementation calls
sc.runJob
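For example (a minimal sketch of my own, assuming a live SparkContext named sc): transformations only build up the lineage lazily, while an action immediately goes through sc.runJob.

val rdd = sc.parallelize(1 to 100, 4)
val doubled = rdd.map(_ * 2)   // transformation: lazy, no job is launched
val n = doubled.count()        // action: calls sc.runJob under the hood and launches a job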
How an RDD triggers a job
Take rdd.count as an example
RDD.scala
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
SparkContext.scala
/**
 * Run a job on all partitions in an RDD and return the results in an array.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @return in-memory collection with a result of the job (each collection element will contain
 * a result from one partition)
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
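So rdd.count asks sc.runJob to measure the size of each partition's iterator and sums the per-partition results on the driver. Roughly the same thing written by hand (a sketch, again assuming a SparkContext named sc):

val rdd = sc.parallelize(1 to 100, 4)
// One job, one task per partition; each task returns its partition size.
val partitionSizes = sc.runJob(rdd, (iter: Iterator[Int]) => iter.size)
val total = partitionSizes.sum   // 100, same as rdd.count()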
How a DataFrame triggers a job
Take df.count as an example
The call chain is a lot more verbose than the RDD one
Dataset.scala
/**
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}
SparkPlan.scala
/**
 * Runs this query returning the result as an array.
 */
def executeCollect(): Array[InternalRow] = {
  val byteArrayRdd = getByteArrayRdd()

  val results = ArrayBuffer[InternalRow]()
  byteArrayRdd.collect().foreach { countAndBytes =>
    decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
  }
  results.toArray
}
RDD.scala
/**
* Return an array that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
SparkContext.scala
/**
* Run a job on all partitions in an RDD and return the results in an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
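In other words, df.count builds a global aggregation (groupBy().count()), runs the physical plan, and reads the single resulting row; executeCollect is where it drops down to RDD.collect and therefore to sc.runJob. A rough way to see the same thing from user code (a sketch of my own, not from the original post):

val df = spark.range(100).toDF("id")
// groupBy() with no keys produces a single global group, whose count is the row count.
val viaGroupBy = df.groupBy().count().collect().head.getLong(0)
assert(viaGroupBy == df.count())   // both are 100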
At this point the two paths converge.
spark.read.parquet can also trigger a job, even before any action is called
val df: DataFrame = spark.read.parquet("/tmp/spark-975aa02f-fa2e-4b03-a3bd-7d57e3927787/" +
  "part-00000-61784fcc-e6cc-4ea8-bb14-7405f680681d.snappy.parquet")
DataFrameReader.scala
def parquet(path: String): DataFrame = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
parquet(Seq(path): _*)
}
DataFrameReader.scala
def parquet(paths: String*): DataFrame = {
format("parquet").load(paths: _*)
}
DataFrameReader.scala
def load(paths: String*): DataFrame = {
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
DataSource.scala
def resolveRelation() = {
...
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
tempFileIndex.allFiles())
...
}
ParquetFileFormat.scala
override def inferSchema() = {
...
ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
...
}
ParquetFileFormat.scala
def mergeSchemasInParallel() = {
...
// Issues a Spark job to read Parquet schema in parallel.
val partiallyMergedSchemas =
sparkSession
.sparkContext
.parallelize(partialFileStatusInfo, numParallelism)
.mapPartitions { iterator =>
// Resembles fake `FileStatus`es with serialized path and length information.
val fakeFileStatuses = iterator.map { case (path, length) =>
new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
}.toSeq
// Reads footers in multi-threaded manner within each task
val footers =
ParquetFileFormat.readParquetFootersInParallel(
serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
if (footers.isEmpty) {
Iterator.empty
} else {
var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
footers.tail.foreach { footer =>
val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
try {
mergedSchema = mergedSchema.merge(schema)
} catch { case cause: SparkException =>
throw new SparkException(
s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
}
}
Iterator.single(mergedSchema)
}
}.collect()
...
}
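The .collect() at the end of mergeSchemasInParallel is what actually launches the job: Parquet footers are read on the executors and the partially merged schemas are brought back to the driver, where they are merged once more. This is why the trace below ends at RDD.collect again. As a hedged aside (not from the original post), supplying an explicit schema generally lets Spark skip this inference work, though listing a very large directory may still run a small job of its own:

import org.apache.spark.sql.types._

// Hypothetical schema and path, for illustration only.
val knownSchema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val df2 = spark.read.schema(knownSchema).parquet("/path/to/parquet")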
RDD.scala
/**
* Return an array that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
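Tying the three cases together: whether it is rdd.count, df.count, or the schema inference hidden inside spark.read.parquet, the call chain always bottoms out in sc.runJob. A small sketch of my own that makes the triggered jobs visible with a SparkListener:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Print a line for every job that gets submitted.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")
})

spark.sparkContext.parallelize(1 to 100, 4).count()   // RDD action -> job
spark.range(100).toDF("id").count()                   // DataFrame action -> job
// spark.read.parquet(...) can also show up here when schema inference/merging runs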