1. 程式人生 > >Spark SQL基本操作以及函式的使用

Spark SQL基本操作以及函式的使用

引語:

  本篇部落格主要介紹了Spark SQL中的filter過濾資料、去重、集合等基本操作,以及一些常用日期函式,隨機函式,字串操作等函式的使用,並列編寫了示例程式碼,同時還給出了程式碼當中用到的一些資料,放在最文章最後。

SparkSQL簡介

  Spark SQL是Spark生態系統中非常重要的元件,其前身為Shark。Shark是Spark上的資料倉庫,最初設計成與Hive相容,但是該專案於2014年開始停止開發,轉向Spark SQL。Spark SQL全面繼承了Shark,並進行了優化。 Spark SQL增加了SchemaRDD(即帶有Schema資訊的RDD),使使用者可以在Spark SQL中執行SQL語句,資料既可以來自RDD,也可以來自Hive、HDFS、Cassandra等外部資料來源,還可以是JSON格式的資料。Spark SQL目前支援Scala、Java、Python三種語言,支援SQL-92規範。

Spark SQL的優點

  Spark SQL可以很好地支援SQL查詢,一方面,可以編寫Spark應用程式使用SQL語句進行資料查詢,另一方面,也可以使用標準的資料庫聯結器(比如JDBC或ODBC)連線Spark進行SQL查詢 。

Spark SQL基本操作

去重

  distinct:根據每條資料進行完整去重。

  dropDuplicates:根據欄位去重。

  package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * 類名  DistinctDemo
  * 作者   彭三青
  * 建立時間  2018-11-29 15:02
  * 版本  1.0
  * 描述: $ 去重操作:distinct、drop
  */

object DistinctDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Operations")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/person.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    println("--------------------distinct---------------------")
	// 根據每條資料進行完整的去重
    employeeDS.distinct().show()

    println("--------------------dropDuplicates---------------------")
    // 根據欄位進行去重
    employeeDS.dropDuplicates(Seq("name")).show()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
過濾
filter():括號裡的引數可以是過濾函式、函式返回的Boolean值(為true則保留,false則過濾掉)、列名或者表示式。

except:過濾出當前DataSet中有,但在另一個DataSet中不存在的。

intersect:獲取兩個DataSet的交集。

  提示:except和intersect使用的時候必須要是相同的例項,如果把另外一個的Employee換成一個同樣的欄位的Person類就會報錯。

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * 類名  FilterDemo
  * 作者   彭三青
  * 建立時間  2018-11-29 15:09
  * 版本  1.0
  * 描述: $
  */

object FilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FilterDemo")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
    val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json")
    val employee2DS: Dataset[Employee] = employee2DF.as[Employee]

    println("--------------------employee--------------------")
    employeeDS.show()

    println("--------------------employee2--------------------")
    employee2DS.show()

    println(
      "       ┏┓   ┏┓\n" +
      "     ┏┛┻━━━┛┻┓\n" +
      "   ┃       ┃\n" +
      "   ┃   ━   ┃\n" +
      "   ┃ ┳┛ ┗┳ ┃\n" +
      "   ┃       ┃\n" +
      "   ┃   ┻   ┃\n" +
      "   ┃       ┃\n" +
      "   ┗━┓   ┏━┛\n" +
      "     ┃   ┃\n" +
      "      ┃   ┃\n" +
      "     ┃   ┗━━━┓\n" +
      "     ┃       ┣┓\n" +
      "     ┃       ┏┛\n" +
      "     ┗┓┓┏━┳┓┏┛\n" +
      "      ┃┫┫ ┃┫┫\n" +
      "      ┗┻┛ ┗┻┛\n"
    )

    println("-------------------------------------------------")

    // 如果引數返回true,就保留該元素,否則就過濾掉
    employeeDS.filter(employee => employee.age == 35).show()
    employeeDS.filter(employee => employee.age > 30).show()
    // 獲取當前的DataSet中有,但是在另外一個DataSet中沒有的元素
    employeeDS.except(employee2DS).show()
    // 獲取兩個DataSet的交集
    employeeDS.intersect(employee2DS).show()

    spark.stop()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
集合
collect_set:將一個分組內指定欄位的值都收集到一起,不去重

collect_list:講一個分組內指定欄位的值都收集到一起,會去重

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * 類名  CollectSetAndList
  * 作者   彭三青
  * 建立時間  2018-11-29 15:24
  * 版本  1.0
  * 描述: $ collect_list、 collect_set
  */

object CollectSetAndList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FilterDemo")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    // collect_list:將一個分組內指定欄位的值都收集到一起,不去重
    // collect_set:同上,但唯一區別是會去重
    employeeDS
      .groupBy(employeeDS("depId"))
      .agg(collect_set(employeeDS("name")), collect_list(employeeDS("name")))
      .show()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
join和sort
package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * 類名  JoinAndSort
  * 作者   彭三青
  * 建立時間  2018-11-29 15:19
  * 版本  1.0
  * 描述: $
  */

object JoinAndSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FilterDemo")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
    val departmentDF: DataFrame = spark.read.json("E://temp/department.json")
    val departmentDS: Dataset[Department] = departmentDF.as[Department]

    println("----------------------employeeDS----------------------")
    employeeDS.show()
    println("----------------------departmentDS----------------------")
    departmentDS.show()
    println("------------------------------------------------------------")

    // 等值連線
    employeeDS.joinWith(departmentDS, $"depId" === $"id").show()
    // 按照年齡進行排序,並降序排列
    employeeDS.sort($"age".desc).show()
  }
}
case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

函式的使用

日期函式

  current_time():獲取當前日期。

  current_timestamp():獲取當前時間戳。

數學函式

  rand():生成0~1之間的隨機數

  round(e: column,scale: Int ):column列名,scala精確到小數點的位數。

  round(e: column):一個引數預設精確到小數點1位。

字串函式

  concat_ws(seq: String, exprs: column*):字串拼接。引數seq傳入的拼接的字元,column傳入的需要拼接的字元,可以指定多個列,不同列之間用逗號隔開。

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * 類名  FunctionsDemo
  * 作者   彭三青
  * 建立時間  2018-11-29 15:56
  * 版本  1.0
  * 描述: $
  */

object FunctionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Operations")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    employeeDS
        .select(employeeDS("name"), current_date(), current_timestamp(),
          rand(), round(employeeDS("salary"), 2),// 取隨機數,
          concat(employeeDS("gender"), employeeDS("age")),
          concat_ws("|", employeeDS("gender"), employeeDS("age"))).show()

    spark.stop()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
資料

  employee.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}

  employee2.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}

  department.json

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}