1. 程式人生 > 其它 >spark sql原理及使用 基本使用mysql hive rdd轉為dataframe

spark sql原理及使用 基本使用mysql hive rdd轉為dataframe

技術標籤:sparksqlsparkspark

一、spark SQL概述

1.1 什麼是spark SQL

​ Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame並且作為分散式SQL查詢引擎的作用。類似於hive的作用。

1.2 spark SQL的特點

1、容易整合:安裝Spark的時候,已經整合好了。不需要單獨安裝。
2、統一的資料訪問方式:JDBC、JSON、Hive、parquet檔案(一種列式儲存檔案,是SparkSQL預設的資料來源,hive中也支援)
3、完全相容Hive。可以將Hive中的資料,直接讀取到Spark SQL中處理。

一般在生產中,基本都是使用hive做資料倉庫儲存資料,然後用spark從hive讀取資料進行處理。
4、支援標準的資料連線:JDBC、ODBC
5、計算效率比基於mr的hive高,而且hive2.x版本中,hive建議使用spark作為執行引擎

二、spark SQL基本原理

2.1 DataFrame和DataSet基本概念

2.1.1 DataFrame

DataFrame是組織成命名列的資料集。它在概念上等同於關係資料庫中的表,裡面有表的結構以及資料,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化資料檔案
hive中的表
外部資料庫或現有RDDs
DataFrame API支援的語言有Scala,Java,Python和R。

​ 比起RDD,DataFrame多了資料的結構資訊,即schema。RDD是分散式的 Java物件的集合。DataFrame是分散式的Row物件的集合。DataFrame除了提供了比RDD更豐富的運算元以外,更重要的特點是提升執行效率、減少資料讀取以及執行計劃的優化。

2.1.2 DataSet

Dataset是一個分散式的資料收集器。這是在Spark1.6之後新加的一個介面,兼顧了RDD的優點(強型別,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)

2.2 建立DataFrame的方式

2.2.1 SparkSession物件

​ Apache Spark 2.0引入了SparkSession,其為使用者提供了一個統一的切入點來使用Spark的各項功能,並且允許使用者通過它呼叫DataFrame和Dataset相關API來編寫Spark程式。最重要的是,它減少了使用者需要了解的一些概念,使得我們可以很容易地與Spark互動。
​在2.0版本之前,與Spark互動之前必須先建立SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地建立SparkConf, SparkContext 以及 SQLContext,因為這些物件已經封裝在SparkSession中。
​ 要注意一點,在我用的這個spark版本中,直接使用new SQLContext() 來建立SQLContext物件,會顯示該方式已經被棄用了(IDEA中會顯示已棄用),建議使用SparkSession來獲取SQLContext物件。

2.2.2 通過case class樣本類

這種方式在scala中比較常用,因為case class是scala的特色

/**
表 t_stu 的結構為:
id name age
*/

object CreateDF {
  def main(args: Array[String]): Unit = {
    //這是最新的獲取SQLContext物件的方式
    //2、建立SparkSession物件,設定master,appname
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    //3、通過spark獲取sparkContext物件,讀取資料
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //4、將資料對映到case class中,也就是資料對映到表的對應欄位中
    val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
    //這裡必須要加上隱式轉換,否則無法呼叫 toDF 函式
    import spark.sqlContext.implicits._

    //5、生成df
    val df2 = tb.toDF()

    //相當於select name from t_stu
    df1.select($"name").show()

    //關閉spark物件
    spark.stop()
  }
}

/*1、定義case class,每個屬性對應表中的欄位名以及型別
     一般生產中為了方便,會全部定義為string型別,然後有需要的時候
     才根據實際情況將string轉為需要的型別
   這一步相當於定義表的結構
*/
case class emp(id:Int,name:String,age:Int)

總結步驟為:

1、定義case class,用來表結構
2、建立sparkSession物件,用來讀取資料
3、將rdd中的資料和case class對映
4、呼叫 toDF 函式將rdd轉為 DataFrame

2.2.3 通過StructType類

這種方式java比較常用

package SparkSQLExer

import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
  * 建立dataschema方式2:
  * 通過spark session物件建立,表結構通過StructType建立
  */
object CreateDF02 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()

    //1、通過StructType建立表結構schema,裡面表的每個欄位使用 StructField定義
    val tbSchema = StructType(List(
        StructField("id",DataTypes.IntegerType),
        StructField("name",DataTypes.StringType),
        StructField("age",DataTypes.IntegerType)
      ))

    //2、讀取資料
    var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //3、將資料對映為ROW物件
    val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))

    //4、建立表結構和表資料對映,返回的就是df
    val df2 = sparkS.createDataFrame(rdd1, tbSchema)

    //打印表結構
    df2.printSchema()

    sparkS.stop()

  }

}

總結步驟為:

1、通過StructType建立表結構schema,裡面表的每個欄位使用 StructField定義
2、通過sparkSession.sparkContext讀取資料
3、將資料對映格式為Row物件
4、將StructType和資料Row物件對映,返回df

2.2.4 使用json等有表格式的檔案型別

package SparkSQLExer

import org.apache.spark.sql.SparkSession

/**
  * 建立df方式3:通過有格式的檔案直接匯入資料以及表結構,比如json格式的檔案
  * 返回的直接就是一個DF
  */
object CreateDF03 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()

    //讀取json方式1:
    val jsonrdd1= sparkS.read.json("path")

    //讀取json方式2:
    val jsonrdd1= sparkS.read.format("json").load("path")

    sparkS.stop()
  }
}

這種方式比較簡單,就是直接讀取json檔案而已
sparkS.read.xxxx讀取任意檔案時,返回的都是DF物件

2.3 操作DataFrame

2.3.1 DSL語句

DSL語句其實就是將sql語句的一些操作轉為類似函式的方式去呼叫,比如:

df1.select("name").show

例子:

為了方便,直接在spark-shell裡操作了,
spark-shell --master spark://bigdata121:7077

1、打印表結構
scala> df1.printSchema
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = true)
|-- comm: integer (nullable = true)
|-- deptno: integer (nullable = true)

2、顯示當前df的表資料或者查詢結果的資料
scala> df1.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|   0|    20|
| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|   0|    30|
| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|    10|
+-----+------+---------+----+----------+----+----+------+

3、執行select, 相當於select xxx form  xxx where xxx
scala> df1.select("ename","sal").where("sal>2000").show
+------+----+
| ename| sal|
+------+----+
| SMITH| 800|
| ALLEN|1600|
|  WARD|1250|
| JONES|2975|
|MARTIN|1250|
| BLAKE|2850|
| CLARK|2450|
| SCOTT|3000|
|  KING|5000|
|TURNER|1500|
| ADAMS|1100|
| JAMES| 950|
|  FORD|3000|
|MILLER|1300|
+------+----+

4、對某些列進行操作
對某個指定進行操作時,需要加上$符號,然後後面才能操作
$代表 取出來以後,再做一些操作。
注意:這個 $ 的用法在ideal中無法正常使用,解決方法下面說
scala> df1.select($"ename",$"sal",$"sal"+100).show
+------+----+-----------+
| ename| sal|(sal + 100)|
+------+----+-----------+
| SMITH| 800|        900|
| ALLEN|1600|       1700|
|  WARD|1250|       1350|
| JONES|2975|       3075|
|MARTIN|1250|       1350|
| BLAKE|2850|       2950|
| CLARK|2450|       2550|
| SCOTT|3000|       3100|
|  KING|5000|       5100|
|TURNER|1500|       1600|
| ADAMS|1100|       1200|
| JAMES| 950|       1050|
|  FORD|3000|       3100|
|MILLER|1300|       1400|
+------+----+-----------+

5、過濾行
scala> df1.filter($"sal">2000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7698|BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782|CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788|SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7902| FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
+-----+-----+---------+----+----------+----+----+------+

6、分組以及計數
scala> df1.groupBy($"deptno").count.show
+------+-----+                                                                  
|deptno|count|
+------+-----+
|    20|    5|
|    10|    3|
|    30|    6|
+------+-----+

上面說到在idea中 select($"name")中無法正常使用,解決方法為:

在該語句之前加上這麼一句:
import spark.sqlContext.implicits._

主要還是因為型別的問題,加上隱式轉換就好了

2.3.2 sql語句

df物件不能直接執行sql。需要生成一個檢視,再執行SQL。
需要指定建立的檢視名稱,後面檢視名稱就相當於表名。
檢視後面還會細說,這裡先有個概念
例子:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
。。。。。。。。。。。。。。
//通過df物件建立臨時檢視。檢視名就相當於表名
df1.createOrReplaceTempView("emp")

//通過sparksession物件執行執行
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select deptno,count(1) from emp group by deptno").show

//可以建立多個檢視,不衝突
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.deptno from emp12345 e").show

2.3.3 多表查詢

scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept

scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
lines: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[68] at map at <console>:24

scala> val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
allDept: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[69] at map at <console>:28

scala> val df2 = allDept.toDF
df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]

scala> df2.create
createGlobalTempView   createOrReplaceTempView   createTempView

scala> df2.createOrReplaceTempView("dept")

scala> spark.sql("select dname,ename from emp12345,dept where emp12345.deptno=dept.deptno").show
+----------+------+                                                             
|     dname| ename|
+----------+------+
|  RESEARCH| SMITH|
|  RESEARCH| JONES|
|  RESEARCH| SCOTT|
|  RESEARCH| ADAMS|
|  RESEARCH|  FORD|
|ACCOUNTING| CLARK|
|ACCOUNTING|  KING|
|ACCOUNTING|MILLER|
|     SALES| ALLEN|
|     SALES|  WARD|
|     SALES|MARTIN|
|     SALES| BLAKE|
|     SALES|TURNER|
|     SALES| JAMES|
+----------+------+

2.4 建立DataSet

2.4.1 通過case class

和DataFrame類似,只是把 toDF改為呼叫 toDS 方法

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
    val tb = lines.map(t=>emp1(t(0).toInt,t(1),t(2).toInt))
    import spark.sqlContext.implicits._
    val df1 = tb.toDS()
    df1.select($"name")

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.2 通過序列Seq類物件

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

    //建立一個序列物件,裡面都是emp1物件,對映的資料,然後直接toDS轉為DataSet
    val ds1 = Seq(emp1(1,"king",20)).toDS()
    ds1.printSchema()

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.3 使用json格式檔案

定義case class
case class Person(name:String,age:BigInt)

使用JSON資料生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")

將DataFrame轉換成DataSet
df.as[Person].show

df.as[Person] 是一個 DataSet
as[T]中的泛型需要是一個case class類,用於對映表頭

2.5 操作DataSet

DataSet支援的運算元其實就是rdd和DataFrame運算元的結合。

使用emp.json 生成DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    20| 7369| SMITH|1980/12/17|    CLERK|7902| 800|
| 300|    30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500|    30| 7521|  WARD| 1981/2/22| SALESMAN|7698|1250|
|    |    20| 7566| JONES|  1981/4/2|  MANAGER|7839|2975|
|1400|    30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
|    |    30| 7698| BLAKE|  1981/5/1|  MANAGER|7839|2850|
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    20| 7788| SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|   0|    30| 7844|TURNER|  1981/9/8| SALESMAN|7698|1500|
|    |    20| 7876| ADAMS| 1987/5/23|    CLERK|7788|1100|
|    |    30| 7900| JAMES| 1981/12/3|    CLERK|7698| 950|
|    |    20| 7902|  FORD| 1981/12/3|  ANALYST|7566|3000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

scala> empDF.where($"sal" >= 3000).show
+----+------+-----+-----+----------+---------+----+----+
|comm|deptno|empno|ename|  hiredate|      job| mgr| sal|
+----+------+-----+-----+----------+---------+----+----+
|    |    20| 7788|SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839| KING|1981/11/17|PRESIDENT|    |5000|
|    |    20| 7902| FORD| 1981/12/3|  ANALYST|7566|3000|
+----+------+-----+-----+----------+---------+----+----+

#### empDF 轉換成 DataSet 需要 case class

scala> case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
defined class Emp

scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]

scala> empDS.filter(_.sal > 3000).show
+----+------+-----+-----+----------+---------+---+----+
|comm|deptno|empno|ename|  hiredate|      job|mgr| sal|
+----+------+-----+-----+----------+---------+---+----+
|    |    10| 7839| KING|1981/11/17|PRESIDENT|   |5000|
+----+------+-----+-----+----------+---------+---+----+

scala> empDS.filter(_.deptno == 10).show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

多表查詢:

1、建立部門表
scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[154] at map at <console>:24

scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept

scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]

scala> deptDS.show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

2、員工表
同上 empDS

empDS.join(deptDS,"deptno").where(xxxx) 連線兩個表,通過deptno欄位
empDS.joinWith(deptDS,deptDS("deptno")===empDS("deptno")) 這個用於連線的欄位名稱不一樣的情況

2.6 檢視view

​ 如果想使用標準的sql語句來操作df或者ds物件時,必須先給df或者ds物件建立檢視,然後通過SparkSession物件的sql函式來對相應的檢視進行操作才可以。那麼檢視是什麼?
​ 檢視是一個虛表,不儲存資料,可以當做是表的一個訪問連結。檢視有兩種型別:
普通檢視:也叫本地檢視,只在當前session會話中有效
全域性檢視:在全部session中都有效,全域性檢視建立在指定名稱空間中:global_temp 類似於一個庫
操作說明:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

建立本地檢視:
empDF.createOrReplaceTempView(檢視名),檢視存在就會重新建立
empDF.createTempView(檢視名),如果檢視存在就不會建立

建立全域性檢視:
empDF.createGlobalTempView(檢視名)

對檢視執行sql操作,這裡檢視名就類似於表名
spark.sql("xxxxx")

例子:
empDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show

注意,只要建立了檢視,那麼就可以通過sparksession物件在任意一個類中操作檢視,也就是表。這個特性很好用,當我們要操作一些表時,可以一開始就讀取成df,然後建立成檢視,那麼就可以在任意一個地方查詢表了。

2.7 資料來源

通過SparkSession物件可以讀取不同格式的資料來源:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

下面都用上面的spark代稱SparkSession。

2.7.1 SparkSession讀取資料的方式

1、load
spark.read.load(path):讀取指定路徑的檔案,要求檔案儲存格式為Parquet檔案

2、format
spark.read.format("格式").load(path) :指定讀取其他格式的檔案,如json
例子:
spark.read.format("json").load(path)

3、直接讀取其他格式檔案
spark.read.格式名(路徑),這是上面2中的一個簡寫方式,例子:
spark.read.json(路徑)  json格式檔案
spark.read.text(路徑)  讀取文字檔案

注意:這些方式返回的都是 DataFrame 物件

2.7.2 SparkSession儲存資料的方式

可以將DataFrame 物件寫入到指定格式的檔案中,假設有個DataFrame 物件為df1.

1、save
df1.write.save(路徑) 
他會將檔案儲存到這個目錄下,檔名spark隨機生成的,所以使用上面的讀取方式的時候,直接指定讀取目錄即可,不用指定檔名。輸出的檔案格式為 Parquet。可以直接指定hdfs的路徑,否則就儲存到本地
如:
df1.write.save("/test")
spark.read.load("/test")

2、直接指定格式儲存
df1.write.json(路徑)  這樣就會以json格式儲存檔案,生成的檔名的情況和上面類似

3、指定儲存模式
如果沒有指定儲存模式,輸出路徑存在的情況下,就會報錯
df1.write.mode("append").json(路徑) 
mode("append") 就表示檔案存在時就追加
mode("overwrite") 表示覆蓋舊資料

4、儲存為表
df1.write.saveAsTable(表名) 會儲存在當前目錄的spark-warehouse 目錄下

5、format
df1.write.format(格式).save()
使用指定特定格式的方式來輸出儲存資料,比如儲存到MongoDB資料庫中

2.7.3 Parquet格式

​ 這種一種列式儲存格式,具體原理可以看看之前hive的文章。這種格式是預設的儲存格式,使用load和save時預設的格式,操作方式很像前面說的,這裡不重複。這裡要講的是Parquet的一個特殊的功能,支援schema(表結構)的合併。例子:

scala> val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

scala> df1.show
+------+------+
|single|double|
+------+------+
|     1|     2|
|     2|     4|
|     3|     6|
|     4|     8|
|     5|    10|
+------+------+

scala> sc.makeRDD(1 to 5)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:25

scala> sc.makeRDD(1 to 5).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5)

//匯出表1
scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")

scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

scala> df2.show
+------+------+
|single|triple|
+------+------+
|     6|    18|
|     7|    21|
|     8|    24|
|     9|    27|
|    10|    30|
+------+------+

//匯出表2
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")

scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field]

//直接讀取會丟失欄位
scala> df3.show
+------+------+---+
|single|double|key|
+------+------+---+
|     8|  null|  2|
|     9|  null|  2|
|    10|  null|  2|
|     3|     6|  1|
|     4|     8|  1|
|     5|    10|  1|
|     6|  null|  2|
|     7|  null|  2|
|     1|     2|  1|
|     2|     4|  1|
+------+------+---+

//加上option,指定"mergeSchema"為true,就可以合併
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
|     8|  null|    24|  2|
|     9|  null|    27|  2|
|    10|  null|    30|  2|
|     3|     6|  null|  1|
|     4|     8|  null|  1|
|     5|    10|  null|  1|
|     6|  null|    18|  2|
|     7|  null|    21|  2|
|     1|     2|  null|  1|
|     2|     4|  null|  1|
+------+------+------+---+

補充問題:key 是什麼?必須用key嘛?
key是不同表的一個區分欄位,在合併的時候,會作為合併之後的表的一個欄位,並且值等於key=xx 中設定的值
如果目錄下,兩個表的目錄名不一樣,是無法合併的,合併欄位名可以任意,
如:一個是key ,一個是 test 這兩個無法合併,必須統一key或者test

2.7.4 json檔案

這種一種帶表格式欄位的檔案,例子:

scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

scala> peopleDF.createOrReplaceTempView("people")

scala> spark.sql("select * from people where age=19")
res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> spark.sql("select * from people where age=19").show
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+

scala> spark.sql("select age,count(1) from people group by  age").show
+----+--------+                                                                 
| age|count(1)|
+----+--------+
|  19|       1|
|null|       1|
|  30|       1|
+----+--------+

2.7.5 JDBC 連線

df物件支援通過jdbc連線資料庫,寫入資料到資料庫,或者從資料庫讀取資料。
例子:
1、通過jdbc 從mysql讀取資料:

使用 format(xx).option()的方式指定連線資料庫的一些引數,比如使用者名稱密碼,使用的連線驅動等

import java.util.Properties

import org.apache.spark.sql.SparkSession

object ConnMysql {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    //連線mysql方式1:
    //建立properties配置物件,用於存放連線mysql的引數
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //使用jdbc連線,指定連線字串,表名,以及其他連線引數,並返回對應的dataframe
    val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
    mysqlDF1.printSchema()
    mysqlDF1.show()
    mysqlDF1.createTempView("customer")
    sparkS.sql("select * from customer limit 2").show()

    //連線mysql方式2,這種方式比較常用:
    val mysqlConn2 = sparkS.read.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")              
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").load()

    mysqlConn2.printSchema()
  }
}

這是兩種連線讀取資料的方式。

2、jdbc寫入資料到mysql

和讀取類似,只不過換成了write操作

import java.util.Properties

import org.apache.spark.sql.SparkSession

object WriteToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()

    val df1 = spark.read.text("G:\\test\\t_stu.json")

    //方式1:
    df1.write.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").save()

    //方式2:
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)

  }

}

必須要保證df的表格式和寫入的mysql的表格式一樣,欄位名也要一樣

2.7.6 hive

1、通過jdbc連線hive
方式和普通jdbc類似,例如:

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  * 連線hive的情況有兩種:
  * 1、如果是直接在ideal中執行spark程式的話,則必須在程式中指定jdbc連線的hiveserver的地址
  * 且hiveserver必須以後臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連線hive
  *
  * 2、如果程式是打包到spark叢集中執行的話,一般spark叢集的conf目錄下,已經有hive client
  * 的配置檔案,就會直接啟動hive client來連線hive。這時不需要啟動hiveserver服務。
  * 這種方式是通過hive client連線hive
  */
object ConnHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    val properties = new Properties()
    properties.setProperty("user","")
    properties.setProperty("password","")

    val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default","customer",properties)
    hiveDF.printSchema()
    spark.stop()
  }
}

這種方式要注意一點:
hiveserver必須以後臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連線hive。

2、通過hive client連線hive
​ 這種方式一般用在生產中,因為任務一般都是通過spark-submit提交到叢集中執行,這時候就會直接通過hive client來連線hive,不會通過jdbc來連線了。
​要注意:需要在spark的節點上都配置上hive client,然後將hive-site.xml配置檔案拷貝到 spark的conf目錄下。同時需要將hadoop的core-site.xml hdfs-site.xml也拷貝過去。另外一方面,因為要使用hive client,所以hive server那邊,一般都要配置metastore server,具體配置看hive的文章。
​ 這樣在spark叢集中的程式就可以直接使用

spark.sql("xxxx").show

這樣的操作,預設就會從hive中讀取對應的表進行操作。不用另外做任何連線hive 的操作

或者直接到 spark-shell中,也是可以直接使用 上面的方式操作hive的表
例如:

import org.apache.spark.sql.SparkSession

object ConnHive02 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn hive").getOrCreate()
    spark.sql("select * from customer").show()
  }

}

這樣直接操作的就是 hive 的表了

2.8 小案例--讀取hive資料分析結果寫入mysql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    //直接通過spark叢集中的hive client連線hive,不需要jdbc以及hive server
    val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
    val resultDF = spark.sql("select * from default.customer")

    //一般中間寫的處理邏輯都是處理從hive讀取的資料,處理完成後寫入到mysql

    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //通過jdbc寫入mysql
  resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)

    spark.stop()
  }

}

三、效能優化

3.1 記憶體中快取資料

先啟動個spark-shell

spark-shell --master spark://bigdata121:7077

要在spark-shell中操作mysql,所以記得自己找個 mysql-connector的jar放到spark的jars目錄下

例子:

建立df,從mysql讀取表
scala> val mysqDF = spark.read.format("jdbc").option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","wjt86912572").option("driver","com.mysql.jdbc.Driver").option("dbtable","customer").load()
mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> mysqDF.show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

必須註冊成一張表,才可以快取。
scala> mysqDF.registerTempTable("customer")
warning: there was one deprecation warning; re-run with -deprecation for details

標識這張表可以被快取,但是現在資料並沒有直接快取
scala> spark.sqlContext.cacheTable("customer")

第一次查詢表,從mysql讀取資料,並快取到記憶體中
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

這一次查詢從記憶體中返回
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

清空快取
scala> spark.sqlContext.clearCache

3.2 調優相關引數

將資料快取到記憶體中的相關優化引數
   spark.sql.inMemoryColumnarStorage.compressed
   預設為 true
   Spark SQL 將會基於統計資訊自動地為每一列選擇一種壓縮編碼方式。

   spark.sql.inMemoryColumnarStorage.batchSize
   預設值:10000
   快取批處理大小。快取資料時, 較大的批處理大小可以提高記憶體利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。

其他效能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)
   spark.sql.files.maxPartitionBytes
   預設值:128 MB
   讀取檔案時單個分割槽可容納的最大位元組數

   spark.sql.files.openCostInBytes
   預設值:4M
   開啟檔案的估算成本, 按照同一時間能夠掃描的位元組數來測量。當往一個分割槽寫入多個檔案的時候會使用。高估更好, 這樣的話小檔案分割槽將比大檔案分割槽更快 (先被排程)。

spark.sql.autoBroadcastJoinThreshold
   預設值:10M
   用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大位元組大小。通過將這個值設定為 -1 可以禁用廣播。注意,當前資料統計僅支援已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

spark.sql.shuffle.partitions
   預設值:200
   用於配置 join 或聚合操作混洗(shuffle)資料時使用的分割槽數。