Spark SQL: Reading and Writing Databases
Connecting to MySQL with JDBC
- Start MySQL
Enter the following statements to create the database and the table:
create database spark;
use spark;
create table student(id int(4),name char(20),gender char(4),age int(4));
insert into student values(1,'Xueqian','F',23);
insert into student values(2,'Weiliang','M',24);
select * from student;
- Start spark-shell
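For spark-shell to load the com.mysql.jdbc.Driver class, the MySQL Connector/J jar must be on the classpath when the shell starts. A minimal sketch, assuming the jar has been downloaded to /usr/local/spark/jars/ (both the path and the version number here are placeholders; adapt them to your installation):
spark-shell --jars /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar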
val jdbcDF=spark.read.format("jdbc").
option("url","jdbc:mysql://master:3306/spark").
option("driver","com.mysql.jdbc.Driver").
option("dbtable","student").
option("user","root" ).
option("passjdword","01bk").
load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
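Printing the DataFrame confirms that the read worked; with the two rows inserted above, the output should look roughly like this:
jdbcDF.show()
+---+--------+------+---+
| id|    name|gender|age|
+---+--------+------+---+
|  1| Xueqian|     F| 23|
|  2|Weiliang|     M| 24|
+---+--------+------+---+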
- Writing data to MySQL
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
// Two new student records
val studentRDD = spark.sparkContext
  .parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27"))
  .map(_.split(" "))
// Define the schema, i.e. the table header
val schema = StructType(List(
StructField("id",IntegerType,true),
StructField("name", StringType, true),
StructField("gender", IntegerType, true),
StructField("age", IntegerType, true)
))
// Create Row objects; each Row object is one row of rowRDD
val rowRDD: RDD[Row] = studentRDD
  .map(x => Row(x(0).toInt, x(1).trim, x(2).trim, x(3).toInt))
// Pair the Rows with the schema
val studentDF = spark.createDataFrame(rowRDD,schema)
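Before pushing the data to MySQL, a quick show() catches schema mistakes early; it should print the two new students, Rongcheng and Guanhua:
studentDF.show()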
// Create a Properties object named prop to hold the JDBC connection parameters
val prop = new Properties()
prop.put("user", "root")//表示使用者名稱是root
prop.put("password","01bk")//表示密碼是hadoop
prop.put("driver","com.mysql.jdbc.Driver")//表示驅動程式是com.mysql.jdbc.Driver
// Now we can connect to the database in append mode, appending the records to the student table of the spark database
studentDF.write.mode("append").
jdbc("jdbc:mysql://master:3306/spark","spark.student",prop)