1. 程式人生 > >sparksql 連線讀取MySQL資料庫

sparksql 連線讀取MySQL資料庫

1. 從oracle 官方網站  https://dev.mysql.com/downloads/connector/j/ 下載mysql-connector 驅動,一般是一個rpm包。
2.  部署mysql-connector 驅動
    在spark中使用此驅動連線mysql資料庫時,不要在spark 叢集上使用rpm  -ivh 方式安裝,因為你會發現這樣安裝後找不到很多部署文件中提到的mysql-connector的jar包。其實只需要在windows中通過解壓程式對rpm包解壓即可(可能的話要多解壓幾層),在其中的cpio壓縮檔案中包含目錄,其中的java目錄中包含此jar包。
   將此jar包傳到spark叢集伺服器上,一般可以放在SPAKR_HOME/jars目錄中,然後設定CLASSPATH 環境變數(可參考mysql-connector的官方安裝指導)
   export CLASSPATH=/path/mysql-connector-***.jar:$CLASSPATH3. 程式碼連線mysql資料庫  from pyspark.sql import SparkSessiontry:
    sc.stop()
except:
    passspk = SparkSession.builder.master("spark://192.168.12.7:7077").appName("spark1").getOrCreate()
print(spk)
sc11=spk.sparkContext
print(sc11)jdbcdf = spk.read.format('jdbc').option('url',"jdbc:mysql://192.168.8.61:3306/sparkdb?user=spark&password=passwrod")\
.option('dbtable','class').load()
print(jdbcdf)
#jdbcdf.select('id','name','score').write.format('parquet').save("src/main/resources/sparkdb")
#jdbcdf.select('id','name','score').write.format('json').save("src/main/resources/sparkdb")
jdbcdf.select('id','name','score').write.saveAsTable("sparktb3")
jdbcsql=spk.sql("select * from sparktb3")
jdbcsql.show()4. 儲存資料到mysql資料庫dforacle.write.format('jdbc').mode("overwrite").options(
    url="jdbc:mysql://192.168.8.61:3306/sparkdb",
    dbtable='tablename',
    user='spark',
    password='password').save()dfmysql=spk.read.format('jdbc').options(
    url="jdbc:mysql://192.168.8.61:3306/sparkdb",
    dbtable='tablename',
    user='spark',
    password='password').load()dfmysql.show(3)dfmysql.createOrReplaceTempView('fy_ls')
spk.sql("select * from fy_ls").show(30)