pyspark對Mysql資料庫進行讀寫的實現
pyspark是Spark對Python的api介面,可以在Python環境中通過呼叫pyspark模組來操作spark,完成大資料框架下的資料分析與挖掘。其中,資料的讀寫是基礎操作,pyspark的子模組pyspark.sql 可以完成大部分型別的資料讀寫。文字介紹在pyspark中讀寫Mysql資料庫。
1 軟體版本
在Python中使用Spark,需要安裝配置Spark,這裡跳過配置的過程,給出執行環境和相關程式版本資訊。
- win10 64bit
- java 13.0.1
- spark 3.0
- python 3.8
- pyspark 3.0
- pycharm 2019.3.4
2 環境配置
pyspark連線Mysql是通過java實現的,所以需要下載連線Mysql的jar包。
下載地址
選擇下載Connector/J
,然後選擇作業系統為Platform Independent
,下載壓縮包到本地。
然後解壓檔案,將其中的jar包mysql-connector-java-8.0.19.jar
放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars
。
環境配置完成!
3 讀取Mysql
指令碼如下:
from pyspark.sql import SQLContext,SparkSession if __name__ == '__main__': # spark 初始化 spark = SparkSession. \ Builder(). \ appName('sql'). \ master('local'). \ getOrCreate() # mysql 配置(需要修改) prop = {'user': 'xxx','password': 'xxx','driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 讀取表 data = spark.read.jdbc(url=url,table='tb_newCity',properties=prop) # 列印data資料型別 print(type(data)) # 展示資料 data.show() # 關閉spark會話 spark.stop()
- 注意點:
prop
引數需要根據實際情況修改,文中使用者名稱和密碼用xxx代替了,driver
引數也可以不需要;url
引數需要根據實際情況修改,格式為jdbc:mysql://主機:埠/資料庫
;- 通過呼叫方法
read.jdbc
進行讀取,返回的資料型別為spark DataFrame;
執行指令碼,輸出如下:
4 寫入Mysql
指令碼如下:
import pandas as pd from pyspark import SparkContext from pyspark.sql import SQLContext,Row if __name__ == '__main__': # spark 初始化 sc = SparkContext(master='local',appName='sql') spark = SQLContext(sc) # mysql 配置(需要修改) prop = {'user': 'xxx','driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 建立spark DataFrame # 方式1:list轉spark DataFrame l = [(1,12),(2,22)] # 建立並指定列名 list_df = spark.createDataFrame(l,schema=['id','value']) # 方式2:rdd轉spark DataFrame rdd = sc.parallelize(l) # rdd col_names = Row('id','value') # 列名 tmp = rdd.map(lambda x: col_names(*x)) # 設定列名 rdd_df = spark.createDataFrame(tmp) # 方式3:pandas dataFrame 轉spark DataFrame df = pd.DataFrame({'id': [1,2],'value': [12,22]}) pd_df = spark.createDataFrame(df) # 寫入資料庫 pd_df.write.jdbc(url=url,table='new',mode='append',properties=prop) # 關閉spark會話 sc.stop()
注意點:
prop
和url
引數同樣需要根據實際情況修改;
寫入資料庫要求的物件型別是spark DataFrame,提供了三種常見資料型別轉spark DataFrame的方法;
通過呼叫write.jdbc
方法進行寫入,其中的model
引數控制寫入資料的行為。
model | 引數解釋 |
---|---|
error | 預設值,原表存在則報錯 |
ignore | 原表存在,不報錯且不寫入資料 |
append | 新資料在原錶行末追加 |
overwrite | 覆蓋原表 |
5 常見報錯
Access denied for user …
原因:mysql配置引數出錯
解決辦法:檢查user,password拼寫,檢查賬號密碼是否正確,用其他工具測試mysql是否能正常連線,做對比檢查。
No suitable driver
原因:沒有配置執行環境
解決辦法:下載jar包進行配置,具體過程參考本文的2 環境配置。
到此這篇關於pyspark對Mysql資料庫進行讀寫的實現的文章就介紹到這了,更多相關pyspark Mysql讀寫內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!