spark sql原理及使用 基本使用mysql hive rdd轉為dataframe
一、spark SQL概述
1.1 什麼是spark SQL
Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame並且作為分散式SQL查詢引擎的作用。類似於hive的作用。
1.2 spark SQL的特點
1、容易整合:安裝Spark的時候,已經整合好了。不需要單獨安裝。
2、統一的資料訪問方式:JDBC、JSON、Hive、parquet檔案(一種列式儲存檔案,是SparkSQL預設的資料來源,hive中也支援)
3、完全相容Hive。可以將Hive中的資料,直接讀取到Spark SQL中處理。
4、支援標準的資料連線:JDBC、ODBC
5、計算效率比基於mr的hive高,而且hive2.x版本中,hive建議使用spark作為執行引擎
二、spark SQL基本原理
2.1 DataFrame和DataSet基本概念
2.1.1 DataFrame
DataFrame是組織成命名列的資料集。它在概念上等同於關係資料庫中的表,裡面有表的結構以及資料,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化資料檔案
hive中的表
外部資料庫或現有RDDs
DataFrame API支援的語言有Scala,Java,Python和R。
比起RDD,DataFrame多了資料的結構資訊,即schema。RDD是分散式的 Java物件的集合。DataFrame是分散式的Row物件的集合。DataFrame除了提供了比RDD更豐富的運算元以外,更重要的特點是提升執行效率、減少資料讀取以及執行計劃的優化。
2.1.2 DataSet
Dataset是一個分散式的資料收集器。這是在Spark1.6之後新加的一個介面,兼顧了RDD的優點(強型別,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)
2.2 建立DataFrame的方式
2.2.1 SparkSession物件
Apache Spark 2.0引入了SparkSession,其為使用者提供了一個統一的切入點來使用Spark的各項功能,並且允許使用者通過它呼叫DataFrame和Dataset相關API來編寫Spark程式。最重要的是,它減少了使用者需要了解的一些概念,使得我們可以很容易地與Spark互動。
在2.0版本之前,與Spark互動之前必須先建立SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地建立SparkConf, SparkContext 以及 SQLContext,因為這些物件已經封裝在SparkSession中。
要注意一點,在我用的這個spark版本中,直接使用new SQLContext() 來建立SQLContext物件,會顯示該方式已經被棄用了(IDEA中會顯示已棄用),建議使用SparkSession來獲取SQLContext物件。
2.2.2 通過case class樣本類
這種方式在scala中比較常用,因為case class是scala的特色
/**
表 t_stu 的結構為:
id name age
*/
object CreateDF {
def main(args: Array[String]): Unit = {
//這是最新的獲取SQLContext物件的方式
//2、建立SparkSession物件,設定master,appname
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
//3、通過spark獲取sparkContext物件,讀取資料
val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
//4、將資料對映到case class中,也就是資料對映到表的對應欄位中
val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
//這裡必須要加上隱式轉換,否則無法呼叫 toDF 函式
import spark.sqlContext.implicits._
//5、生成df
val df2 = tb.toDF()
//相當於select name from t_stu
df1.select($"name").show()
//關閉spark物件
spark.stop()
}
}
/*1、定義case class,每個屬性對應表中的欄位名以及型別
一般生產中為了方便,會全部定義為string型別,然後有需要的時候
才根據實際情況將string轉為需要的型別
這一步相當於定義表的結構
*/
case class emp(id:Int,name:String,age:Int)
總結步驟為:
1、定義case class,用來表結構
2、建立sparkSession物件,用來讀取資料
3、將rdd中的資料和case class對映
4、呼叫 toDF 函式將rdd轉為 DataFrame
2.2.3 通過StructType類
這種方式java比較常用
package SparkSQLExer
import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
/**
* 建立dataschema方式2:
* 通過spark session物件建立,表結構通過StructType建立
*/
object CreateDF02 {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()
//1、通過StructType建立表結構schema,裡面表的每個欄位使用 StructField定義
val tbSchema = StructType(List(
StructField("id",DataTypes.IntegerType),
StructField("name",DataTypes.StringType),
StructField("age",DataTypes.IntegerType)
))
//2、讀取資料
var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
//3、將資料對映為ROW物件
val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))
//4、建立表結構和表資料對映,返回的就是df
val df2 = sparkS.createDataFrame(rdd1, tbSchema)
//打印表結構
df2.printSchema()
sparkS.stop()
}
}
總結步驟為:
1、通過StructType建立表結構schema,裡面表的每個欄位使用 StructField定義
2、通過sparkSession.sparkContext讀取資料
3、將資料對映格式為Row物件
4、將StructType和資料Row物件對映,返回df
2.2.4 使用json等有表格式的檔案型別
package SparkSQLExer
import org.apache.spark.sql.SparkSession
/**
* 建立df方式3:通過有格式的檔案直接匯入資料以及表結構,比如json格式的檔案
* 返回的直接就是一個DF
*/
object CreateDF03 {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()
//讀取json方式1:
val jsonrdd1= sparkS.read.json("path")
//讀取json方式2:
val jsonrdd1= sparkS.read.format("json").load("path")
sparkS.stop()
}
}
這種方式比較簡單,就是直接讀取json檔案而已
sparkS.read.xxxx讀取任意檔案時,返回的都是DF物件
2.3 操作DataFrame
2.3.1 DSL語句
DSL語句其實就是將sql語句的一些操作轉為類似函式的方式去呼叫,比如:
df1.select("name").show
例子:
為了方便,直接在spark-shell裡操作了,
spark-shell --master spark://bigdata121:7077
1、打印表結構
scala> df1.printSchema
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = true)
|-- comm: integer (nullable = true)
|-- deptno: integer (nullable = true)
2、顯示當前df的表資料或者查詢結果的資料
scala> df1.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30|
| 7876| ADAMS| CLERK|7788| 1987/5/23|1100| 0| 20|
| 7900| JAMES| CLERK|7698| 1981/12/3| 950| 0| 30|
| 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20|
| 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0| 10|
+-----+------+---------+----+----------+----+----+------+
3、執行select, 相當於select xxx form xxx where xxx
scala> df1.select("ename","sal").where("sal>2000").show
+------+----+
| ename| sal|
+------+----+
| SMITH| 800|
| ALLEN|1600|
| WARD|1250|
| JONES|2975|
|MARTIN|1250|
| BLAKE|2850|
| CLARK|2450|
| SCOTT|3000|
| KING|5000|
|TURNER|1500|
| ADAMS|1100|
| JAMES| 950|
| FORD|3000|
|MILLER|1300|
+------+----+
4、對某些列進行操作
對某個指定進行操作時,需要加上$符號,然後後面才能操作
$代表 取出來以後,再做一些操作。
注意:這個 $ 的用法在ideal中無法正常使用,解決方法下面說
scala> df1.select($"ename",$"sal",$"sal"+100).show
+------+----+-----------+
| ename| sal|(sal + 100)|
+------+----+-----------+
| SMITH| 800| 900|
| ALLEN|1600| 1700|
| WARD|1250| 1350|
| JONES|2975| 3075|
|MARTIN|1250| 1350|
| BLAKE|2850| 2950|
| CLARK|2450| 2550|
| SCOTT|3000| 3100|
| KING|5000| 5100|
|TURNER|1500| 1600|
| ADAMS|1100| 1200|
| JAMES| 950| 1050|
| FORD|3000| 3100|
|MILLER|1300| 1400|
+------+----+-----------+
5、過濾行
scala> df1.filter($"sal">2000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename| job| mgr| hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7698|BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782|CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788|SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20|
+-----+-----+---------+----+----------+----+----+------+
6、分組以及計數
scala> df1.groupBy($"deptno").count.show
+------+-----+
|deptno|count|
+------+-----+
| 20| 5|
| 10| 3|
| 30| 6|
+------+-----+
上面說到在idea中 select($"name")中無法正常使用,解決方法為:
在該語句之前加上這麼一句:
import spark.sqlContext.implicits._
主要還是因為型別的問題,加上隱式轉換就好了
2.3.2 sql語句
df物件不能直接執行sql。需要生成一個檢視,再執行SQL。
需要指定建立的檢視名稱,後面檢視名稱就相當於表名。
檢視後面還會細說,這裡先有個概念
例子:
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
。。。。。。。。。。。。。。
//通過df物件建立臨時檢視。檢視名就相當於表名
df1.createOrReplaceTempView("emp")
//通過sparksession物件執行執行
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select deptno,count(1) from emp group by deptno").show
//可以建立多個檢視,不衝突
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.deptno from emp12345 e").show
2.3.3 多表查詢
scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept
scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
lines: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[68] at map at <console>:24
scala> val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
allDept: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[69] at map at <console>:28
scala> val df2 = allDept.toDF
df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]
scala> df2.create
createGlobalTempView createOrReplaceTempView createTempView
scala> df2.createOrReplaceTempView("dept")
scala> spark.sql("select dname,ename from emp12345,dept where emp12345.deptno=dept.deptno").show
+----------+------+
| dname| ename|
+----------+------+
| RESEARCH| SMITH|
| RESEARCH| JONES|
| RESEARCH| SCOTT|
| RESEARCH| ADAMS|
| RESEARCH| FORD|
|ACCOUNTING| CLARK|
|ACCOUNTING| KING|
|ACCOUNTING|MILLER|
| SALES| ALLEN|
| SALES| WARD|
| SALES|MARTIN|
| SALES| BLAKE|
| SALES|TURNER|
| SALES| JAMES|
+----------+------+
2.4 建立DataSet
2.4.1 通過case class
和DataFrame類似,只是把 toDF改為呼叫 toDS 方法
package SparkSQLExer
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object CreateDS {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
val tb = lines.map(t=>emp1(t(0).toInt,t(1),t(2).toInt))
import spark.sqlContext.implicits._
val df1 = tb.toDS()
df1.select($"name")
}
}
case class emp1(id:Int,name:String,age:Int)
2.4.2 通過序列Seq類物件
package SparkSQLExer
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object CreateDS {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
//建立一個序列物件,裡面都是emp1物件,對映的資料,然後直接toDS轉為DataSet
val ds1 = Seq(emp1(1,"king",20)).toDS()
ds1.printSchema()
}
}
case class emp1(id:Int,name:String,age:Int)
2.4.3 使用json格式檔案
定義case class
case class Person(name:String,age:BigInt)
使用JSON資料生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")
將DataFrame轉換成DataSet
df.as[Person].show
df.as[Person] 是一個 DataSet
as[T]中的泛型需要是一個case class類,用於對映表頭
2.5 操作DataSet
DataSet支援的運算元其實就是rdd和DataFrame運算元的結合。
使用emp.json 生成DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename| hiredate| job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
| | 20| 7369| SMITH|1980/12/17| CLERK|7902| 800|
| 300| 30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500| 30| 7521| WARD| 1981/2/22| SALESMAN|7698|1250|
| | 20| 7566| JONES| 1981/4/2| MANAGER|7839|2975|
|1400| 30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
| | 30| 7698| BLAKE| 1981/5/1| MANAGER|7839|2850|
| | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450|
| | 20| 7788| SCOTT| 1987/4/19| ANALYST|7566|3000|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| 0| 30| 7844|TURNER| 1981/9/8| SALESMAN|7698|1500|
| | 20| 7876| ADAMS| 1987/5/23| CLERK|7788|1100|
| | 30| 7900| JAMES| 1981/12/3| CLERK|7698| 950|
| | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000|
| | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
scala> empDF.where($"sal" >= 3000).show
+----+------+-----+-----+----------+---------+----+----+
|comm|deptno|empno|ename| hiredate| job| mgr| sal|
+----+------+-----+-----+----------+---------+----+----+
| | 20| 7788|SCOTT| 1987/4/19| ANALYST|7566|3000|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000|
+----+------+-----+-----+----------+---------+----+----+
#### empDF 轉換成 DataSet 需要 case class
scala> case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
defined class Emp
scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]
scala> empDS.filter(_.sal > 3000).show
+----+------+-----+-----+----------+---------+---+----+
|comm|deptno|empno|ename| hiredate| job|mgr| sal|
+----+------+-----+-----+----------+---------+---+----+
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
+----+------+-----+-----+----------+---------+---+----+
scala> empDS.filter(_.deptno == 10).show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename| hiredate| job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
| | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
多表查詢:
1、建立部門表
scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[154] at map at <console>:24
scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept
scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]
scala> deptDS.show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
2、員工表
同上 empDS
empDS.join(deptDS,"deptno").where(xxxx) 連線兩個表,通過deptno欄位
empDS.joinWith(deptDS,deptDS("deptno")===empDS("deptno")) 這個用於連線的欄位名稱不一樣的情況
2.6 檢視view
如果想使用標準的sql語句來操作df或者ds物件時,必須先給df或者ds物件建立檢視,然後通過SparkSession物件的sql函式來對相應的檢視進行操作才可以。那麼檢視是什麼?
檢視是一個虛表,不儲存資料,可以當做是表的一個訪問連結。檢視有兩種型別:
普通檢視:也叫本地檢視,只在當前session會話中有效
全域性檢視:在全部session中都有效,全域性檢視建立在指定名稱空間中:global_temp 類似於一個庫
操作說明:
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
建立本地檢視:
empDF.createOrReplaceTempView(檢視名),檢視存在就會重新建立
empDF.createTempView(檢視名),如果檢視存在就不會建立
建立全域性檢視:
empDF.createGlobalTempView(檢視名)
對檢視執行sql操作,這裡檢視名就類似於表名
spark.sql("xxxxx")
例子:
empDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show
注意,只要建立了檢視,那麼就可以通過sparksession物件在任意一個類中操作檢視,也就是表。這個特性很好用,當我們要操作一些表時,可以一開始就讀取成df,然後建立成檢視,那麼就可以在任意一個地方查詢表了。
2.7 資料來源
通過SparkSession物件可以讀取不同格式的資料來源:
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
下面都用上面的spark代稱SparkSession。
2.7.1 SparkSession讀取資料的方式
1、load
spark.read.load(path):讀取指定路徑的檔案,要求檔案儲存格式為Parquet檔案
2、format
spark.read.format("格式").load(path) :指定讀取其他格式的檔案,如json
例子:
spark.read.format("json").load(path)
3、直接讀取其他格式檔案
spark.read.格式名(路徑),這是上面2中的一個簡寫方式,例子:
spark.read.json(路徑) json格式檔案
spark.read.text(路徑) 讀取文字檔案
注意:這些方式返回的都是 DataFrame 物件
2.7.2 SparkSession儲存資料的方式
可以將DataFrame 物件寫入到指定格式的檔案中,假設有個DataFrame 物件為df1.
1、save
df1.write.save(路徑)
他會將檔案儲存到這個目錄下,檔名spark隨機生成的,所以使用上面的讀取方式的時候,直接指定讀取目錄即可,不用指定檔名。輸出的檔案格式為 Parquet。可以直接指定hdfs的路徑,否則就儲存到本地
如:
df1.write.save("/test")
spark.read.load("/test")
2、直接指定格式儲存
df1.write.json(路徑) 這樣就會以json格式儲存檔案,生成的檔名的情況和上面類似
3、指定儲存模式
如果沒有指定儲存模式,輸出路徑存在的情況下,就會報錯
df1.write.mode("append").json(路徑)
mode("append") 就表示檔案存在時就追加
mode("overwrite") 表示覆蓋舊資料
4、儲存為表
df1.write.saveAsTable(表名) 會儲存在當前目錄的spark-warehouse 目錄下
5、format
df1.write.format(格式).save()
使用指定特定格式的方式來輸出儲存資料,比如儲存到MongoDB資料庫中
2.7.3 Parquet格式
這種一種列式儲存格式,具體原理可以看看之前hive的文章。這種格式是預設的儲存格式,使用load和save時預設的格式,操作方式很像前面說的,這裡不重複。這裡要講的是Parquet的一個特殊的功能,支援schema(表結構)的合併。例子:
scala> val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
scala> df1.show
+------+------+
|single|double|
+------+------+
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
| 5| 10|
+------+------+
scala> sc.makeRDD(1 to 5)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:25
scala> sc.makeRDD(1 to 5).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5)
//匯出表1
scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")
scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
scala> df2.show
+------+------+
|single|triple|
+------+------+
| 6| 18|
| 7| 21|
| 8| 24|
| 9| 27|
| 10| 30|
+------+------+
//匯出表2
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")
scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field]
//直接讀取會丟失欄位
scala> df3.show
+------+------+---+
|single|double|key|
+------+------+---+
| 8| null| 2|
| 9| null| 2|
| 10| null| 2|
| 3| 6| 1|
| 4| 8| 1|
| 5| 10| 1|
| 6| null| 2|
| 7| null| 2|
| 1| 2| 1|
| 2| 4| 1|
+------+------+---+
//加上option,指定"mergeSchema"為true,就可以合併
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]
scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
| 8| null| 24| 2|
| 9| null| 27| 2|
| 10| null| 30| 2|
| 3| 6| null| 1|
| 4| 8| null| 1|
| 5| 10| null| 1|
| 6| null| 18| 2|
| 7| null| 21| 2|
| 1| 2| null| 1|
| 2| 4| null| 1|
+------+------+------+---+
補充問題:key 是什麼?必須用key嘛?
key是不同表的一個區分欄位,在合併的時候,會作為合併之後的表的一個欄位,並且值等於key=xx 中設定的值
如果目錄下,兩個表的目錄名不一樣,是無法合併的,合併欄位名可以任意,
如:一個是key ,一個是 test 這兩個無法合併,必須統一key或者test
2.7.4 json檔案
這種一種帶表格式欄位的檔案,例子:
scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> peopleDF.createOrReplaceTempView("people")
scala> spark.sql("select * from people where age=19")
res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> spark.sql("select * from people where age=19").show
+---+------+
|age| name|
+---+------+
| 19|Justin|
+---+------+
scala> spark.sql("select age,count(1) from people group by age").show
+----+--------+
| age|count(1)|
+----+--------+
| 19| 1|
|null| 1|
| 30| 1|
+----+--------+
2.7.5 JDBC 連線
df物件支援通過jdbc連線資料庫,寫入資料到資料庫,或者從資料庫讀取資料。
例子:
1、通過jdbc 從mysql讀取資料:
使用 format(xx).option()的方式指定連線資料庫的一些引數,比如使用者名稱密碼,使用的連線驅動等
import java.util.Properties
import org.apache.spark.sql.SparkSession
object ConnMysql {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
//連線mysql方式1:
//建立properties配置物件,用於存放連線mysql的引數
val mysqlConn = new Properties()
mysqlConn.setProperty("user","root")
mysqlConn.setProperty("password","wjt86912572")
//使用jdbc連線,指定連線字串,表名,以及其他連線引數,並返回對應的dataframe
val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
mysqlDF1.printSchema()
mysqlDF1.show()
mysqlDF1.createTempView("customer")
sparkS.sql("select * from customer limit 2").show()
//連線mysql方式2,這種方式比較常用:
val mysqlConn2 = sparkS.read.format("jdbc")
.option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
.option("user","root")
.option("password","wjt86912572")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","customer").load()
mysqlConn2.printSchema()
}
}
這是兩種連線讀取資料的方式。
2、jdbc寫入資料到mysql
和讀取類似,只不過換成了write操作
import java.util.Properties
import org.apache.spark.sql.SparkSession
object WriteToMysql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()
val df1 = spark.read.text("G:\\test\\t_stu.json")
//方式1:
df1.write.format("jdbc")
.option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
.option("user","root")
.option("password","wjt86912572")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","customer").save()
//方式2:
val mysqlConn = new Properties()
mysqlConn.setProperty("user","root")
mysqlConn.setProperty("password","wjt86912572")
df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
}
}
必須要保證df的表格式和寫入的mysql的表格式一樣,欄位名也要一樣
2.7.6 hive
1、通過jdbc連線hive
方式和普通jdbc類似,例如:
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
* 連線hive的情況有兩種:
* 1、如果是直接在ideal中執行spark程式的話,則必須在程式中指定jdbc連線的hiveserver的地址
* 且hiveserver必須以後臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連線hive
*
* 2、如果程式是打包到spark叢集中執行的話,一般spark叢集的conf目錄下,已經有hive client
* 的配置檔案,就會直接啟動hive client來連線hive。這時不需要啟動hiveserver服務。
* 這種方式是通過hive client連線hive
*/
object ConnHive {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
val properties = new Properties()
properties.setProperty("user","")
properties.setProperty("password","")
val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default","customer",properties)
hiveDF.printSchema()
spark.stop()
}
}
這種方式要注意一點:
hiveserver必須以後臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連線hive。
2、通過hive client連線hive
這種方式一般用在生產中,因為任務一般都是通過spark-submit提交到叢集中執行,這時候就會直接通過hive client來連線hive,不會通過jdbc來連線了。
要注意:需要在spark的節點上都配置上hive client,然後將hive-site.xml配置檔案拷貝到 spark的conf目錄下。同時需要將hadoop的core-site.xml hdfs-site.xml也拷貝過去。另外一方面,因為要使用hive client,所以hive server那邊,一般都要配置metastore server,具體配置看hive的文章。
這樣在spark叢集中的程式就可以直接使用
spark.sql("xxxx").show
這樣的操作,預設就會從hive中讀取對應的表進行操作。不用另外做任何連線hive 的操作
或者直接到 spark-shell中,也是可以直接使用 上面的方式操作hive的表
例如:
import org.apache.spark.sql.SparkSession
object ConnHive02 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("spark sql conn hive").getOrCreate()
spark.sql("select * from customer").show()
}
}
這樣直接操作的就是 hive 的表了
2.8 小案例--讀取hive資料分析結果寫入mysql
import java.util.Properties
import org.apache.spark.sql.SparkSession
object HiveToMysql {
def main(args: Array[String]): Unit = {
//直接通過spark叢集中的hive client連線hive,不需要jdbc以及hive server
val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
val resultDF = spark.sql("select * from default.customer")
//一般中間寫的處理邏輯都是處理從hive讀取的資料,處理完成後寫入到mysql
val mysqlConn = new Properties()
mysqlConn.setProperty("user","root")
mysqlConn.setProperty("password","wjt86912572")
//通過jdbc寫入mysql
resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)
spark.stop()
}
}
三、效能優化
3.1 記憶體中快取資料
先啟動個spark-shell
spark-shell --master spark://bigdata121:7077
要在spark-shell中操作mysql,所以記得自己找個 mysql-connector的jar放到spark的jars目錄下
例子:
建立df,從mysql讀取表
scala> val mysqDF = spark.read.format("jdbc").option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","wjt86912572").option("driver","com.mysql.jdbc.Driver").option("dbtable","customer").load()
mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> mysqDF.show
+---+------+--------------------+
| id| name| last_mod|
+---+------+--------------------+
| 1| neil|2019-07-20 17:09:...|
| 2| jack|2019-07-20 17:09:...|
| 3|martin|2019-07-20 17:09:...|
| 4| tony|2019-07-20 17:09:...|
| 5| eric|2019-07-20 17:09:...|
| 6| king|2019-07-20 17:42:...|
| 7| tao|2019-07-20 17:45:...|
+---+------+--------------------+
必須註冊成一張表,才可以快取。
scala> mysqDF.registerTempTable("customer")
warning: there was one deprecation warning; re-run with -deprecation for details
標識這張表可以被快取,但是現在資料並沒有直接快取
scala> spark.sqlContext.cacheTable("customer")
第一次查詢表,從mysql讀取資料,並快取到記憶體中
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id| name| last_mod|
+---+------+--------------------+
| 1| neil|2019-07-20 17:09:...|
| 2| jack|2019-07-20 17:09:...|
| 3|martin|2019-07-20 17:09:...|
| 4| tony|2019-07-20 17:09:...|
| 5| eric|2019-07-20 17:09:...|
| 6| king|2019-07-20 17:42:...|
| 7| tao|2019-07-20 17:45:...|
+---+------+--------------------+
這一次查詢從記憶體中返回
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id| name| last_mod|
+---+------+--------------------+
| 1| neil|2019-07-20 17:09:...|
| 2| jack|2019-07-20 17:09:...|
| 3|martin|2019-07-20 17:09:...|
| 4| tony|2019-07-20 17:09:...|
| 5| eric|2019-07-20 17:09:...|
| 6| king|2019-07-20 17:42:...|
| 7| tao|2019-07-20 17:45:...|
+---+------+--------------------+
清空快取
scala> spark.sqlContext.clearCache
3.2 調優相關引數
將資料快取到記憶體中的相關優化引數
spark.sql.inMemoryColumnarStorage.compressed
預設為 true
Spark SQL 將會基於統計資訊自動地為每一列選擇一種壓縮編碼方式。
spark.sql.inMemoryColumnarStorage.batchSize
預設值:10000
快取批處理大小。快取資料時, 較大的批處理大小可以提高記憶體利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
其他效能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)
spark.sql.files.maxPartitionBytes
預設值:128 MB
讀取檔案時單個分割槽可容納的最大位元組數
spark.sql.files.openCostInBytes
預設值:4M
開啟檔案的估算成本, 按照同一時間能夠掃描的位元組數來測量。當往一個分割槽寫入多個檔案的時候會使用。高估更好, 這樣的話小檔案分割槽將比大檔案分割槽更快 (先被排程)。
spark.sql.autoBroadcastJoinThreshold
預設值:10M
用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大位元組大小。通過將這個值設定為 -1 可以禁用廣播。注意,當前資料統計僅支援已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
spark.sql.shuffle.partitions
預設值:200
用於配置 join 或聚合操作混洗(shuffle)資料時使用的分割槽數。