1. 程式人生 > 其它 >Spark SQL 資料來源(三)

Spark SQL 資料來源(三)

Spark SQL 可以從多種資料來源讀取資料,也可以將資料寫入多種資料來源,如:json、txt、hdfs、parquet、jdbc、hive

1. 通用讀取與儲存

讀取

// 方法一,若沒有指定檔案格式,則預設為 parquet,也可以通過修改 spark.sql.sources.default 來修改預設檔案格式
// 檔案格式:json, parquet, jdbc, orc, libsvm, csv, text
spark.read.format(檔案格式).load(路徑)

// 讀取 json 檔案
spark.read.format("json").load(路徑)

// 方法二:讀取 json 檔案
spark.read.json(路徑)

儲存

// 方法一:儲存為 json 檔案
df.write.format("json").save(路徑)

// 方法二:指定儲存模式
df.write.mode(SaveMode.Overwrite).csv("/tmp/spark_output/zipcodes")

saveMode 模式

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error"(default) 如果檔案已經存在則丟擲異常
SaveMode.Append "append" 如果檔案已經存在則追加
SaveMode.Overwrite
"overwrite" 如果檔案已經存在則覆蓋
SaveMode.Ignore "ignore" 如果檔案已經存在則忽略

也可以直接在檔案上直接進行 select 查詢:

// json表示檔案的格式. 後面的檔案具體路徑需要用反引號括起來
spark.sql("select * from json.`examples/src/main/resources/people.json`")

參考文章:Spark Read CSV file into DataFrame

2. jdbc 讀取儲存

Spark SQL 也支援使用 JDBC 從其他的關係型資料庫中讀取資料,得到直接就是一個 df

,也支援將 df 回寫資料庫,依賴:

<!--jdbc 驅動-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

注意:在 spark-shell 操作 jdbc,需要把相關的 jdbc 驅動 copyjars 目錄下

2.1 讀取

import org.apache.spark.sql.SparkSession
import java.util.Properties

object ReadJdbc {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("read jdbc").master("local[2]").getOrCreate()
    val sc = spark.sparkContext


    val url = "jdbc:mysql://hadoop201:3306/databaseName"
    val user = "root"
    val pwd = "pwd"
    val tableName = "user"

    // 方法一
    val df = spark.read.format("jdbc")
      .option("url", url)
      .option("user", user)
      .option("password", pwd)
      .option("dbtable", tableName)
      .load()

    // 方法二
    val props: Properties = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", pwd)
    val df2 = spark.read.jdbc(url, tableName, props)

    df.show()
    sc.stop()
    spark.stop()
  }
}

2.2 儲存

import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties

object WriteJdbc {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    val df = spark.read.json("users.json")
    val url = "jdbc:mysql://hadoop201:3306/databaseName"
    val user = "root"
    val pwd = "pwd"
    val tableName = "user"

    // 方法一
    df.write
      .format("jdbc")
      .option("url", url)
      .option("user", user)
      .option("password", pwd)
      .option("dbtable", tableName)
      .mode("append")
      .mode(SaveMode.Overwrite)
      .save()

    // 方法二
    val props = new Properties()
    props.put("user", user)
    props.put("password", pwd)
    df.write.jdbc(url, tableName, props)

    sc.stop()
    spark.stop()
  }
}

3. hive 存取

HiveHadoop 上的 SQL 引擎,Spark SQL編譯時可以包含 Hive 支援,也可以不包含

包含 Hive 支援的 Spark SQL:支援 Hive 表訪問、UDF、Hive 查詢語言 HiveQL、HQL 等。若在 Spark SQL 中包含Hive 的庫,並不需要事先安裝 Hive。一般來說,最好還是在編譯Spark SQL時引入Hive支援,這樣就可以使用這些特性了。如果你下載的是二進位制版本的 Spark,它應該已經在編譯時添加了 Hive 支援

  • Spark SQL 可以連線到一個已經部署好的 Hive 上,即外接 Hive,推薦此種做法, 但是需要將 hive-sie.xml 配置檔案複製到 $SPARK_HOME/conf
  • 若沒有連線外接 Hive,也可以使用 Hive,會在當前 Spark SQL 工作目錄建立 Hive 的元資料倉庫 metastone_db,即內嵌 hive,不推薦;建立的表會被儲存在預設的檔案中,如:/user/hive/warehouse(若 classpath 有配置 hdfs-site.xml 則預設檔案系統為 hdfs,否則為本地)

內嵌 hive 一般不推薦使用,因此這裡討論的是外接 hive

3.1 準備工作

  • Spark 要接管 Hive 需要把 hive-site.xml copyspark/conf 目錄下
  • MySQL 驅動 copyspark/jars 目錄下
  • 若無法訪問 hdfs,還需 copy core-site.xml、hdfs-site.xmlhive/conf 目錄下

[hadoop@hadoop1 apps]$ cp hive/conf/hive-site.xml spark-2.2.0/conf/
[hadoop@hadoop1 apps]$ ls spark-2.2.0/conf/
docker.properties.template  hive-site.xml              metrics.properties.template  spark-defaults.conf           spark-env.sh
fairscheduler.xml.template  log4j.properties.template  slaves                       spark-defaults.conf.template
[hadoop@hadoop1 apps]$ cp hive/lib/mysql-connector-java-5.1.27-bin.jar spark-2.2.0/jars/
[hadoop@hadoop1 apps]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/14 22:47:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/14 22:48:13 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.131.137:4040
Spark context available as 'sc' (master = local[*], app id = local-1636901264338).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_261)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show databases").show
+------------+
|databaseName|
+------------+
|     default|
|      hive_1|
+------------+


scala> spark.sql("use hive_1").show
++
||
++
++


scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|  hive_1|  student|      false|
+--------+---------+-----------+