
Reading and Writing Databases with DataFrames in PySpark

This article covers only reading from and writing to a MySQL database from Spark via JDBC.

1. JDBC

For Spark to pull data from MySQL, we first need a JAR that provides the MySQL JDBC driver, such as mysql-connector-java-5.1.40-bin.jar.

Put the JAR somewhere convenient in the VM, for example under /home/sxw/Documents, and add the following to Spark's spark-env.sh file:

export SPARK_CLASSPATH=/home/sxw/Documents/mysql-connector-java-5.1.40-bin.jar
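
Note that SPARK_CLASSPATH is deprecated in newer Spark releases; a common alternative is to point spark.jars at the driver JAR when building the session. A minimal sketch, assuming the same JAR path as above:

from pyspark.sql import SparkSession

# Register the MySQL JDBC driver JAR with the session
# (path assumed from the example above)
spark = (SparkSession.builder
    .appName("mysql-demo")
    .config("spark.jars", "/home/sxw/Documents/mysql-connector-java-5.1.40-bin.jar")
    .getOrCreate())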

 

2. Example code: reading

# Read a MySQL table through JDBC
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    driver='com.mysql.jdbc.Driver',  # driver class shipped in mysql-connector-java 5.1.x
    dbtable='mysql.db',
    user='root',
    password='123456'
    ).load()
df.show()

# A SQL query can also be passed as dbtable; it must be wrapped
# in parentheses and given an alias (here "t")
sql = "(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
    ).load()
df.show()
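
Equivalently, DataFrameReader.jdbc() bundles the connection options into a properties dict, and can also split the read across several partitions for parallelism. A sketch under the same connection settings; the partitioning column 'id' and its bounds are illustrative:

props = {'user': 'root', 'password': '123456',
         'driver': 'com.mysql.jdbc.Driver'}

# Plain read of one table
df = spark.read.jdbc(url='jdbc:mysql://127.0.0.1',
                     table='mysql.db', properties=props)

# Parallel read: split into 4 partitions on a numeric column.
# 'id', 1 and 100000 are illustrative; use a real indexed column and its range.
df = spark.read.jdbc(
    url='jdbc:mysql://127.0.0.1', table='mysql.db',
    column='id', lowerBound=1, upperBound=100000, numPartitions=4,
    properties=props)
df.show()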

3. Example code: writing

# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# Write to the partitioned table with plain Hive SQL
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive 
    partition (saledate) 
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate 
    from szy_aipurchase_tmp_szy_dailysale distribute by saledate
    """)

# Or rebuild the table's partitions on each run.
# Note: recent Spark versions reject combining partitionBy() with
# insertInto(); insertInto() resolves partitions by column position.
jdbcDF.write.mode("overwrite").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# Without partitioning: simply load the DataFrame into a Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)
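
Writing back to MySQL mirrors the read path: DataFrameWriter.jdbc() takes the same URL and properties dict. A minimal sketch; the target table name test.dailysale is illustrative:

props = {'user': 'root', 'password': '123456',
         'driver': 'com.mysql.jdbc.Driver'}

# Append the DataFrame's rows to a MySQL table
# (created automatically if it does not exist)
jdbcDF.write.jdbc(url='jdbc:mysql://127.0.0.1',
                  table='test.dailysale',
                  mode='append', properties=props)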
---------------------
Author: 振裕, Source: CSDN
Original: https://blog.csdn.net/suzyu12345/article/details/79673473

 

4. Miscellaneous

import os
# Point PySpark at a local JDK (Windows path from the original example)
os.environ["JAVA_HOME"] = r"D:\Program Files\Java\jdk1.8.0_131"

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter

appname = "demo"
sparkmaster = "local"

# Build a local SparkSession, then the legacy SQLContext wrapper around it
spark = SparkSession.builder.appName(appname).master(sparkmaster).getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

Internally, Spark implements DataFrame reads and writes with the DataFrameReader and DataFrameWriter classes.

# options() takes keyword arguments, not positionals
df = sqlContext.read.format("jdbc").options(
    url=url, driver=driver, dbtable=dbtable).load()

# The same read, constructing the DataFrameReader explicitly
df_reader = DataFrameReader(sqlContext)
df = df_reader.format("jdbc").options(
    url=url, driver=driver, dbtable=dbtable).load()

# Or via the jdbc() shortcut; properties is a dict of connection options
df = df_reader.jdbc(url, table, properties=properties)
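
On the write side the symmetric class is DataFrameWriter; df.write is simply shorthand for constructing one. A minimal sketch reusing the placeholder variables above:

from pyspark.sql.readwriter import DataFrameWriter

# df.write is equivalent to DataFrameWriter(df)
df_writer = DataFrameWriter(df)
df_writer.jdbc(url, table, mode="append", properties=properties)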