Spark中讀寫mysql資料庫
阿新 • • 發佈:2019-02-19
Spark中讀寫MySQL資料庫
一.使用Intellij編寫Spark程式讀取MySQL資料庫
1.在windows系統中,安裝有mysql資料庫。主要情況如下:
mysql> show databases;
+------------------------+
| Database |
+------------------------+
| information_schema |
| dbgirl |
| mydatabase |
| mysql |
| performance_ schema |
| runoob |
| sakila |
| test |
+------------------------+
10 rows in set (0.09 sec)
mysql> use mydatabase;
Database changed
mysql> show tables;
+----------------------+
| Tables_in_mydatabase |
+----------------------+
| country |
| course |
| grade |
| myworld |
| score |
| student |
| teacher |
| view_grade |
+----------------------+
8 rows in set (0.00 sec)
2.現在準備讀取mydatabase.score中的資料
3.程式碼如下:
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
object ReadFromMysql {
//define a function to get an Connection of mysql
def getConnection ():Connection={
//you should the database'name not the database and table' name
DriverManager.getConnection("jdbc:mysql://localhost:3306/mydatabase","root","")
}
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("ReadFromMysql").setMaster("local")
val sc = new SparkContext(conf)
val connection = getConnection()
val preparedStatement: PreparedStatement = connection.prepareStatement(
"select * from score" )
val result: ResultSet = preparedStatement.executeQuery()
println("sno\tcno\tdegree")
while(result.next()){
print(result.getString("sno")+" ")
print(result.getString("cno")+" ")
print(result.getString("degree")+" ")
println()
}
}
}
4.執行結果如下:
Intellij編寫Spark程式,往MySQL資料庫中寫入資料
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 1.insert data into table by programs
*/
object WriteToMysql {
//get a connection to mysql database
def getConnection():Connection={
DriverManager.getConnection("jdbc:mysql://localhost:3306/mydatabase","root","")
}
def main(args:Array[String]): Unit ={
val conf = new SparkConf().setAppName("WriteToMysql").setMaster("local")
val sc = new SparkContext(conf)
//the data should be inserted into MySQL
//the database is named "mydatabase",the table's name is employee
//the database and table should be created in advance
val connection = getConnection()//invoke a function to get a connection
val prepareSta: PreparedStatement = connection.prepareStatement("insert into employee values('LittleLawson',22, 'Enmoster')," +
"('Ltt',22,'baidu'),('dinglei',40,'NetEase'),('Jack',52,'alibaba');");
}
}
- 執行結果如下:
並且語句執行帶來的改變row有如下行數: