Spark Streaming之使用Spark Streaming完成詞頻統計,並將結果寫入到MySQL資料庫中
阿新 • • 發佈:2019-02-20
package com.yys.spark.project
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成詞頻統計,並將結果寫入到MySQL資料庫中
*/
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("spark01", 9999)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//result.print() //此處僅僅是將統計結果輸出到控制檯
//TODO... 將結果寫入到MySQL
// result.foreachRDD(rdd =>{
// val connection = createConnection() // executed at the driver
// rdd.foreach { record =>
// val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
// connection.createStatement().execute(sql)
// }
// })
result.print()
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
* 獲取MySQL的連線
*/
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.20.128:3306/yys_spark", "root", "Root-123")
}
}
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成詞頻統計,並將結果寫入到MySQL資料庫中
*/
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("spark01", 9999)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//result.print() //此處僅僅是將統計結果輸出到控制檯
//TODO... 將結果寫入到MySQL
// result.foreachRDD(rdd =>{
// val connection = createConnection() // executed at the driver
// rdd.foreach { record =>
// val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
// connection.createStatement().execute(sql)
// }
// })
result.print()
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
* 獲取MySQL的連線
*/
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.20.128:3306/yys_spark", "root", "Root-123")
}
}