The steps involved in implementing a Spark DataSourceV2
阿新 · Published 2018-12-04
Extend DataSourceV2
class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader
  override def createWriter(jobId: String, schema: StructType, mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter]
}
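Filled in, the entry point might look like the sketch below, written against the Spark 2.3 interfaces shown in this post (they were reworked in later releases). Reading the output path from DataSourceOptions and borrowing the Hadoop configuration from the active SparkContext are assumptions here, as are the Reader and Writer classes built in the following steps.

import java.util.Optional

import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType

class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {

  // Called on the driver for the read path: spark.read.format(...).load(path).
  override def createReader(options: DataSourceOptions): DataSourceReader = {
    val path = options.get("path").get()
    new Reader(path, SparkContext.getOrCreate().hadoopConfiguration)
  }

  // Called on the driver for the write path: df.write.format(...).save(path).
  override def createWriter(
      jobId: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] = {
    val path = options.get("path").get()
    Optional.of(new Writer(jobId, path, SparkContext.getOrCreate().hadoopConfiguration))
  }
}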
Construct the DataSourceReader
class Reader(path: String, conf: Configuration) extends DataSourceReader {

  /**
   * Returns the actual schema of this data source reader, which may be different from the
   * physical schema of the underlying storage, as column pruning or other optimizations
   * may happen.
   */
  override def readSchema(): StructType

  /**
   * Returns a list of reader factories. Each factory is responsible for creating a data reader
   * to output data for one RDD partition. That means the number of factories returned here is
   * the same as the number of RDD partitions this scan outputs.
   */
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]]
}
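A fuller sketch of the Reader, assuming a fixed two-long-column schema (i, j) and one scan partition per file, in the spirit of Spark's own test source. Note that org.apache.spark.util.SerializableConfiguration is private[spark]; outside Spark's own source tree you would substitute an equivalent serializable wrapper around Configuration.

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration // private[spark]; see note above

class Reader(path: String, conf: Configuration) extends DataSourceReader {

  // The schema exposed to Spark; assumed fixed here rather than inferred.
  override def readSchema(): StructType = new StructType().add("i", "long").add("j", "long")

  // One reader factory per file, so the scan has one RDD partition per file.
  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
    val serializableConf = new SerializableConfiguration(conf)
    val dataPath = new Path(path)
    val fs = dataPath.getFileSystem(conf)
    val factories = new util.ArrayList[DataReaderFactory[Row]]()
    if (fs.exists(dataPath)) {
      fs.listStatus(dataPath).filter(_.isFile).foreach { status =>
        factories.add(new SimpleCSVDataReaderFactory(status.getPath.toUri.toString, serializableConf))
      }
    }
    factories
  }
}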
Construct the DataReaderFactory and DataReader
class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
  extends DataReaderFactory[Row] with DataReader[Row] {

  /**
   * Returns a data reader to do the actual reading work.
   */
  override def createDataReader(): DataReader[Row]

  // As a DataReader[Row] it must also implement next(), get() and close().
}
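Because the class mixes in both interfaces, the factory can simply return itself from createDataReader. A hedged sketch, assuming each factory reads a single CSV file of two comma-separated long columns:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.util.SerializableConfiguration

class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
  extends DataReaderFactory[Row] with DataReader[Row] {

  @transient private var lines: Iterator[String] = _
  @transient private var currentLine: String = _
  @transient private var in: java.io.InputStream = _

  // Runs on the executor: open the file and hand back this object as the reader.
  override def createDataReader(): DataReader[Row] = {
    val filePath = new Path(path)
    val fs = filePath.getFileSystem(conf.value)
    in = fs.open(filePath)
    lines = scala.io.Source.fromInputStream(in, "UTF-8").getLines()
    this
  }

  // Iterator-style contract: next() advances, get() returns the current row.
  override def next(): Boolean = {
    if (lines.hasNext) { currentLine = lines.next(); true } else false
  }

  override def get(): Row = {
    val Array(i, j) = currentLine.split(",").map(_.trim.toLong)
    Row(i, j)
  }

  override def close(): Unit = if (in != null) in.close()
}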
Construct the DataSourceWriter
class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceWriter {

  /**
   * Creates a writer factory which will be serialized and sent to executors.
   */
  override def createWriterFactory(): DataWriterFactory[Row]

  override def commit(messages: Array[WriterCommitMessage]): Unit

  override def abort(messages: Array[WriterCommitMessage]): Unit
}
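commit and abort are what give the write its job-level transaction semantics: tasks write to a temporary location, and the driver publishes or discards the results in one place. A sketch assuming a _temporary/<jobId> staging directory under the output path (a layout borrowed from Spark's test source, not required by the API):

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.util.SerializableConfiguration

class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceWriter {

  // Serialized and sent to executors; each partition task gets its own DataWriter.
  override def createWriterFactory(): DataWriterFactory[Row] =
    new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))

  // Driver-side job commit: move every task's staged file into the final path.
  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    val finalPath = new Path(path)
    val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
    val fs = jobPath.getFileSystem(conf)
    try {
      for (file <- fs.listStatus(jobPath).map(_.getPath)) {
        val dest = new Path(finalPath, file.getName)
        if (!fs.rename(file, dest)) {
          throw new IOException(s"failed to rename($file, $dest)")
        }
      }
    } finally {
      fs.delete(jobPath, true)
    }
  }

  // Driver-side job abort: throw away everything the tasks wrote.
  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    val jobPath = new Path(new Path(new Path(path), "_temporary"), jobId)
    val fs = jobPath.getFileSystem(conf)
    fs.delete(jobPath, true)
  }
}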
Construct the DataWriterFactory
class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
  extends DataWriterFactory[Row] {

  /**
   * Returns a data writer to do the actual writing work.
   */
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row]
}
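A sketch of the factory. createDataWriter runs once per (partition, attempt), so encoding both ids in the file name (an assumed naming scheme) keeps retried or speculative attempts from clobbering each other:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.util.SerializableConfiguration

class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
  extends DataWriterFactory[Row] {

  // Runs on the executor once per partition attempt.
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
    val jobPath = new Path(new Path(new Path(path), "_temporary"), jobId)
    val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
    val fs = filePath.getFileSystem(conf.value)
    new SimpleCSVDataWriter(fs, filePath)
  }
}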
Construct the DataWriter
class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
  // Must implement write(record), commit() and abort().
}
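Finally, a sketch of the per-task writer, matching the two-long-column CSV format assumed above. An empty marker object is enough as the commit message here, because the driver-side Writer.commit only needs the staged files themselves:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Task-level commit carries no metadata in this sketch, so a marker object suffices.
case object SimpleCommitMessage extends WriterCommitMessage

class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {

  private val out = fs.create(file, true)

  // Append one row as a comma-separated line.
  override def write(record: Row): Unit =
    out.writeBytes(s"${record.getLong(0)},${record.getLong(1)}\n")

  // Task commit: close the file; the driver later moves it into the final path.
  override def commit(): WriterCommitMessage = {
    out.close()
    SimpleCommitMessage
  }

  // Task abort: close and delete this task's partial output.
  override def abort(): Unit = {
    try out.close() finally fs.delete(file, false)
  }
}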