SparkSQL in Scala: A Simple Hands-On (Spark 2.x)
阿新 · Published 2019-01-07
runBasicDataFrame
Read data through a DataFrameReader.
spark.read (a DataFrameReader) for loading files: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameReader.html
Basic Dataset/DataFrame operations: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/Dataset.html
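As a quick orientation before the walkthrough, here is a minimal sketch of the common DataFrameReader entry points; the paths (/tmp/data.json and friends) are placeholders, not files used in this article:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("readerDemo").getOrCreate() // spark-shell already creates this for you
// Format-specific shortcuts:
val jsonDF = spark.read.json("/tmp/data.json")
val parquetDF = spark.read.parquet("/tmp/data.parquet")
// Generic form: format + options + load
val csvDF = spark.read.format("csv")
  .option("header", "true")       // treat the first line as a header row
  .option("inferSchema", "true")  // sample the data to guess column types
  .load("/tmp/data.csv")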
Data file:
# cat people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
Read the file:
val df = spark.read.json("people.json")
// df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// A global temporary view is cross-session and lives under the global_temp database
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()
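Note the difference in scope: a view created with createOrReplaceTempView is tied to the current SparkSession and disappears when that session ends, whereas a global temporary view is registered in the system database global_temp and stays visible to other sessions in the same application (hence the global_temp.people qualifier above).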
runInferSchemaExample
Scala data types: http://www.runoob.com/scala/scala-data-types.html
Load the following file into Spark SQL:
# cat user.txt
alex,25
mike,21
yang,15
joe,39
Start a spark-shell session:
./spark-shell --master yarn --name simpleTest --driver-memory 4G --driver-cores 4 --num-executors 4 --executor-memory 4G --executor-cores 4
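These flags request 4 executors from YARN, each with 4 GB of memory and 4 cores, plus a 4 GB / 4-core driver; tune them to your cluster's capacity.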
1. Define a case class:
case class Person(name: String, age: Long)
2. Convert the RDD to a DataFrame:
val userDF = sc.textFile("/tmp/someone/user.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
  .toDF()  // spark-shell auto-imports spark.implicits._, which provides toDF()
userDF.printSchema()
3. Register the DataFrame from step 2 as a temporary view:
userDF.createOrReplaceTempView("user")
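If a case class cannot be defined up front, the schema can also be specified programmatically; the following is a minimal sketch of that alternative (userDF2 and user2 are names invented here so they do not clash with the view above):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
// Describe the columns explicitly instead of inferring them from a case class
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true)
))
val rowRDD = sc.textFile("/tmp/someone/user.txt")
  .map(_.split(","))
  .map(attrs => Row(attrs(0), attrs(1).trim.toLong))
val userDF2 = spark.createDataFrame(rowRDD, schema)
userDF2.createOrReplaceTempView("user2")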
4. Run queries and persist the results:
spark.sql("SELECT * FROM user limit 10").show()
import org.apache.spark.sql.SaveMode
val resultdf = spark.sql("SELECT * FROM user") // substitute your own query here
resultdf.write.format("csv").mode(SaveMode.Overwrite).save("/tmp/someone/result.tmp")
// coalesce(1) merges all partitions so the result lands in a single CSV file
spark.sql("SELECT * FROM user").coalesce(1).write.format("csv").mode(SaveMode.Overwrite).save("/tmp/someone/result.tmp")
Structured output:
val teenagersDF = spark.sql("SELECT name, age FROM user WHERE age BETWEEN 13 AND 19")
teenagersDF.map(teenager => "Name: " + teenager(0)).show() // access a column by positional index
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // or by field name
// No pre-defined encoder exists for Map[K, V], so declare one explicitly via Kryo
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// getValuesMap[T] retrieves several columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map(name -> yang, age -> 15))
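Since Person is already defined, the same query can also go through the typed Dataset API; a brief sketch:
// Convert the untyped DataFrame into a strongly typed Dataset[Person]
val teenagersDS = teenagersDF.as[Person]
teenagersDS.map(p => s"Name: ${p.name}, Age: ${p.age}").show()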