大資料實踐(十一)SparkSQL模組基礎
SparkSQL是Spark的一個子模組,主要用於操作結構化資料,借鑑了Hive。
此前使用的是SparkCore模組的RDD結構進行資料處理,SparkSQL提供了結構化的資料結構DataFrame、DataSet。
SparkSQL支援SQL、DSL(domain-specific language)兩種方式、多種語言(Scala、Java、Python、R)進行開發,最後底層都轉換為RDD.
SparkSQL支援多種資料來源(Hive,Avro,Parquet,ORC,JSON 和 JDBC 等)、支援 HiveQL 語法以及 Hive SerDes 和 UDF,允許你訪問現有的 Hive 倉庫.
零、SparkSession、IDEA整合Spark
SparkSession:這是一個新入口,取代了原本的SQLContext與HiveContext。
SparkContext也可以通過SparkSession獲得。
1、SparkSession
互動式環境下啟動spark
後,自帶一個變數spark
Spark context available as 'sc' (master = local[*], app id = local-1608185209816). Spark session available as 'spark'. #讀取檔案 scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt") text: org.apache.spark.sql.Dataset[String] = [value: string] scala> text.show() +--------------------+ | value| +--------------------+ |java hadoop spark...| |spark scala spark...| | scala spark hive| +--------------------+
在檔案程式中,使用builder
方法獲取,本身沒有公開的相關構造器可以被使用。
val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()
通過SparkSession入口可以進行資料載入、儲存、執行SQL、以及獲得其他入口(sqlContext、SparkContext)
2、使用IDEA開發Spark
Spark使用本地開發,將其整合到IDEA中,首先需要有scala
的sdk
,版本要和操作的spark
適配。
其次IDEA需要已經安裝過scala
開發外掛。
接下來在maven
專案配置檔案中匯入以下依賴地址即可。
<!-- SparkCore,基礎模組 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
<!-- SparkSQL,SQL模組 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
<!-- Spark和hive整合的模組,可用於處理hive中的資料 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0-preview2</version>
<scope>provided</scope>
</dependency>
<!--mysql驅動,與Mysql互動需要-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
如果日誌太多礙眼,可以調高日誌等級,第一行修改為ERROR,在resources
目錄下使用log4j.properties
檔案覆蓋預設日誌配置檔案。
#hadoop.root.logger=warn,console
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
一、RDD、DataFrame、DataSet
RDD作為SparkCore的資料結構,可以處理結構化非結構化的資料,但效率較低。
SparkSQL中封裝了對結構化資料處理效果更好的DataFrame、DateSet.
RDD以行為單位,對行有資料型別。
DataFrame來自pandas庫的資料結構,相比於RDD提供了欄位型別,DataFrame 內部的有明確 Scheme 結構,即列名、列欄位型別都是已知的,這帶來的好處是可以減少資料讀取以及更好地優化執行計劃,從而保證查詢效率。
Dataset 也是分散式的資料集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame
的優點,具備強型別的特點,同時支援 Lambda 函式,但只能在 Scala 和 Java 語言中使用。三者可以互相轉換。
對文字檔案返回了 Dataset[String]
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]
scala> text.show()
+--------------------+
| value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
| scala spark hive|
+--------------------+
#結構
scala> text.printSchema
root
|-- value: string (nullable = true)
#返回一個new session
scala> val nsp = spark.newSession
nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b
#讀取json預設返回DF
scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
#定義一個樣例類,作為資料型別
scala> case class User(id :Long,username:String,email:String){}
defined class User
#加入資料型別,返回ds
scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]
內部結構
scala> userds.printSchema
root
|-- email: string (nullable = true)
|-- id: long (nullable = true)
|-- username: string (nullable = true)
互相轉換:
scala> val udf = user.rdd
udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25
scala> userds.toDF
res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
二、SQL、DSL
SparkSQL可以使用SQL程式設計,也可以基於特定語言,呼叫api進行程式設計。
1、SQL
使用SQL程式設計有兩步:
1、建立臨時檢視。(有表或檢視才能執行sql,spark可以建立臨時檢視)
2、編寫sql語句。
#建立檢視users
scala> userds.createOrReplaceTempView("users")
#編寫語句
scala> nsp.sql("select id,username from users order by id desc").show
+---+--------+
| id|username|
+---+--------+
| 3| Jim|
| 2| jack|
| 1| Bret|
+---+--------+
/*
createGlobalTempView createOrReplaceTempView crossJoin
createOrReplaceGlobalTempView createTempView
*/
如果在檔案程式中編寫,需要匯入以下隱式轉換。
import spark.implicits._ #spark是變數名
import org.apache.spark.sql.functions._
2、DSL
DSL是特定語言,如scala、java等。
呼叫api進行處理。
#scala DSL程式設計
scala> userds.select('id,'username).orderBy(desc("id")).show()
+---+--------+
| id|username|
+---+--------+
| 3| Jim|
| 2| jack|
| 1| Bret|
+---+--------+
使用Java進行程式設計。
final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();
Dataset<Row> dataset = spark.read().json("spark/data/user3.json");
try{
dataset.createTempView("users");
//SQL
spark.sql("select id,username from users order by id desc").show();
}catch (Exception e){
e.printStackTrace();
}
//DSL
dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();
spark.close();
三、UDF
UDF:Spark和Hive一樣,允許使用者自定義函式來補充功能。
SQL方式和DSL方式都支援。
1、SQL方式
#註冊函式
scala> nsp.udf.register("plus0",(id:String)=>0+id)
res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)
scala> nsp.sql("select plus0(id),username from users order by id desc").show
+---------+--------+
|plus0(id)|username|
+---------+--------+
| 03| Jim|
| 02| jack|
| 01| Bret|
+---------+--------+
#註冊函式
scala> nsp.udf.register("plus1",(id:Int)=>1+id)
res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)
scala> nsp.sql("select plus1(id),username from users order by id desc").show
+---------+--------+
|plus1(id)|username|
+---------+--------+
| 4| Jim|
| 3| jack|
| 2| Bret|
+---------+--------+
2、DSL方式
#定義函式
scala> val prefix_name =udf(
| (name:String)=>{ "user: "+name})
prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)
#套入欄位名
scala> userds.select($"id",prefix_name('username).as("newname")).orderBy(desc("id")).show()
+---+----------+
| id| newname|
+---+----------+
| 3| user: Jim|
| 2|user: jack|
| 1|user: Bret|
+---+----------+
Java編寫
#接收引數名和一個函式介面
spark.udf().register("prefix_name", new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
return "user: "+s;
}
}, DataTypes.StringType);
try{
dataset.createTempView("users");
#和SQL一樣使用
spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
}catch (Exception e){
e.printStackTrace();
}
+---+----------+
| id| new_name|
+---+----------+
| 3| user: Jim|
| 2|user: jack|
| 1|user: Bret|
+---+----------+
四、Spark on Hive
SparkSQL整合Hive,載入讀取Hive表資料進行分析,稱之為Spark on Hive;
Hive 框架底層分割槽引擎,可以將MapReduce改為Spark,稱之為Hive on Spark;
Spark on Hive 相當於Spark使用Hive的資料。
Hive on Spark相當於Hive把自身的計算從MR換成SparkRDD.
可以通過以下方式進行Spark on Hive.
1、Sparksql
spark-sql是spark提供的一種互動模式,使用sql語句進行處理,預設就是使用hive中的資料。
需要將mysql驅動放到jar目錄下,或者指定驅動。
spark-sql --master local[*]
#指定驅動
spark-sql --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
2、在IDEA中使用spark on hive
新增spark操作hive的依賴。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
core-site.xml,hdfs-site.xml,hive-site.xml
都拷貝到Rsource
目錄下.
可以使用Spark 操作Hive資料了。
val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
val spark = SparkSession.builder().config(conf)
.enableHiveSupport()//!!!!預設不支援外部hive,這裡需呼叫方法支援外部hive
.getOrCreate()
import spark.implicits._
spark.sql("use spark_sql_hive")
spark.sql(
"""
|select t.id,t.name,t.email from
|(select cast('id' as INT) ,id ,name,email from user where id >1001) t
""".stripMargin).show()