1. 程式人生 > >SparkSQL寫入多種檔案格式

SparkSQL寫入多種檔案格式

需求:

將資料庫中的資料讀取出來並以text  json  csv三種格式寫入到本地檔案或者hdfs中

csv格式:能夠以excel的形式開啟

 程式碼實現:

package cn.ysjh0014.SparkSql

import java.util.Properties

import org.apache.spark.sql._

object SparkSqlJdbc {

  def main(args: Array[String]): Unit = {

    val session: SparkSession = SparkSession.builder().appName("SparkSqlJdbc").master("local[1]").getOrCreate()

    import session.implicits._

    val resource: DataFrame = session.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://localhost:3306/lianxi?serverTimezone=GMT%2B8",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "table1",
        "user" -> "root",
        "password" -> "root"
      )).load()

    //lambda表示式
    val r: Dataset[Row] = resource.where($"age" <= 15)
    val s: DataFrame = r.select($"id",$"name",$"age")

    val s: DataFrame = r.select($"name")

    //將查詢到的資料再寫入到資料庫中
//    val props = new Properties()
//    props.put("user","root")
//    props.put("password","root")
//    s.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata?serverTimezone=GMT%2B8", "logs", props)

    s.write.text("D:\\測試資料\\text1")

    r.write.json("D:\\測試資料\\test")

    r.write.csv("D:\\測試資料\\test2")


    session.close()

  }
}

注意:

以text格式寫入的時候只能寫入一列,並且是String型別的,否則會報錯,因為寫入的時候不僅寫入的是資料,還會將schema資訊寫入,所以text格式的不能寫入多列,寫入一列預設是value,