1. 程式人生 > 資料庫 >pyspark對Mysql資料庫進行讀寫的實現

pyspark對Mysql資料庫進行讀寫的實現

pyspark是Spark對Python的api介面,可以在Python環境中通過呼叫pyspark模組來操作spark,完成大資料框架下的資料分析與挖掘。其中,資料的讀寫是基礎操作,pyspark的子模組pyspark.sql 可以完成大部分型別的資料讀寫。文字介紹在pyspark中讀寫Mysql資料庫。

1 軟體版本

在Python中使用Spark,需要安裝配置Spark,這裡跳過配置的過程,給出執行環境和相關程式版本資訊。

  • win10 64bit
  • java 13.0.1
  • spark 3.0
  • python 3.8
  • pyspark 3.0
  • pycharm 2019.3.4

2 環境配置

pyspark連線Mysql是通過java實現的,所以需要下載連線Mysql的jar包。

下載地址

在這裡插入圖片描述

選擇下載Connector/J,然後選擇作業系統為Platform Independent,下載壓縮包到本地。

在這裡插入圖片描述

然後解壓檔案,將其中的jar包mysql-connector-java-8.0.19.jar放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars

在這裡插入圖片描述

環境配置完成!

3 讀取Mysql

指令碼如下:

from pyspark.sql import SQLContext,SparkSession

if __name__ == '__main__':
  # spark 初始化
  spark = SparkSession. \
    Builder(). \
    appName('sql'). \
    master('local'). \
    getOrCreate()
  # mysql 配置(需要修改)
  prop = {'user': 'xxx','password': 'xxx','driver': 'com.mysql.cj.jdbc.Driver'}
  # database 地址(需要修改)
  url = 'jdbc:mysql://host:port/database'
  # 讀取表
  data = spark.read.jdbc(url=url,table='tb_newCity',properties=prop)
  # 列印data資料型別
  print(type(data))
  # 展示資料
  data.show()
  # 關閉spark會話
  spark.stop()
  • 注意點:
  • prop引數需要根據實際情況修改,文中使用者名稱和密碼用xxx代替了,driver引數也可以不需要;
  • url引數需要根據實際情況修改,格式為jdbc:mysql://主機:埠/資料庫
  • 通過呼叫方法read.jdbc進行讀取,返回的資料型別為spark DataFrame;

執行指令碼,輸出如下:

在這裡插入圖片描述

4 寫入Mysql

指令碼如下:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row

if __name__ == '__main__':
  # spark 初始化
  sc = SparkContext(master='local',appName='sql')
  spark = SQLContext(sc)
  # mysql 配置(需要修改)
  prop = {'user': 'xxx','driver': 'com.mysql.cj.jdbc.Driver'}
  # database 地址(需要修改)
  url = 'jdbc:mysql://host:port/database'

  # 建立spark DataFrame
  # 方式1:list轉spark DataFrame
  l = [(1,12),(2,22)]
  # 建立並指定列名
  list_df = spark.createDataFrame(l,schema=['id','value']) 
  
  # 方式2:rdd轉spark DataFrame
  rdd = sc.parallelize(l) # rdd
  col_names = Row('id','value') # 列名
  tmp = rdd.map(lambda x: col_names(*x)) # 設定列名
  rdd_df = spark.createDataFrame(tmp) 
  
  # 方式3:pandas dataFrame 轉spark DataFrame
  df = pd.DataFrame({'id': [1,2],'value': [12,22]})
  pd_df = spark.createDataFrame(df)

  # 寫入資料庫
  pd_df.write.jdbc(url=url,table='new',mode='append',properties=prop)
  # 關閉spark會話
  sc.stop()

注意點:

propurl引數同樣需要根據實際情況修改;

寫入資料庫要求的物件型別是spark DataFrame,提供了三種常見資料型別轉spark DataFrame的方法;

通過呼叫write.jdbc方法進行寫入,其中的model引數控制寫入資料的行為。

model 引數解釋
error 預設值,原表存在則報錯
ignore 原表存在,不報錯且不寫入資料
append 新資料在原錶行末追加
overwrite 覆蓋原表

5 常見報錯

Access denied for user …

在這裡插入圖片描述

原因:mysql配置引數出錯
解決辦法:檢查user,password拼寫,檢查賬號密碼是否正確,用其他工具測試mysql是否能正常連線,做對比檢查。

No suitable driver

pyspark對Mysql資料庫進行讀寫的實現

原因:沒有配置執行環境
解決辦法:下載jar包進行配置,具體過程參考本文的2 環境配置

到此這篇關於pyspark對Mysql資料庫進行讀寫的實現的文章就介紹到這了,更多相關pyspark Mysql讀寫內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!