1. 程式人生 > 其它 >Spark sql 讀寫資料庫

Spark sql 讀寫資料庫

技術標籤:資料庫scalasparkjdbc

用JDBC連線MySql

  • 啟動MySQL
    輸入下面的語句完成資料庫和表的建立
create database spark;
use spark;
create table student(id int(4),name char(20),gender char(4),age int(4));
insert into student values(1,'Xueqian','F',23);
insert into student values(2,'Weiliang','M',24);
select * from student;

在這裡插入圖片描述

  • 啟動spark-shell
    指定MySql連線驅動jar包
 val jdbcDF=spark.read.format("jdbc").
      option("url","jdbc:mysql://master:3306/spark").
      option("driver","com.mysql.jdbc.Driver").
      option("dbtable","student").
      option("user","root"
). option("passjdword","01bk"). load() jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

在這裡插入圖片描述

  • 向MySQL中寫入資料
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

//兩個學生資訊
val studentRDD=spark.sparkContext
.
parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")) .map((_.split("")) //設定模式資訊,得到表頭 val schema = StructType(List( StructField("id",IntegerType,true), StructField("name", StringType, true), StructField("gender", IntegerType, true), StructField("age", IntegerType, true) )) //建立Row物件,每個Row物件都是rowRDD中的一行 val rowRDD: RDD[Row] = StudentRDD .map(x => Row(x(0).toInt, x(1).trim,x(2).trim,x(3).toInt)) //建立Row和模式間的對應關係 val studentDF = spark.createDataFrame(rowRDD,schema) //下面建立一個prop變數用來儲存JDBC連線引數 val prop = new Properties() prop.put("user", "root")//表示使用者名稱是root prop.put("password""01bk")//表示密碼是hadoop prop.put("driver","com.mysql.jdbc.Driver")//表示驅動程式是com.mysql.jdbc.Driver Pro.put(“驅動程式”,“com.mysql.jdbc.Driver”)//表示驅動程式是com.mysql.jdbc.Driver //下面就就可以連線資料庫,採用append模式,表示追加記錄到資料庫spark的student表 studentDF.write.mode("append"). jdbc("jdbc:mysql://master:3306/spark","spark.student",prop)