Spark SQL基本操作以及函式的使用
引語:
本篇部落格主要介紹了Spark SQL中的filter過濾資料、去重、集合等基本操作,以及一些常用日期函式,隨機函式,字串操作等函式的使用,並列編寫了示例程式碼,同時還給出了程式碼當中用到的一些資料,放在最文章最後。
SparkSQL簡介
Spark SQL是Spark生態系統中非常重要的元件,其前身為Shark。Shark是Spark上的資料倉庫,最初設計成與Hive相容,但是該專案於2014年開始停止開發,轉向Spark SQL。Spark SQL全面繼承了Shark,並進行了優化。 Spark SQL增加了SchemaRDD(即帶有Schema資訊的RDD),使使用者可以在Spark SQL中執行SQL語句,資料既可以來自RDD,也可以來自Hive、HDFS、Cassandra等外部資料來源,還可以是JSON格式的資料。Spark SQL目前支援Scala、Java、Python三種語言,支援SQL-92規範。
Spark SQL的優點
Spark SQL可以很好地支援SQL查詢,一方面,可以編寫Spark應用程式使用SQL語句進行資料查詢,另一方面,也可以使用標準的資料庫聯結器(比如JDBC或ODBC)連線Spark進行SQL查詢 。
Spark SQL基本操作
去重
distinct:根據每條資料進行完整去重。
dropDuplicates:根據欄位去重。
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * 類名 DistinctDemo * 作者 彭三青 * 建立時間 2018-11-29 15:02 * 版本 1.0 * 描述: $ 去重操作:distinct、drop */ object DistinctDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("Operations") .getOrCreate() import spark.implicits._ val employeeDF: DataFrame = spark.read.json("E://temp/person.json") val employeeDS: Dataset[Employee] = employeeDF.as[Employee] println("--------------------distinct---------------------") // 根據每條資料進行完整的去重 employeeDS.distinct().show() println("--------------------dropDuplicates---------------------") // 根據欄位進行去重 employeeDS.dropDuplicates(Seq("name")).show() } } case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
過濾
filter():括號裡的引數可以是過濾函式、函式返回的Boolean值(為true則保留,false則過濾掉)、列名或者表示式。
except:過濾出當前DataSet中有,但在另一個DataSet中不存在的。
intersect:獲取兩個DataSet的交集。
提示:except和intersect使用的時候必須要是相同的例項,如果把另外一個的Employee換成一個同樣的欄位的Person類就會報錯。
package spark2x import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * 類名 FilterDemo * 作者 彭三青 * 建立時間 2018-11-29 15:09 * 版本 1.0 * 描述: $ */ object FilterDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("FilterDemo") .getOrCreate() import spark.implicits._ val employeeDF: DataFrame = spark.read.json("E://temp/employee.json") val employeeDS: Dataset[Employee] = employeeDF.as[Employee] val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json") val employee2DS: Dataset[Employee] = employee2DF.as[Employee] println("--------------------employee--------------------") employeeDS.show() println("--------------------employee2--------------------") employee2DS.show() println( " ┏┓ ┏┓\n" + " ┏┛┻━━━┛┻┓\n" + " ┃ ┃\n" + " ┃ ━ ┃\n" + " ┃ ┳┛ ┗┳ ┃\n" + " ┃ ┃\n" + " ┃ ┻ ┃\n" + " ┃ ┃\n" + " ┗━┓ ┏━┛\n" + " ┃ ┃\n" + " ┃ ┃\n" + " ┃ ┗━━━┓\n" + " ┃ ┣┓\n" + " ┃ ┏┛\n" + " ┗┓┓┏━┳┓┏┛\n" + " ┃┫┫ ┃┫┫\n" + " ┗┻┛ ┗┻┛\n" ) println("-------------------------------------------------") // 如果引數返回true,就保留該元素,否則就過濾掉 employeeDS.filter(employee => employee.age == 35).show() employeeDS.filter(employee => employee.age > 30).show() // 獲取當前的DataSet中有,但是在另外一個DataSet中沒有的元素 employeeDS.except(employee2DS).show() // 獲取兩個DataSet的交集 employeeDS.intersect(employee2DS).show() spark.stop() } } case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
集合
collect_set:將一個分組內指定欄位的值都收集到一起,不去重
collect_list:講一個分組內指定欄位的值都收集到一起,會去重
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 類名 CollectSetAndList
* 作者 彭三青
* 建立時間 2018-11-29 15:24
* 版本 1.0
* 描述: $ collect_list、 collect_set
*/
object CollectSetAndList {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("FilterDemo")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
// collect_list:將一個分組內指定欄位的值都收集到一起,不去重
// collect_set:同上,但唯一區別是會去重
employeeDS
.groupBy(employeeDS("depId"))
.agg(collect_set(employeeDS("name")), collect_list(employeeDS("name")))
.show()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
join和sort
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 類名 JoinAndSort
* 作者 彭三青
* 建立時間 2018-11-29 15:19
* 版本 1.0
* 描述: $
*/
object JoinAndSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("FilterDemo")
.getOrCreate()
import spark.implicits._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
val departmentDF: DataFrame = spark.read.json("E://temp/department.json")
val departmentDS: Dataset[Department] = departmentDF.as[Department]
println("----------------------employeeDS----------------------")
employeeDS.show()
println("----------------------departmentDS----------------------")
departmentDS.show()
println("------------------------------------------------------------")
// 等值連線
employeeDS.joinWith(departmentDS, $"depId" === $"id").show()
// 按照年齡進行排序,並降序排列
employeeDS.sort($"age".desc).show()
}
}
case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
函式的使用
日期函式
current_time():獲取當前日期。
current_timestamp():獲取當前時間戳。
數學函式
rand():生成0~1之間的隨機數
round(e: column,scale: Int ):column列名,scala精確到小數點的位數。
round(e: column):一個引數預設精確到小數點1位。
字串函式
concat_ws(seq: String, exprs: column*):字串拼接。引數seq傳入的拼接的字元,column傳入的需要拼接的字元,可以指定多個列,不同列之間用逗號隔開。
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 類名 FunctionsDemo
* 作者 彭三青
* 建立時間 2018-11-29 15:56
* 版本 1.0
* 描述: $
*/
object FunctionsDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("Operations")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
employeeDS
.select(employeeDS("name"), current_date(), current_timestamp(),
rand(), round(employeeDS("salary"), 2),// 取隨機數,
concat(employeeDS("gender"), employeeDS("age")),
concat_ws("|", employeeDS("gender"), employeeDS("age"))).show()
spark.stop()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
資料
employee.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}
employee2.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
department.json
{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}