SparkSQL學習進度9-SQL實戰案例
Spark SQL 基本操作
將下列 JSON 格式資料複製到 Linux 系統中,並儲存命名為 employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
為 employee.json 建立 DataFrame,並寫出 Scala 語句完成下列操作:
(1)查詢所有資料;
(2)查詢所有資料,並去除重複的資料;
(3)查詢所有資料,列印時去除 id 欄位;
(4)篩選出 age>30 的記錄;
(5)將資料按 age 分組;
(6)將資料按 name 升序排列;
(7)取出前 3 行資料;
(8)查詢所有記錄的 name 列,併為其取別名為 username;
(9)查詢年齡 age 的平均值;
(10) 查詢年齡 age 的最小值。
程式設計實現將 RDD 轉換為 DataFrame
原始檔內容如下(包含 id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
請將資料複製儲存到 Linux 系統中,命名為 employee.txt,實現從 RDD 轉換得到DataFrame,並按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有資料。請寫出程式程式碼。
package cn.itcast.spark.mook
import org.apache.spark.sql.SparkSession
import org.junit.Test
class RDDtoDataFrame {
@Test
def test(): Unit ={
val spark=SparkSession.builder()
.appName("datafreame1")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val df =spark.sparkContext.textFile("dataset/employee.txt").map(_.split(","))
.map(item => Employee(item(0).trim.toInt,item(1),item(2).trim.toInt))
.toDF()
df.createOrReplaceTempView("employee")
val dfRDD=spark.sql("select * from employee")
dfRDD.map(it => "id:"+it(0) +",name:"+it(1)+",age:"+it(2) )
.show()
}
}
case class Employee(id:Int,name:String,age:Long)
程式設計實現利用 DataFrame 讀寫 MySQL 的資料
(1)在 MySQL 資料庫中新建資料庫 sparktest,再建立表 employee,包含如表 6-2 所示的兩行資料。
表 6-2 employee 表原有資料
id name gender Age
1 Alice F 22
2 John M 25
(2)配置 Spark 通過 JDBC 連線資料庫 MySQL,程式設計實現利用 DataFrame 插入如表 6-3 所示的兩行資料到 MySQL 中,最後打印出 age 的最大值和 age 的總和。
表 6-3 employee 表新增資料
id name gender age
3 Mary F 26
4 Tom M 23
package cn.itcast.spark.mook
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}
object MysqlOp {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("mysql example")
.master("local[6]")
.getOrCreate()
val schema = StructType(
List(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("gender", StringType),
StructField("age", IntegerType)
)
)
val studentDF = spark.read
//分隔符:製表符
.option("delimiter", ",")
.schema(schema)
.csv("dataset/stu")
studentDF.write
.format("jdbc")
.mode(SaveMode.Append)
.option("url", "jdbc:mysql://hadoop101:3306/spark02")
.option("dbtable", "employee")
.option("user", "spark")
.option("password", "fengge666")
.save()
spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop101:3306/spark02")
.option("dbtable","(select max(age),SUM(age) from employee) as emp")
.option("user", "spark")
.option("password", "fengge666")
.load()
.show()
}
}