Several ways to write to MySQL from Spark, for different scenarios
Method 1: all columns are fixed in advance
// Connection properties for the JDBC writer
val prop = new java.util.Properties
prop.setProperty("user", "root")
prop.setProperty("password", "123456")

// Append each DataFrame to its target table
df1.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_code_phone", prop)
df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_params_phone", prop)
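For context, here is a minimal self-contained sketch of Method 1. The SparkSession setup, the sample data, and the explicit driver property are assumptions for illustration, not part of the original post; the write call itself mirrors the snippet above.

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-write-example").getOrCreate()
    import spark.implicits._

    // Sample DataFrame whose column names and types match the target table (assumed schema)
    val df1 = Seq((200, "13800000000"), (404, "13900000000")).toDF("code", "phone")

    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    // Explicit driver class, in case Spark cannot infer it from the URL
    prop.setProperty("driver", "com.mysql.jdbc.Driver")

    df1.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_code_phone", prop)

    spark.stop()
  }
}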
Method 2: columns can be added or removed freely
df.foreachPartition(p => {
  // One connection per partition, borrowed from the pool defined below
  @transient val conn = ConnectionPool.getConnection
  p.foreach(x => {
    // Build the INSERT by hand, so columns can be added or dropped at will
    val sql = "insert into app_id(id,date,appid,num) values (" +
      "'" + UUID.randomUUID + "'," +
      "'" + x.getInt(0) + "'," +
      "'" + x.getString(1) + "'," +
      "'" + x.getLong(2) + "'" +
      ")"
    val stmt = conn.createStatement
    stmt.executeUpdate(sql)
  })
  ConnectionPool.returnConnection(conn)
})
Database connection pool:

package com.prince.spark.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    // Load the MySQL driver once per JVM
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                // Lazily create a fixed pool of 5 connections
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8",
                            "root",
                            "123456");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Must be synchronized: partitions return connections concurrently
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
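An aside not in the original post: the string-concatenated INSERT in Method 2 is open to SQL injection and sends one statement per row. A hedged sketch of the same write using a PreparedStatement with batching follows; the batch size of 500 and the typed setters are assumptions based on the example table above.

import java.util.UUID

df.foreachPartition { p =>
  val conn = ConnectionPool.getConnection
  // Parameterized SQL: no per-row string building, no injection risk
  val stmt = conn.prepareStatement(
    "insert into app_id(id, date, appid, num) values (?, ?, ?, ?)")
  var count = 0
  p.foreach { x =>
    stmt.setString(1, UUID.randomUUID.toString)
    stmt.setInt(2, x.getInt(0))
    stmt.setString(3, x.getString(1))
    stmt.setLong(4, x.getLong(2))
    stmt.addBatch()
    count += 1
    if (count % 500 == 0) stmt.executeBatch() // flush every 500 rows (assumed batch size)
  }
  stmt.executeBatch() // flush the remainder
  stmt.close()
  ConnectionPool.returnConnection(conn)
}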
Method 3: sometimes you are writing computed results and have to assemble the DataFrame yourself
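The snippet below presupposes a live SparkContext (sc), a SparkSession (spark), and previously computed values num and log_date; it also needs the following imports, which the original omits:

import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}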
// Assemble the result RDD
val arrayRDD = sc.parallelize(List((num, log_date)))
// Map the result RDD to an RDD of Rows
val resultRowRDD = arrayRDD.map(p => Row(
  p._1.toInt,
  p._2.toString,
  new Timestamp(new java.util.Date().getTime)
))
// Specify each field's schema directly with a StructType
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, true),
    StructField("log_date", StringType, true),       // which day's logs the result came from
    StructField("create_time", TimestampType, true)  // creation time of the analysis result
  )
)
// Assemble the new DataFrame
val DF = spark.createDataFrame(resultRowRDD, resultSchema)
// Write the result to MySQL
DF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify")  // table name
  .option("user", "root")
  .option("password", "123456")
  .save()
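Also not in the original post, but a quick way to sanity-check any of the three writes is to read the table back through the same JDBC options; a minimal sketch:

val verifyDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify")
  .option("user", "root")
  .option("password", "123456")
  .load()
verifyDF.show()  // print a few rows to confirm the write landed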
--------------------- This article comes from the CSDN blog of 放開那個產品經理; full text: https://blog.csdn.net/qq_39869388/article/details/80423151