1. 程式人生 > 其它 >PySpark 入門:通過JDBC連線資料庫(DataFrame)

PySpark 入門:通過JDBC連線資料庫(DataFrame)

這裡以關係資料庫MySQL為例。首先,本部落格教程(Ubuntu 20.04 安裝MySQL 8.X),在Linux系統中安裝好MySQL資料庫。這裡假設你已經成功安裝了MySQL資料庫。下面我們要新建一個測試Spark程式的資料庫,資料庫名稱是“spark”,表的名稱是“student”

請執行下面命令在Linux中啟動MySQL資料庫,並完成資料庫和表的建立,以及樣例資料的錄入:

service mysql start
mysql -u root -p
# 螢幕會提示你輸入密碼

輸入密碼後,你就可以進入“mysql>”命令提示符狀態,然後就可以輸入下面的SQL語句完成資料庫和表的建立:

mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> alter table student change id id int auto_increment primary key;
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;

上面已經建立好了我們所需要的MySQL資料庫和表,下面我們編寫Spark應用程式連線MySQL資料庫並且讀寫資料。

Spark支援通過JDBC方式連線到其他資料庫獲取資料生成DataFrame。

首先,請進入Linux系統(本教程統一使用hadoop使用者名稱登入),開啟火狐(FireFox)瀏覽器,下載一個MySQL的JDBC驅動(下載)。

JDBC 驅動下載方法一:

解壓,把 mysql-connector-java-8.0.28.jar 貼上到 /usr/local/spark/jars 中,這樣便完成了驅動的匯入

JDBC 驅動下載方法二:

在火狐瀏覽器中下載時,一般預設儲存在hadoop使用者的當前工作目錄的“下載”目錄下,所以,可以開啟一個終端介面,輸入下面命令檢視:

cd ~
cd Downloads

就可以看到剛才下載到的MySQL的JDBC驅動程式,檔名稱為 mysql-connector-java-8.0.28.tar.gz(你下載的版本可能和這個不同)。現在,使用下面命令,把該驅動程式拷貝到 Spark 的安裝目錄下:

sudo tar -zxf ~/Downloads/mysql-connector-java-8.0.28.tar.gz -C /usr/local/spark/jars
cd /usr/local/spark/jars
ls

這時就可以在/usr/local/spark/jars目錄下看到這個驅動程式檔案所在的資料夾 mysql-connector-java-8.0.28,進入這個資料夾,就可以看到驅動程式檔案 mysql-connector-java-8.0.28.jar。
請輸入下面命令啟動已經安裝在Linux系統中的mysql資料庫(如果前面已經啟動了MySQL資料庫,這裡就不用重複啟動了)。

service mysql start

下面,我們要啟動一個pyspark,而且啟動的時候,要附加一些引數。啟動pyspark時,必須指定mysql連線驅動jar包。

cd /usr/local/spark
./bin/pyspark \
--jars /usr/local/spark/jars/mysql-connector-java-8.0.28.jar

上面的命令列中,在一行的末尾加入斜槓\,是為了告訴spark-shell,命令還沒有結束。

啟動進入pyspark以後,可以執行以下命令連線資料庫,讀取資料,並顯示:

jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "password").load()

下面我們再來看一下如何往MySQL中寫入資料。
為了看到MySQL資料庫在Spark程式執行前後發生的變化,我們先在Linux系統中新建一個終端,使用下面命令檢視一下MySQL資料庫中的資料庫spark中的表student的內容:

mysql>  use spark;
Database changed
 
mysql> select * from student;
//上面命令執行後返回下面結果
+------+----------+--------+------+
| id   | name     | gender | age  |
+------+----------+--------+------+
|    1 | Xueqian  | F      |   23 |
|    2 | Weiliang | M      |   24 |
+------+----------+--------+------+

現在我們開始在pyspark中編寫程式,往spark.student表中插入兩條記錄。
下面,我們要啟動一個pyspark,而且啟動的時候,要附加一些引數。啟動pyspark時,必須指定mysql連線驅動jar包(如果你前面已經採用下面方式啟動了pyspark,就不需要重複啟動了):

cd /usr/local/spark
./bin/pyspark \
--jars /usr/local/spark/jars/mysql-connector-java-8.0.28.jar

上面的命令列中,在一行的末尾加入斜槓\,是為了告訴spark-shell,命令還沒有結束。

啟動進入pyspark以後,可以執行以下命令連線資料庫,寫入資料,程式如下(你可以把下面程式一條條拷貝到pyspark中執行)

>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
//下面要設定模式資訊
>>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
>>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
//建立起Row物件和模式之間的對應關係,也就是把資料和模式對應起來
>>> studentDF = spark.createDataFrame(rowRDD, schema)
>>> prop = {}
>>> prop['user'] = 'root'
>>> prop['password'] = 'password'
>>> prop['driver'] = "com.mysql.cj.jdbc.Driver"
>>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

在pyspark中執行完上述程式後,我們可以看一下效果,看看MySQL資料庫中的spark.student表發生了什麼變化。請在剛才的另外一個視窗的MySQL命令提示符下面繼續輸入下面命令:

mysql> select * from student;
+------+-----------+--------+------+
| id   | name      | gender | age  |
+------+-----------+--------+------+
|    1 | Xueqian   | F      |   23 |
|    2 | Weiliang  | M      |   24 |
|    3 | Rongcheng | M      |   26 |
|    4 | Guanhua   | M      |   27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)