Spark: reading and writing Parquet
Posted by 阿新 · 2019-01-13
SQLConf.scala
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")
...
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
...
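Because the default is "parquet", the format call can be dropped entirely. A minimal sketch, assuming an existing SparkSession named spark and an illustrative path /tmp/data:

// spark.sql.sources.default is "parquet", so these three reads are equivalent:
val a = spark.read.load("/tmp/data")                    // resolves to ParquetFileFormat implicitly
val b = spark.read.format("parquet").load("/tmp/data")
val c = spark.read.parquet("/tmp/data")

// The default can be overridden per session, e.g.:
spark.conf.set("spark.sql.sources.default", "orc")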
Test case
val df = spark.read.format("parquet").load(parquetFile)  // "parquet" is the short name registered by ParquetFileFormat
df.show()
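The test case only exercises the read side. A write-side sketch to produce parquetFile first; the path, column names, and compression codec are illustrative, not from the original post:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("parquet-write").getOrCreate()
import spark.implicits._

val parquetFile = "/tmp/people.parquet"   // hypothetical output path

// "compression" is a standard Parquet write option; snappy is also Spark's default codec
Seq(("alice", 30), ("bob", 25)).toDF("name", "age")
  .write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet(parquetFile)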
Stepping through df.show() in a debugger leads to the following call path:
FileScanRDD.scala
private def readCurrentFile(): Iterator[InternalRow] = {
  try {
    // readFunction is the per-file reader function built by the file format (see below)
    readFunction(currentFile)
  } catch {
    case e: FileNotFoundException =>
      throw new FileNotFoundException(
        e.getMessage + "\n" +
          "It is possible the underlying files have been updated. " +
          "You can explicitly invalidate the cache in Spark by " +
          "running 'REFRESH TABLE tableName' command in SQL or " +
          "by recreating the Dataset/DataFrame involved.")
  }
}
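The FileNotFoundException branch fires when Spark's cached file listing has gone stale, typically because the files under the path were rewritten. A sketch of the remedies the message suggests, using a hypothetical table name and the illustrative path from above:

// SQL form, as in the error message (table name is hypothetical)
spark.sql("REFRESH TABLE people")

// Programmatic equivalents on the catalog API
spark.catalog.refreshTable("people")
spark.catalog.refreshByPath("/tmp/people.parquet")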
ParquetFileFormat.scala
override def buildReaderWithPartitionValues(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
  ...
  vectorizedReader.initialize(split, hadoopAttemptContext)
  ...
}
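Whether this reader function ends up in vectorizedReader.initialize depends on spark.sql.parquet.enableVectorizedReader (true by default) and on the schema containing only supported atomic types. A sketch for comparing the two code paths under the debugger, reusing the illustrative parquetFile:

// Vectorized (columnar batch) path: breaks inside VectorizedParquetRecordReader.initialize
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.read.parquet(parquetFile).show()

// Row-based fallback: uses parquet-mr's ParquetRecordReader together with ParquetReadSupport
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.parquet(parquetFile).show()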
VectorizedParquetRecordReader.java
/**
* Implementation of RecordReader API.
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException, UnsupportedOperationException {
  super.initialize(inputSplit, taskAttemptContext);
  initializeInternal();
}
SpecificParquetRecordReaderBase.java
// the super.initialize(...) called from VectorizedParquetRecordReader above
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  ...
  ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
      taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
  ...
}
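The fileSchema handed to InitContext comes straight from the Parquet file footer. To look at that schema outside of Spark, a sketch against the parquet-mr API; the part-file name is a placeholder for one of the files under the illustrative /tmp/people.parquet directory:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val input = HadoopInputFile.fromPath(
  new Path("/tmp/people.parquet/part-00000.snappy.parquet"), new Configuration())
val reader = ParquetFileReader.open(input)
try {
  // this MessageType is what ReadSupport.init later sees as context.getFileSchema
  println(reader.getFooter.getFileMetaData.getSchema)
} finally {
  reader.close()
}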
ParquetReadSupport.scala
/**
* Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
* readers. Responsible for figuring out Parquet requested schema used for column pruning.
*/
override def init(context: InitContext): ReadContext = {
  catalystRequestedSchema = {
    val conf = context.getConfiguration
    val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
    assert(schemaString != null, "Parquet requested schema not set.")
    StructType.fromString(schemaString)
  }

  val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
    SQLConf.CASE_SENSITIVE.defaultValue.get)
  val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
    context.getFileSchema, catalystRequestedSchema, caseSensitive)
  new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
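catalystRequestedSchema carries only the columns the query actually needs (pushed down via SPARK_ROW_REQUESTED_SCHEMA), and clipParquetSchema trims the file schema to match, which is how column pruning reaches the Parquet reader. The effect is visible from the outside in the physical plan; a sketch with the illustrative people data from above (exact plan text varies by Spark version):

val people = spark.read.parquet("/tmp/people.parquet")

// Only "name" is requested, so "age" is pruned from the Parquet requested schema;
// the FileScan node should report something like ReadSchema: struct<name:string>
people.select("name").explain()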