Spark Streaming: writing to a DB and HDFS
package main.java

import java.sql.Connection

import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

/**
 * Created on 2016-3-15.
 */
object ConnectionPool {

  val logger = LoggerFactory.getLogger(this.getClass)

  // Build a BoneCP pool once; None if the driver cannot be loaded or the pool cannot be created.
  private val connectionPool = {
    try {
      Class.forName("oracle.jdbc.driver.OracleDriver") // for MySQL: com.mysql.jdbc.Driver
      val config = new BoneCPConfig()
      config.setJdbcUrl("jdbc:oracle:thin:@xxx:1521:fzkfs") // for MySQL: jdbc:mysql://xxx:3306/test
      config.setUsername("xxx")
      config.setPassword("xxx")
      config.setLazyInit(true)
      config.setMinConnectionsPerPartition(3)
      config.setMaxConnectionsPerPartition(5)
      config.setPartitionCount(5)
      config.setCloseConnectionWatch(true)
      config.setLogStatementsEnabled(false)
      Some(new BoneCP(config))
    } catch {
      case exception: Exception =>
        logger.warn("Error in creation of connection pool", exception)
        None
    }
  }

  // Borrow a connection from the pool, if the pool was created successfully.
  def getConnection: Option[Connection] = {
    connectionPool match {
      case Some(connPool) => Some(connPool.getConnection)
      case None => None
    }
  }

  // Return a connection; with BoneCP, close() hands the connection back to the pool.
  def closeConnection(connection: Connection): Unit = {
    if (!connection.isClosed) {
      connection.close()
    }
  }
}
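
For reference, a minimal usage sketch of ConnectionPool outside the streaming job: borrow a connection, use it, return it. ConnectionPoolDemo and the count(*) query are illustrative assumptions, not part of the original code; only the SPATK_TEST table name comes from the job below.

// Hypothetical demo object: borrow, use, and return a pooled connection.
object ConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    ConnectionPool.getConnection match {
      case Some(conn) =>
        try {
          val ps = conn.prepareStatement("select count(*) from SPATK_TEST")
          val rs = ps.executeQuery()
          if (rs.next()) println("rows in SPATK_TEST: " + rs.getLong(1))
          rs.close()
          ps.close()
        } finally {
          ConnectionPool.closeConnection(conn) // hands the connection back to the pool
        }
      case None =>
        println("connection pool is not available")
    }
  }
}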
package main.java

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created on 2016-3-15.
 */
object StreamingToDB {

  val logger = LoggerFactory.getLogger(this.getClass)
  // PropertyConfigurator.configure("log4j.properties")
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: StreamingToDB <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val sql = "insert into SPATK_TEST(time,context) values(?,?)"
    val Array(zkQuorum, group, topics, numThreads) = args

    // log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor=FATAL
    val sparkConf = new SparkConf()
      .set("log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor", "FATAL")
      .setAppName("ErrorLogtoDB")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))
    val totalcounts = sc.accumulator(0L, "Total count")

    // One receiver thread per topic; keep only the message payload from each Kafka record.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Keep only ERROR lines and parse the timestamp between the first pair of brackets, e.g.
    // [2016-03-03 10:54:33 ERROR] {DevAppWebDaoImpl.java:317}-For input string: "sd"
    val rowrdd = lines.filter(_.contains("ERROR")).map[(Date, String)](eventRecord => {
      // "HH" (24-hour clock) rather than "hh" (12-hour clock) so afternoon timestamps parse correctly
      val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        .parse(eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']')).trim)
      logger.info("time:=====" + time + "-------eventRecord:===" + eventRecord)
      System.out.println("time:=====" + time + "-------eventRecord:===" + eventRecord)
      (time, eventRecord.toString)
    })
    // Write each non-empty batch to the database, one pooled connection per partition.
    rowrdd.foreachRDD(rdd => if (rdd.count() != 0) {
      rdd.foreachPartition(res => {
        if (!res.isEmpty) {
          // Skip the partition if the pool could not be created instead of passing a null connection.
          ConnectionPool.getConnection.foreach { connection =>
            res.foreach(r => {
              insertIntoMySQL(connection, sql, r._1, r._2)
              logger.info("finish time:=====" + r._1 + "-------eventRecord:===" + r._2)
            })
            ConnectionPool.closeConnection(connection)
          }
        }
      })
    })
    // Write to HDFS: one output directory per non-empty batch.
    // rowrdd.foreachRDD(rdd => if (rdd.count() != 0) {
    //   val out = rdd.map(_._1).first().toString
    //   rdd.map(_._2).saveAsTextFile("/user/root/hive/jboss/" + out)
    // })

    ssc.start()
    ssc.awaitTermination()
  }
  // Insert one parsed log record into SPATK_TEST through the pooled connection.
  def insertIntoMySQL(con: Connection, sql: String, ftime: Date, fcotext: String): Unit = {
    // println(data.toString)
    try {
      val ps = con.prepareStatement(sql)
      ps.setString(1, ftime.toString)
      ps.setString(2, fcotext)
      ps.executeUpdate()
      ps.close()
    } catch {
      case exception: Exception =>
        logger.error("Error in execution of query " + exception.getMessage, exception)
    }
  }
  // Simple check: read everything back from SPATK_TEST and print the first column of each row.
  def selectIntoMySQL(con: Connection, sql: String, ftime: Date, fcotext: String): Unit = {
    // println(data.toString)
    try {
      val query = "select * from SPATK_TEST t" // hard-coded query; the sql parameter is not used here
      val ps = con.prepareStatement(query)
      val res = ps.executeQuery()
      while (res.next()) { // iterate over the result set
        System.out.println("" + res.getString(1))
      }
      ps.close()
    } catch {
      case exception: Exception =>
        logger.error("Error in execution of query " + exception.getMessage, exception)
    }
  }
}
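
The HDFS branch inside main is only sketched in comments. Below is a minimal, hedged variant of that write; StreamingToHdfs and the yyyyMMddHHmmss directory suffix are illustrative assumptions (Date.toString contains characters HDFS path names do not allow), while the target directory comes from the commented code above.

import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: flush each non-empty batch of the (time, line) stream to its own HDFS directory.
object StreamingToHdfs {
  def writeToHdfs(rowrdd: DStream[(java.util.Date, String)]): Unit = {
    rowrdd.foreachRDD { rdd =>
      if (rdd.count() != 0) {
        // Name the batch directory after the first record's timestamp, as in the commented code;
        // the date pattern is an assumption chosen to keep the path free of spaces and colons.
        val out = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(rdd.map(_._1).first())
        rdd.map(_._2).saveAsTextFile("/user/root/hive/jboss/" + out)
      }
    }
  }
}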