Spark-SQL 讀寫Parquet檔案
阿新 • • 發佈:2021-01-05
-
讀Parquet格式檔案
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Loads a Parquet file into a DataFrame, prints two sample rows and the inferred schema. */
object CreateDataFrameFromParquet {

  def main(args: Array[String]): Unit = {
    // SparkSession is the unified entry point (a wrapper around, and extension of, SparkContext).
    val session: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Parquet files are self-describing, so no explicit schema is required on read.
    val users: DataFrame = session.read.parquet("src/main/scala/data/user.parquet")
    users.show(2)
    users.printSchema()

    session.stop()
  }
}
-
寫入到Parquet格式檔案中
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

/** Reads "name,age,fv" CSV lines from a text file and writes them out as Parquet. */
object WriteToParquet {

  def main(args: Array[String]): Unit = {
    // Build the SparkSession (entry point for the DataFrame API).
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    val lines: RDD[String] = sc.textFile("src/main/scala/data/user.txt")

    // Rows carry neither field names nor types yet; the schema is attached below.
    // NOTE(review): split(1).toInt / split(2).toDouble throw on malformed or short
    // lines — assumes the input file is clean, TODO confirm.
    val rdd1: RDD[Row] = lines.map(e => {
      val split = e.split(",")
      Row(split(0), split(1).toInt, split(2).toDouble)
    })

    // Schema definition: field name and field type (nullable defaults to true).
    val schema: StructType = StructType(
      Array(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("fv", DoubleType)
      )
    )

    // Bind the Row RDD to the schema to obtain a typed DataFrame.
    val df1: DataFrame = spark.createDataFrame(rdd1, schema)
    // Default save mode is ErrorIfExists: this fails if the output directory already exists.
    df1.write.parquet("src/main/scala/data/outpar")

    // spark.stop() already stops the underlying SparkContext, so the explicit
    // sc.stop() the original called right before it was redundant and is removed.
    spark.stop()
  }
}