Spark SQL 資料來源(三)
阿新 • • 發佈:2021-12-05
Spark SQL
可以從多種資料來源讀取資料,也可以將資料寫入多種資料來源,如:json、txt、hdfs、parquet、jdbc、hive
等
1. 通用讀取與儲存
讀取
// 方法一,若沒有指定檔案格式,則預設為 parquet,也可以通過修改 spark.sql.sources.default 來修改預設檔案格式 // 檔案格式:json, parquet, jdbc, orc, libsvm, csv, text spark.read.format(檔案格式).load(路徑) // 讀取 json 檔案 spark.read.format("json").load(路徑) // 方法二:讀取 json 檔案 spark.read.json(路徑)
儲存
// 方法一:儲存為 json 檔案
df.write.format("json").save(路徑)
// 方法二:指定儲存模式
df.write.mode(SaveMode.Overwrite).csv("/tmp/spark_output/zipcodes")
saveMode 模式
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists(default) | "error"(default) | 如果檔案已經存在則丟擲異常 |
SaveMode.Append | "append" | 如果檔案已經存在則追加 |
SaveMode.Overwrite |
"overwrite" | 如果檔案已經存在則覆蓋 |
SaveMode.Ignore | "ignore" | 如果檔案已經存在則忽略 |
也可以直接在檔案上直接進行 select
查詢:
// json表示檔案的格式. 後面的檔案具體路徑需要用反引號括起來
spark.sql("select * from json.`examples/src/main/resources/people.json`")
2. jdbc 讀取儲存
Spark SQL
也支援使用 JDBC
從其他的關係型資料庫中讀取資料,得到直接就是一個 df
df
回寫資料庫,依賴:
<!--jdbc 驅動-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
注意:在
spark-shell
操作jdbc
,需要把相關的jdbc
驅動copy
到jars
目錄下
2.1 讀取
import org.apache.spark.sql.SparkSession
import java.util.Properties
object ReadJdbc {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("read jdbc").master("local[2]").getOrCreate()
val sc = spark.sparkContext
val url = "jdbc:mysql://hadoop201:3306/databaseName"
val user = "root"
val pwd = "pwd"
val tableName = "user"
// 方法一
val df = spark.read.format("jdbc")
.option("url", url)
.option("user", user)
.option("password", pwd)
.option("dbtable", tableName)
.load()
// 方法二
val props: Properties = new Properties()
props.setProperty("user", user)
props.setProperty("password", pwd)
val df2 = spark.read.jdbc(url, tableName, props)
df.show()
sc.stop()
spark.stop()
}
}
2.2 儲存
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
object WriteJdbc {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
val sc = spark.sparkContext
val df = spark.read.json("users.json")
val url = "jdbc:mysql://hadoop201:3306/databaseName"
val user = "root"
val pwd = "pwd"
val tableName = "user"
// 方法一
df.write
.format("jdbc")
.option("url", url)
.option("user", user)
.option("password", pwd)
.option("dbtable", tableName)
.mode("append")
.mode(SaveMode.Overwrite)
.save()
// 方法二
val props = new Properties()
props.put("user", user)
props.put("password", pwd)
df.write.jdbc(url, tableName, props)
sc.stop()
spark.stop()
}
}
3. hive 存取
Hive
是 Hadoop
上的 SQL
引擎,Spark SQL
編譯時可以包含 Hive
支援,也可以不包含
包含 Hive
支援的 Spark SQL
:支援 Hive
表訪問、UDF、Hive
查詢語言 HiveQL、HQL
等。若在 Spark SQL
中包含Hive
的庫,並不需要事先安裝 Hive
。一般來說,最好還是在編譯Spark SQL
時引入Hive
支援,這樣就可以使用這些特性了。如果你下載的是二進位制版本的 Spark
,它應該已經在編譯時添加了 Hive
支援
Spark SQL
可以連線到一個已經部署好的Hive
上,即外接Hive
,推薦此種做法, 但是需要將hive-sie.xml
配置檔案複製到$SPARK_HOME/conf
- 若沒有連線外接
Hive
,也可以使用Hive
,會在當前Spark SQL
工作目錄建立Hive
的元資料倉庫metastone_db
,即內嵌hive
,不推薦;建立的表會被儲存在預設的檔案中,如:/user/hive/warehouse
(若classpath
有配置hdfs-site.xml
則預設檔案系統為hdfs
,否則為本地)
內嵌 hive
一般不推薦使用,因此這裡討論的是外接 hive
3.1 準備工作
Spark
要接管Hive
需要把hive-site.xml copy
到spark/conf
目錄下- 將
MySQL
驅動copy
到spark/jars
目錄下 - 若無法訪問
hdfs
,還需copy core-site.xml、hdfs-site.xml
到hive/conf
目錄下
[hadoop@hadoop1 apps]$ cp hive/conf/hive-site.xml spark-2.2.0/conf/
[hadoop@hadoop1 apps]$ ls spark-2.2.0/conf/
docker.properties.template hive-site.xml metrics.properties.template spark-defaults.conf spark-env.sh
fairscheduler.xml.template log4j.properties.template slaves spark-defaults.conf.template
[hadoop@hadoop1 apps]$ cp hive/lib/mysql-connector-java-5.1.27-bin.jar spark-2.2.0/jars/
[hadoop@hadoop1 apps]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/14 22:47:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/14 22:48:13 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.131.137:4040
Spark context available as 'sc' (master = local[*], app id = local-1636901264338).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_261)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show
+------------+
|databaseName|
+------------+
| default|
| hive_1|
+------------+
scala> spark.sql("use hive_1").show
++
||
++
++
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| hive_1| student| false|
+--------+---------+-----------+