1. 程式人生 > 實用技巧 >SparkSQL學習進度9-SQL實戰案例

SparkSQL學習進度9-SQL實戰案例

Spark SQL 基本操作

將下列 JSON 格式資料複製到 Linux 系統中,並儲存命名為 employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

為 employee.json 建立 DataFrame,並寫出 Scala 語句完成下列操作:

(1)查詢所有資料;

(2)查詢所有資料,並去除重複的資料;

(3)查詢所有資料,列印時去除 id 欄位;

(4)篩選出 age>30 的記錄;

(5)將資料按 age 分組;

(6)將資料按 name 升序排列;

(7)取出前 3 行資料;

(8)查詢所有記錄的 name 列,併為其取別名為 username;

(9)查詢年齡 age 的平均值;

(10) 查詢年齡 age 的最小值。

程式設計實現將 RDD 轉換為 DataFrame

原始檔內容如下(包含 id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

請將資料複製儲存到 Linux 系統中,命名為 employee.txt,實現從 RDD 轉換得到DataFrame,並按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有資料。請寫出程式程式碼。

package cn.itcast.spark.mook

import org.apache.spark.sql.SparkSession
import org.junit.Test



class RDDtoDataFrame {

  @Test
  def test(): Unit ={
    val spark=SparkSession.builder()
      .appName("datafreame1")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._
    val df
=spark.sparkContext.textFile("dataset/employee.txt").map(_.split(",")) .map(item => Employee(item(0).trim.toInt,item(1),item(2).trim.toInt)) .toDF() df.createOrReplaceTempView("employee") val dfRDD=spark.sql("select * from employee") dfRDD.map(it => "id:"+it(0) +",name:"+it(1)+",age:"+it(2) ) .show() } } case class Employee(id:Int,name:String,age:Long)

程式設計實現利用 DataFrame 讀寫 MySQL 的資料

(1)在 MySQL 資料庫中新建資料庫 sparktest,再建立表 employee,包含如表 6-2 所示的兩行資料。

表 6-2 employee 表原有資料

id name gender Age

1 Alice F 22

2 John M 25

(2)配置 Spark 通過 JDBC 連線資料庫 MySQL,程式設計實現利用 DataFrame 插入如表 6-3 所示的兩行資料到 MySQL 中,最後打印出 age 的最大值和 age 的總和。

表 6-3 employee 表新增資料

id name gender age

3 Mary F 26

4 Tom M 23

package cn.itcast.spark.mook

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

object MysqlOp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("mysql example")
      .master("local[6]")
      .getOrCreate()

    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("gender", StringType),
        StructField("age", IntegerType)
      )
    )

    val studentDF = spark.read
      //分隔符:製表符
      .option("delimiter", ",")
      .schema(schema)
      .csv("dataset/stu")

    studentDF.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("url", "jdbc:mysql://hadoop101:3306/spark02")
      .option("dbtable", "employee")
      .option("user", "spark")
      .option("password", "fengge666")
      .save()

    spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop101:3306/spark02")
      .option("dbtable","(select max(age),SUM(age) from employee) as emp")
      .option("user", "spark")
      .option("password", "fengge666")
      .load()
      .show()


  }

}