大資料學習之路107-spark streaming基於mysql歷史state統計
阿新 • • 發佈:2018-12-16
package com.test.sparkStreaming import java.sql.{DriverManager, PreparedStatement} import com.typesafe.config.{Config, ConfigFactory} import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} object MyNetWorkWordCountMysqlState { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) //載入配置檔案,會去載入resources下面的配置檔案, // 預設規則:application.conf -> application.json -> application.properties val config: Config = ConfigFactory.load() //建立Streamingcontext物件 val conf = new SparkConf().setAppName("MyNetWorkWordCountMysqlState").setMaster("local[2]") //定義一個取樣時間,每隔2秒鐘採集一次資料,這個時間不能隨意設定 val ssc: StreamingContext = new StreamingContext(conf,Seconds(2)) //建立一個離散流 val lines = ssc.socketTextStream("marshal",5678) /** * 插入當前批次計算結果 * foreachRDD在Driver端執行 * foreachPartition,foreach在worker端執行 */ lines.foreachRDD( rdd =>{ //計算當前批次結果 val current_result: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) //插入當前批次計算出來的結果 current_result.foreachPartition( partition => { //建立一個連線 val url = config.getString("db.url") val user = config.getString("db.user") val password = config.getString("db.password") val conn = DriverManager.getConnection(url,user,password) //將當前分割槽裡面的所有資料都插入到mysql資料庫中 partition.foreach( tp =>{ val word = tp._1 //判斷即將插入的資料是否之前已經插入過,如果已經插入過,則進行更新操作,否則就是插入 val pst = conn.prepareStatement("select * from wordcount where words=?") pst.setString(1,word) val rs = pst.executeQuery() var flag = false while(rs.next()){ flag = true //即將插入的單詞已經存在,可以進行更新操作 println("已經存在") val i: Int = rs.getInt("total") val i2 = i + tp._2 //更新 val update = conn.prepareStatement("update wordcount set total = ? where words = ?") update.setInt(1,i2) update.setString(2,word) update.executeUpdate() update.close() } if(!flag){ println("單詞不存在,需要插入") //插入一條資料 val pst: PreparedStatement = conn.prepareStatement("insert into wordcount values(?,?)") pst.setString(1,tp._1) pst.setInt(2,tp._2) pst.executeUpdate() pst.close() } }) if (conn != null) conn.close() }) }) ssc.start() ssc.awaitTermination() } }
執行結果: