Reading and writing databases with DataFrames in PySpark
阿新 · Published 2018-11-17
This article only covers using Spark to read from and write to a MySQL database via JDBC.
1. JDBC
For Spark to pull data out of MySQL, we first need a JAR providing the MySQL connector: mysql-connector-java-5.1.40-bin.jar.
Place the JAR somewhere convenient on the machine (I put it under /home/sxw/Documents) and add the following line to Spark's spark-env.sh:
export SPARK_CLASSPATH=/home/sxw/Documents/mysql-connector-java-5.1.40-bin.jar
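SPARK_CLASSPATH is deprecated in newer Spark releases; an alternative, shown as a minimal sketch below, is to hand the JAR to Spark when the session is created. The JAR path and app name are the same placeholders used above; spark.jars and spark.driver.extraClassPath are standard Spark configuration keys.

from pyspark.sql import SparkSession

jar_path = "/home/sxw/Documents/mysql-connector-java-5.1.40-bin.jar"
spark = (SparkSession.builder
         .appName("jdbc_demo")
         .config("spark.jars", jar_path)                   # ship the connector JAR to driver and executors
         .config("spark.driver.extraClassPath", jar_path)  # also put it on the driver classpath
         .getOrCreate())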
2. Example code for reading
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456'
).load()
df.show()

# A SQL statement (wrapped as a subquery) can also be passed as dbtable
sql = "(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
).load()
df.show()
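If the load above fails with "No suitable driver", the driver class can be named explicitly; a large table can also be read in parallel by splitting on a numeric column. The sketch below uses the standard JDBC source options driver, partitionColumn, lowerBound, upperBound and numPartitions; the column name id and the bounds are hypothetical.

df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    driver='com.mysql.jdbc.Driver',   # driver class shipped in mysql-connector-java 5.x
    dbtable='mysql.db',
    user='root',
    password='123456',
    partitionColumn='id',             # hypothetical numeric column used to split the read
    lowerBound='1',
    upperBound='100000',
    numPartitions='4'
).load()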
3. Example code for writing
# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# Write into a partitioned table with plain Hive SQL
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive partition (saledate)
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
    from szy_aipurchase_tmp_szy_dailysale
    distribute by saledate
""")

# Or rebuild the partitioned table on every run
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# Without partitions: simply load the data into a Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)

Author: 振裕 | Source: CSDN | Original: https://blog.csdn.net/suzyu12345/article/details/79673473
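The snippets above write into Hive; to push a DataFrame back into MySQL itself, DataFrameWriter.jdbc can be used. A minimal sketch follows; the target database ai and table dailysale_mysql are hypothetical placeholders.

jdbcDF.write.jdbc(
    url='jdbc:mysql://127.0.0.1/ai',   # hypothetical target database
    table='dailysale_mysql',           # hypothetical target table
    mode='append',                     # or 'overwrite' to replace the table contents
    properties={
        'user': 'root',
        'password': '123456',
        'driver': 'com.mysql.jdbc.Driver'
    }
)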
4. Miscellaneous
import os
os.environ["JAVA_HOME"] = r"D:\Program Files\Java\jdk1.8.0_131"

from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter

appname = "demo"
sparkmaster = "local"
spark = SparkSession.builder.appName(appname).master(sparkmaster).getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
Under the hood, Spark uses DataFrameReader and DataFrameWriter to implement DataFrame read and write operations.
df = sqlContext.read.format("jdbc").options(url=url, driver=driver, dbtable=dbtable).load()

# Equivalent, constructing the reader directly:
df_reader = DataFrameReader(sqlContext)
df = df_reader.format("jdbc").options(url=url, driver=driver, dbtable=dbtable).load()
df = df_reader.jdbc(url, table, properties=properties)
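Filling in the placeholders, here is a minimal runnable sketch of the reader-object style shown above; the url, table and credentials are the same example values used earlier in this article, and constructing DataFrameReader directly is equivalent to what sqlContext.read returns.

from pyspark.sql.readwriter import DataFrameReader

url = 'jdbc:mysql://127.0.0.1'
table = 'mysql.db'
properties = {'user': 'root', 'password': '123456', 'driver': 'com.mysql.jdbc.Driver'}

df_reader = DataFrameReader(sqlContext)     # the same object that sqlContext.read returns
df = df_reader.jdbc(url, table, properties=properties)
df.show()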