1. 程式人生 > spark 讀寫 parquet

spark 讀寫 parquet

SQLConf

// Config entry keyed by "spark.sql.sources.default": the default data source to use for
// input/output. It is a string-valued entry whose default value is "parquet".
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .stringConf
    .createWithDefault("parquet")
...
// Accessor that looks up the current value of DEFAULT_DATA_SOURCE_NAME through getConf.
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
...

test case

// Select the built-in Parquet source by its registered short name "parquet". A bare class
// name such as "ParquetFileFormat" is neither a registered short name nor a fully-qualified
// class name, so data source lookup would fail before any file is read.
val df = spark.read.format("parquet").load(parquetFile)
// Running an action forces the file scan that the rest of this walkthrough traces.
df.show()

對 df.show() 進行 debug 時，追蹤到的呼叫鏈如下（依呼叫順序列出）：

FileScanRDD.scala

private def readCurrentFile(): Iterator[InternalRow] = {
  // Advice appended to the original message when the file can no longer be found.
  val refreshHint =
    "It is possible the underlying files have been updated. " +
      "You can explicitly invalidate the cache in Spark by " +
      "running 'REFRESH TABLE tableName' command in SQL or " +
      "by recreating the Dataset/DataFrame involved."

  try {
    // Hand the current file to the read function and return the iterator it produces.
    readFunction(currentFile)
  } catch {
    case missing: FileNotFoundException =>
      // Re-throw the same exception type, with the original message followed by the hint.
      val detailedMessage = missing.getMessage + "\n" + refreshHint
      throw new FileNotFoundException(detailedMessage)
  }
}

ParquetFileFormat.scala

  // Builds the per-file read function: the returned value maps a PartitionedFile to an
  // Iterator[InternalRow]. This is an excerpt; `...` marks code that has been left out.
  override def buildReaderWithPartitionValues(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
      ...
      // Initialize the vectorized reader with the split and the Hadoop attempt context.
      // NOTE(review): presumably this is the VectorizedParquetRecordReader.initialize shown
      // in the next excerpt -- the reader's declaration is elided here, so confirm.
      vectorizedReader.initialize(split, hadoopAttemptContext)
      ...

}

VectorizedParquetRecordReader.java

/**
   * RecordReader API entry point. Runs the superclass initialization for the given split and
   * task attempt context first, then this reader's own internal initialization.
   */
  @Override
  public void initialize(
      final InputSplit inputSplit,
      final TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException, UnsupportedOperationException {
    // The superclass must be initialized before the reader-specific setup runs.
    super.initialize(inputSplit, taskAttemptContext);
    this.initializeInternal();
  }

SpecificParquetRecordReaderBase.java

// Target of the super.initialize(...) call made by the subclass reader.
// This is an excerpt; `...` marks code that has been left out.
@Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
      ...
// Ask the read support for a ReadContext, passing it the task's configuration, the file
// metadata (converted by toSetMultiMap) and the file schema.
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
        taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
...
}

ParquetReadSupport.scala

/**
   * Invoked on the executor side ahead of [[prepareForRead()]] and before the actual Parquet
   * record readers are instantiated.  Works out the Parquet requested schema that is used for
   * column pruning.
   */
  override def init(context: InitContext): ReadContext = {
    val hadoopConf = context.getConfiguration

    // The requested Catalyst schema is carried in the configuration as a string.
    val serializedSchema = hadoopConf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
    assert(serializedSchema != null, "Parquet requested schema not set.")
    catalystRequestedSchema = StructType.fromString(serializedSchema)

    // Use the SQLConf default when the configuration carries no case-sensitivity setting.
    val isCaseSensitive = hadoopConf.getBoolean(
      SQLConf.CASE_SENSITIVE.key,
      SQLConf.CASE_SENSITIVE.defaultValue.get)

    // Clip the file's schema against the requested Catalyst schema.
    val clippedSchema = ParquetReadSupport.clipParquetSchema(
      context.getFileSchema, catalystRequestedSchema, isCaseSensitive)

    new ReadContext(clippedSchema, Map.empty[String, String].asJava)
  }