
SparkSQL in Scala: A Simple Walkthrough (Spark 2.x)

Reference: https://raw.githubusercontent.com/apache/spark/master/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

 

runBasicDataFrame

Read data with DataFrameReader.
   
spark.read (DataFrameReader), for loading files: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameReader.html
Basic Dataset/DataFrame operations: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/Dataset.html
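
For sources other than JSON, DataFrameReader takes a format name plus per-source options. A minimal sketch, assuming a hypothetical CSV file /tmp/someone/users.csv with a header row:

// Read a CSV file; "header" treats the first line as column names and
// "inferSchema" samples the data to pick column types (both off by default)
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/someone/users.csv")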
   
Data file:

# cat people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Read the file:

// spark-shell pre-imports spark.implicits._; in a standalone app it is
// required for the $-column syntax and toDF() used below:
// import spark.implicits._
val df = spark.read.json("people.json")  // org.apache.spark.sql.DataFrame = [age: bigint, name: string]

df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
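
Beyond select, the Dataset API linked above offers the usual untyped transformations; the official SparkSQLExample exercises filter and groupBy on the same DataFrame:

df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+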
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

// A global temporary view lives in the system database global_temp and is
// shared across sessions of the same Spark application
df.createGlobalTempView("people")   // Global temporary view is cross-session
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()
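
Note that a view registered with createOrReplaceTempView is scoped to the SparkSession that created it; the same cross-session query fails for the regular view:

// Unlike the global view, the regular temp view is invisible to a new session;
// this would throw an AnalysisException ("Table or view not found: people"):
// spark.newSession().sql("SELECT * FROM people").show()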

 

runInferSchemaExample 

Scala data types: http://www.runoob.com/scala/scala-data-types.html

Load the following file into Spark SQL:

# cat user.txt 
alex,25
mike,21
yang,15
joe,39

Run from spark-shell:

./spark-shell --master yarn --name simpleTest --driver-memory 4G --driver-cores 4 --num-executors 4 --executor-memory 4G --executor-cores 4

1. Create a Scala case class:

case class Person(name: String, age: Long)

2. Convert the RDD to a DataFrame:

// Split each line, map it onto the Person case class, and let toDF() infer
// the schema by reflection
val userDF = sc.textFile("/tmp/someone/user.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
userDF.printSchema()
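
With the implicits in scope, the same data can also be handled as a strongly typed Dataset[Person] instead of an untyped DataFrame; a short sketch:

// as[Person] attaches the case-class encoder, so fields are accessed by name
val userDS = userDF.as[Person]
userDS.filter(_.age > 20).map(_.name).show()   // alex, mike, joe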

3. Register the DataFrame from step 2 as a temporary view:

userDF.createOrReplaceTempView("user")

4. Run queries and save the results:

spark.sql("SELECT * FROM user limit 10").show()
val resultdf = spark.sql("")
resultdf.write.format("csv").mode(SaveMode.Overwrite).save("/tmp/someone/result.tmp")
spark.sql("").coalesce(1).write.format("csv").mode(SaveMode.Overwrite).save("/tmp/someone/result.tmp")

Structured output:

val teenagersDF = spark.sql("SELECT name, age FROM user WHERE age BETWEEN 13 AND 19")
// Columns of a Row can be accessed by index or by field name
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

// No pre-defined encoder exists for Dataset[Map[K, V]], so define one via kryo
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "yang", "age" -> 15))
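
The kryo encoder is only needed because Spark has no built-in encoder for Map[String, Any]; when the row shape is known, a typed Dataset sidesteps it. A sketch reusing the Person case class from above:

// The case-class product encoder is picked up implicitly, no kryo required
teenagersDF.as[Person].collect()   // Array(Person(yang,15))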