1. 程式人生 > 實用技巧 >大資料實踐(十一)SparkSQL模組基礎

大資料實踐(十一)SparkSQL模組基礎

SparkSQL是Spark的一個子模組,主要用於操作結構化資料,借鑑了Hive。

此前使用的是SparkCore模組的RDD結構進行資料處理,SparkSQL提供了結構化的資料結構DataFrame、DataSet。

SparkSQL支援SQL、DSL(domain-specific language)兩種方式、多種語言(Scala、Java、Python、R)進行開發,最後底層都轉換為RDD.

SparkSQL支援多種資料來源(Hive,Avro,Parquet,ORC,JSON 和 JDBC 等)、支援 HiveQL 語法以及 Hive SerDes 和 UDF,允許你訪問現有的 Hive 倉庫.

零、SparkSession、IDEA整合Spark

SparkSession:這是一個新入口,取代了原本的SQLContext與HiveContext。

SparkContext也可以通過SparkSession獲得。

1、SparkSession

互動式環境下啟動spark後,自帶一個變數spark

Spark context available as 'sc' (master = local[*], app id = local-1608185209816).
Spark session available as 'spark'.

#讀取檔案
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]  

scala> text.show()
+--------------------+                                                          
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+

在檔案程式中,使用builder方法獲取,本身沒有公開的相關構造器可以被使用。

val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()

通過SparkSession入口可以進行資料載入、儲存、執行SQL、以及獲得其他入口(sqlContext、SparkContext)

2、使用IDEA開發Spark

Spark使用本地開發,將其整合到IDEA中,首先需要有scalasdk,版本要和操作的spark適配。

其次IDEA需要已經安裝過scala開發外掛。

接下來在maven專案配置檔案中匯入以下依賴地址即可。

 	<!-- SparkCore,基礎模組 -->
	<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>

        <!-- SparkSQL,SQL模組 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>


        <!-- Spark和hive整合的模組,可用於處理hive中的資料 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0-preview2</version>
            <scope>provided</scope>
        </dependency>
<!--mysql驅動,與Mysql互動需要-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>

如果日誌太多礙眼,可以調高日誌等級,第一行修改為ERROR,在resources目錄下使用log4j.properties檔案覆蓋預設日誌配置檔案。

#hadoop.root.logger=warn,console
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR

一、RDD、DataFrame、DataSet

RDD作為SparkCore的資料結構,可以處理結構化非結構化的資料,但效率較低。

SparkSQL中封裝了對結構化資料處理效果更好的DataFrame、DateSet.

RDD以行為單位,對行有資料型別。

DataFrame來自pandas庫的資料結構,相比於RDD提供了欄位型別,DataFrame 內部的有明確 Scheme 結構,即列名、列欄位型別都是已知的,這帶來的好處是可以減少資料讀取以及更好地優化執行計劃,從而保證查詢效率。

Dataset 也是分散式的資料集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame
的優點,具備強型別的特點,同時支援 Lambda 函式,但只能在 Scala 和 Java 語言中使用。

三者可以互相轉換。

對文字檔案返回了 Dataset[String]

scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]

scala> text.show()
+--------------------+                                                          
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+


#結構
scala> text.printSchema
root
 |-- value: string (nullable = true)
#返回一個new session
scala> val nsp = spark.newSession
nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b

#讀取json預設返回DF
scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]

#定義一個樣例類,作為資料型別
scala> case class User(id :Long,username:String,email:String){}
defined class User

#加入資料型別,返回ds
scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]

內部結構

scala> userds.printSchema
root
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)

互相轉換:

scala> val udf = user.rdd
udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25

scala> userds.toDF
res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]

二、SQL、DSL

SparkSQL可以使用SQL程式設計,也可以基於特定語言,呼叫api進行程式設計。

1、SQL

使用SQL程式設計有兩步:

​ 1、建立臨時檢視。(有表或檢視才能執行sql,spark可以建立臨時檢視)

​ 2、編寫sql語句。

#建立檢視users
scala> userds.createOrReplaceTempView("users")

#編寫語句
scala> nsp.sql("select id,username from users order by id desc").show
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

/*
createGlobalTempView            createOrReplaceTempView   crossJoin   
createOrReplaceGlobalTempView   createTempView 
*/

如果在檔案程式中編寫,需要匯入以下隱式轉換。

    import spark.implicits._ #spark是變數名
    import org.apache.spark.sql.functions._
2、DSL

DSL是特定語言,如scala、java等。

呼叫api進行處理。

#scala DSL程式設計
scala> userds.select('id,'username).orderBy(desc("id")).show()
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

使用Java進行程式設計。

 final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();

        Dataset<Row> dataset = spark.read().json("spark/data/user3.json");

        try{
            dataset.createTempView("users");
			//SQL
            spark.sql("select id,username from users order by id desc").show();
        }catch (Exception e){
            e.printStackTrace();
        }
		
		//DSL
		dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();
		
        spark.close();

三、UDF

UDF:Spark和Hive一樣,允許使用者自定義函式來補充功能。

SQL方式和DSL方式都支援。

1、SQL方式

#註冊函式
scala> nsp.udf.register("plus0",(id:String)=>0+id)
res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)

scala> nsp.sql("select plus0(id),username from users order by id desc").show
+---------+--------+
|plus0(id)|username|
+---------+--------+
|       03|     Jim|
|       02|    jack|
|       01|    Bret|
+---------+--------+

#註冊函式
scala> nsp.udf.register("plus1",(id:Int)=>1+id)
res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)

scala> nsp.sql("select plus1(id),username from users order by id desc").show
+---------+--------+
|plus1(id)|username|
+---------+--------+
|        4|     Jim|
|        3|    jack|
|        2|    Bret|
+---------+--------+


2、DSL方式

                       
#定義函式
scala> val prefix_name =udf(
     | (name:String)=>{ "user: "+name})

prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)

#套入欄位名
scala> userds.select($"id",prefix_name('username).as("newname")).orderBy(desc("id")).show()
+---+----------+
| id|   newname|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+

Java編寫

	#接收引數名和一個函式介面
spark.udf().register("prefix_name", new UDF1<String, String>() {

           @Override
           public String call(String s) throws Exception {
               return "user: "+s;
           }
       }, DataTypes.StringType);


        try{
            dataset.createTempView("users");
	#和SQL一樣使用
            spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
        }catch (Exception e){
            e.printStackTrace();
        }

+---+----------+
| id|  new_name|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+

四、Spark on Hive

SparkSQL整合Hive,載入讀取Hive表資料進行分析,稱之為Spark on Hive;

Hive 框架底層分割槽引擎,可以將MapReduce改為Spark,稱之為Hive on Spark;

Spark on Hive 相當於Spark使用Hive的資料。

Hive on Spark相當於Hive把自身的計算從MR換成SparkRDD.

可以通過以下方式進行Spark on Hive.

1、Sparksql

spark-sql是spark提供的一種互動模式,使用sql語句進行處理,預設就是使用hive中的資料。

需要將mysql驅動放到jar目錄下,或者指定驅動。

spark-sql --master local[*]

#指定驅動
spark-sql  --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar

2、在IDEA中使用spark on hive

新增spark操作hive的依賴。

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0-preview2</version>
    </dependency>

core-site.xml,hdfs-site.xml,hive-site.xml都拷貝到Rsource目錄下.

可以使用Spark 操作Hive資料了。

 	val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf)
			 .enableHiveSupport()//!!!!預設不支援外部hive,這裡需呼叫方法支援外部hive
			.getOrCreate()

    import spark.implicits._


    spark.sql("use spark_sql_hive") 


    spark.sql(
      """
        |select t.id,t.name,t.email from
        |(select cast('id' as INT) ,id ,name,email from user where id >1001) t
      """.stripMargin).show()