SparkSQL(5)——Spark SQL程式設計方式執行查詢
編寫Spark SQL程式實現RDD轉換成DataFrame
Spark官網提供了兩種方法來實現從RDD轉換得到DataFrame,第一種方法是利用反射機制,推導包含某種型別的RDD,通過反射將其轉換為指定型別的DataFrame,適用於提前知道RDD的schema。第二種方法通過程式設計介面與RDD進行互動獲取schema,並動態建立DataFrame,在執行時決定列及其型別。
一、新增maven依賴
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.3</version> </dependency>
通過反射推斷Schema
Scala支援使用case class型別匯入RDD轉換為DataFrame,通過case class建立schema,case class的引數名稱會被利用反射機制作為列名。這種RDD可以高效的轉換為DataFrame並註冊為表。
package com.fgm.sparksql
import org.apache.spark.sql.SparkSession //利用反射,將rdd轉換成dataFrame case class Person(val id:Int,val name:String,val age:Int) object SchemaDemo { def main(args: Array[String]): Unit = { //建立SparkSession物件 val sparkSession = SparkSession.builder().appName("Schema").master("local[2]").getOrCreate() //建立SparkContext物件 val sc = sparkSession.sparkContext sc.setLogLevel("WARN") //讀取資料檔案 val rdd1 = sc.textFile("D:\\tmp\\person.txt").map(_.split(" ")) //將rdd與樣例類關聯 val personRDD = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //將personRDD轉換成DataFrame,需匯入隱式轉換 import sparkSession.implicits._ val personDF = personRDD.toDF() //dataFrame操作 //DSL風格 personDF.printSchema() personDF.show() personDF.select("name","age").show() personDF.select($"age">30).show() //sql風格語法 personDF.createTempView("person") sparkSession.sql("select * from person").show() sparkSession.sql("select * from person where age>30").show() sparkSession.sql("select * from person where id=3").show() sparkSession.stop() } }
通過StructType直接指定Schema
當case class不能提前定義好時,可以通過以下三步建立DataFrame
(1)將RDD轉為包含Row物件的RDD
(2)基於StructType型別建立schema,與第一步建立的RDD相匹配
(3)通過sparkSession的createDataFrame方法對第一步的RDD應用schema建立DataFrame
package com.fgm.sparksql import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} /** *通過StructType指定schema,將rdd轉換成dataFrame * @Auther: fgm */ object StructTypeSchema { def main(args: Array[String]): Unit = { //建立SparkSession物件 val spark = SparkSession.builder().appName("StructTypSchema").master("local[2]").getOrCreate() //建立SparkContext val sc = spark.sparkContext sc.setLogLevel("WARN") //讀取資料 val rdd1 = sc.textFile("D:\\tmp\\person.txt").map(_.split(" ")) //將rdd與rowd物件關聯 val rowRDD = rdd1.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //指定schema val schema=(new StructType).add(StructField("id",IntegerType,true)) .add(StructField("name",StringType,false)) .add(StructField("age",IntegerType,true)) val dataFrame = spark.createDataFrame(rowRDD,schema) dataFrame.printSchema() dataFrame.show() dataFrame.createTempView("person") spark.sql("select * from person").show() spark.stop() } }
編寫程式操作HiveContext
HiveContext是對應spark-hive這個專案,與hive有部分耦合, 支援hql,是SqlContext的子類,在Spark2.0之後,HiveContext和SqlContext在SparkSession進行了統一,可以通過操作SparkSession來操作HiveContext和SqlContext。
新增依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.3</version>
</dependency>
實現
package com.fgm.sparksql
import org.apache.spark.sql.SparkSession
/**
*SparkSql操作
*
* @Auther: fgm
*/
object HiveSparkSql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("HiveSparkSql").master("local[2]").enableHiveSupport().getOrCreate()
spark.sql("create table user(id int,name string,age int) row format delimited fields terminated by ','")
spark.sql("load data local inpath './data/user.txt' into table user")
spark.sql("select * from user").show()
spark.stop()
}
}
注意:這裡首先在專案根目錄下建立data目錄(和src同級),然後在data中穿件user.txt檔案,並寫入相關資料(1,zhangsan,22)。不然會報錯。另外需要開啟HiveSupport服務:enableHiveSupport()