Spark Sql 連線mysql
阿新 • • 發佈:2019-02-09
1、基本概念和用法(摘自spark官方文件中文版)
Spark SQL 還有一個能夠使用 JDBC 從其他資料庫讀取資料的資料來源。當使用 JDBC 訪問其它資料庫時,應該首選 JdbcRDD。這是因為結果是以資料框(DataFrame)返回的,且這樣 Spark SQL操作輕鬆或便於連線其它資料來源。因為這種 JDBC 資料來源不需要使用者提供 ClassTag,所以它也更適合使用 Java 或 Python 操作。(注意,這與允許其它應用使用 Spark SQL 執行查詢操作的 Spark SQL JDBC 伺服器是不同的)。
使用 JDBC 訪問特定資料庫時,需要在 spark classpath 上新增對應的 JDBC 驅動配置。例如,為了從 Spark Shell 連線 postgres,你需要執行如下命令 :
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
通過呼叫資料來源API,遠端資料庫的表可以被載入為DataFrame 或Spark SQL臨時表。支援的引數有 :
屬性名 | 含義 |
---|---|
url | 要連線的 JDBC URL。 |
dbtable | 要讀取的 JDBC 表。 注意,一個 SQL 查詢的 From 分語句中的任何有效表都能被使用。例如,既可以是完整表名,也可以是括號括起來的子查詢語句。 |
driver | 用於連線 URL 的 JDBC 驅動的類名。 |
partitionColumn, lowerBound, upperBound, numPartitions | 這幾個選項,若有一個被配置,則必須全部配置。它們描述了當從多個 worker 中並行的讀取表時,如何對它分割槽。partitionColumn 必須時所查詢表的一個數值欄位。注意,lowerBound 和 upperBound 都只是用於決定分割槽跨度的,而不是過濾表中的行。因此,表中的所有行將被分割槽並返回。 |
fetchSize | JDBC fetch size,決定每次讀取多少行資料。 預設將它設為較小值(如,Oracle上設為 10)有助於 JDBC 驅動上的效能優化。 |
2、scala程式碼實現連線mysql
2.1 新增mysql 依賴
在sbt 配置檔案裡新增:
"mysql" % "mysql-connector-java" % "6.0.6"
然後執行:
sbt eclipse
2.2 建表並初始化資料
DROP TABLE IF EXISTS `USER_T`;
CREATE TABLE `USER_T` (
`ID` INT(11) NOT NULL,
`USER_NAME` VARCHAR(40) NOT NULL,
PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8;
INSERT INTO `USER_T`(`ID`,`USER_NAME`) VALUES (1,'測試1');
INSERT INTO `USER_T`(`ID`,`USER_NAME`) VALUES (2,'測試2');
2.3 程式碼
2.3.1 查詢
package com.dkl.leanring.spark.sql
import org.apache.spark.sql.SparkSession
/**
* spark查詢mysql測試
*/
object MysqlQueryDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("MysqlQueryDemo").master("local").getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8")
.option("dbtable", "USER_T")
.option("user", "root")
.option("password", "Root-123456")
.load()
jdbcDF.show()
}
}
2.3.2 插入資料
package com.dkl.leanring.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import java.util.Properties
/**
* 從USER_T.csv讀取資料並插入的mysql表中
*/
object MysqlInsertDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("MysqlInsertDemo").master("local").getOrCreate()
val df = spark.read.option("header", "true").csv("src/main/resources/scala/USER_T.csv")
df.show()
val url = "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8"
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "Root-123456")
df.write.mode(SaveMode.Append).jdbc(url, "USER_T", prop)
}
}
再查詢一次,就會發現表裡多了幾條資料
3、注意
上面的程式碼在本地eclipse執行是沒有問題的,如果放在伺服器上用spark-submit提交的話,可能會報異常
java.sql.SQLException:No suitable driver
解決方法是在程式碼裡新增
mysql:
.option("driver", "com.mysql.jdbc.Driver")
oracle:
.option("driver", "oracle.jdbc.driver.OracleDriver")