Spark-SQL: Reading and Writing CSV Files
阿新 • Published: 2021-01-05
Sample data. `user.csv` (with a header row):

```
name,age,fv_value
libai,18,9999.99
xuance,30,99.99
diaochan,28,99.99
```

`user2.csv` (the same rows, without a header):

```
libai,18,9999.99
xuance,30,99.99
diaochan,28,99.99
```
Reading a CSV file
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrameFromCsv {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession (a wrapper around and enhancement of SparkContext)
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // 1) The file has a header row
    val df: DataFrame = spark.read
      .option("header", "true")      // treat the first line as the header
      .option("inferSchema", "true") // infer the column types
      .csv("src/main/scala/data/user.csv")
    // df.printSchema()
    // df.show()

    // 2) The file has no header row
    val df2: DataFrame = spark.read
      .option("inferSchema", "true")
      .csv("src/main/scala/data/user2.csv")
    // toDF returns a new DataFrame; the result must be captured,
    // otherwise the columns keep their default names (_c0, _c1, _c2)
    val named: DataFrame = df2.toDF("name", "age", "fv")
    named.printSchema()
    named.show()

    spark.stop()
  }
}
```
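When the column types are already known, the extra pass that `inferSchema` makes over the data can be avoided by supplying the schema up front. A minimal sketch, assuming the same headerless `user2.csv` as above and the `spark` session from the example (the names `userSchema` and `df3` are illustrative):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

// Declare the schema instead of letting Spark infer it
val userSchema: StructType = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
  .add("fv", DoubleType)

val df3: DataFrame = spark.read
  .schema(userSchema) // no inference pass over the file
  .csv("src/main/scala/data/user2.csv")
```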
Writing to a CSV file
```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object WriteToCsv {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    val lines: RDD[String] = sc.textFile("src/main/scala/data/user.txt")

    // A Row's fields have no names and no declared types yet
    val rdd1: RDD[Row] = lines.map(e => {
      val split = e.split(",")
      Row(split(0), split(1).toInt, split(2).toDouble)
    })

    // The schema describes each field: name, type, nullability
    val schema: StructType = StructType(
      Array(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("fv", DoubleType)
      )
    )

    // Combine the RDD[Row] with the schema to get a DataFrame
    val df1: DataFrame = spark.createDataFrame(rdd1, schema)
    df1.write.csv("src/main/scala/data/outcsv")

    spark.stop()
  }
}
```
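By default, `DataFrameWriter.csv` writes no header line and throws an error if the output directory already exists. A minimal sketch of a more explicit write, reusing `df1` from the example above (the output path is illustrative):

```scala
import org.apache.spark.sql.SaveMode

df1.write
  .option("header", "true")  // write the column names as the first line
  .mode(SaveMode.Overwrite)  // replace the output directory if it already exists
  .csv("src/main/scala/data/outcsv2")
```

Spark writes one part-*.csv file per partition of `df1`; calling `df1.coalesce(1)` before the write produces a single output file, at the cost of giving up write parallelism.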