1. 程式人生 > spark streaming 寫入db,hdfs

spark streaming 寫入db,hdfs

package main.java

import java.sql.Connection

import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory


/**
 * Created  on 2016-3-15.
 */
/**
 * Process-wide JDBC connection pool backed by BoneCP.
 *
 * The pool is built once, when this object is first initialised. If the
 * driver class cannot be loaded or the pool cannot be configured, the failure
 * is logged and the pool is left empty, in which case [[getConnection]]
 * returns `None`.
 */
object ConnectionPool {
  import scala.util.control.NonFatal

  val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass)

  /** `Some(pool)` when initialisation succeeded, `None` otherwise. */
  private val connectionPool: Option[BoneCP] =
    try {
      // Oracle driver; the MySQL alternative is com.mysql.jdbc.Driver
      Class.forName("oracle.jdbc.driver.OracleDriver")

      val config = new BoneCPConfig()
      // MySQL alternative: jdbc:mysql://xxx:3306/test
      config.setJdbcUrl("jdbc:oracle:thin:@xxx:1521:fzkfs")
      config.setUsername("xxx")
      config.setPassword("xxx")
      config.setLazyInit(true)
      config.setMinConnectionsPerPartition(3)
      config.setMaxConnectionsPerPartition(5)
      config.setPartitionCount(5)
      // NOTE(review): BoneCP describes the close-connection watch as a
      // debugging aid; confirm it is wanted outside of development.
      config.setCloseConnectionWatch(true)
      config.setLogStatementsEnabled(false)

      Some(new BoneCP(config))
    } catch {
      case NonFatal(exception) =>
        // Pass the throwable to the logger so the stack trace ends up in the
        // log instead of being printed to stderr.
        logger.warn("Error in creation of connection pool", exception)
        None
    }

  /**
   * Borrows a connection from the pool.
   *
   * @return `Some(connection)` when the pool was created successfully,
   *         `None` when pool initialisation failed. Exceptions raised by the
   *         pool while handing out a connection are propagated to the caller.
   */
  def getConnection: Option[Connection] =
    connectionPool.map(_.getConnection)

  /**
   * Closes the given connection if it is still open.
   *
   * @param connection the connection to close; `null` is tolerated and ignored
   */
  def closeConnection(connection: Connection): Unit =
    if (connection != null && !connection.isClosed) {
      connection.close()
    }
}
package main.java

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created on 2016-3-15.
 */
/**
 * Spark Streaming job that reads log lines from Kafka, keeps the ones that
 * contain "ERROR", extracts the timestamp found between the first '[' and the
 * following ']' and inserts (timestamp, line) rows into the SPATK_TEST table.
 */
object StreamingToDB {
  import scala.util.control.NonFatal

  val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass)
  //PropertyConfigurator.configure("log4j.properties")

  /**
   * Entry point.
   *
   * @param args `<zkQuorum> <group> <topics> <numThreads>`; `topics` is a
   *             comma-separated list, `numThreads` is the number of receiver
   *             threads per topic. Extra trailing arguments are ignored.
   */
  def main(args: Array[String]): Unit = {

    if (args.length < 4) {
      System.err.println("Usage: StreamingToDB <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val sql = "insert into SPATK_TEST(time,context) values(?,?)"

    // `_*` keeps the destructuring consistent with the `< 4` check above: a
    // bare four-element pattern throws MatchError when more arguments are given.
    val Array(zkQuorum, group, topics, numThreads, _*) = args
    //log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor=FATAL
    val sparkConf = new SparkConf()
      .set("log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor", "FATAL")
      .setAppName("ErrorLogtoDB")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))
    // Registered but never incremented anywhere in this object.
    val totalcounts = sc.accumulator(0L, "Total count")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Lines whose timestamp cannot be extracted are dropped (with a warning)
    // instead of failing the whole batch.
    val rowrdd = lines
      .filter(_.contains("ERROR"))
      .flatMap(eventRecord => parseErrorEvent(eventRecord).toList)

    // No rdd.count() guard here: it would run an extra Spark job per batch and
    // the per-partition emptiness check below already skips empty input.
    rowrdd.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        if (records.nonEmpty) {
          // One pooled connection per partition. Fail the task with a clear
          // message when the pool is unavailable rather than passing null on.
          val connection = ConnectionPool.getConnection.getOrElse(
            throw new IllegalStateException(
              "No database connection available: connection pool was not initialised"))
          try {
            records.foreach { case (time, record) =>
              insertIntoMySQL(connection, sql, time, record)
              logger.info("finish time:=====" + time + "-------eventRecord:===" + record)
            }
          } finally {
            // Always hand the connection back, even if iteration fails.
            ConnectionPool.closeConnection(connection)
          }
        }
      }
    }

    // Write to HDFS (disabled):
    //    rowrdd.foreachRDD(rdd => if (rdd.count() != 0) {
    //      val out = rdd.map(_._1).first().toString
    //      rdd.map(_._2).saveAsTextFile("/user/root/hive/jboss/" + out)
    //    })

    ssc.start()
    ssc.awaitTermination()

  }

  /**
   * Extracts the timestamp from a log line such as
   * `[2016-03-03 10:54:33 ERROR] {DevAppWebDaoImpl.java:317}-For input string: "sd"`.
   *
   * The text between the first '[' (or the start of the line when there is no
   * '[') and the next ']' is trimmed and parsed with `yyyy-MM-dd HH:mm:ss`;
   * anything after the timestamp inside the brackets is ignored by the parser.
   *
   * @param eventRecord the raw log line
   * @return `Some((timestamp, eventRecord))`, or `None` when the line has no
   *         closing ']' or the bracketed text does not start with a timestamp
   */
  private def parseErrorEvent(eventRecord: String): Option[(Date, String)] = {
    val start = eventRecord.indexOf('[') + 1
    val end = eventRecord.indexOf(']', start)
    if (end < 0) {
      logger.warn("Skipping record without a bracketed timestamp: " + eventRecord)
      None
    } else {
      try {
        // HH = hour of day (0-23). The 12-hour pattern `hh` would read
        // 12:xx:xx as 00:xx:xx.
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          .parse(eventRecord.substring(start, end).trim)
        logger.info("time:=====" + time + "-------eventRecord:===" + eventRecord)
        System.out.println("time:=====" + time + "-------eventRecord:===" + eventRecord)
        Some((time, eventRecord))
      } catch {
        case exception: java.text.ParseException =>
          logger.warn("Skipping record with unparseable timestamp: " + eventRecord, exception)
          None
      }
    }
  }

  /**
   * Executes `sql` as a two-parameter insert: parameter 1 is bound to
   * `ftime.toString`, parameter 2 to `fcotext`.
   *
   * Failures are logged and swallowed, so one bad row does not abort the
   * partition; the caller is not told whether the insert succeeded.
   *
   * @param con     open JDBC connection
   * @param sql     insert statement with two `?` placeholders
   * @param ftime   event timestamp
   * @param fcotext full log line
   */
  def insertIntoMySQL(con: Connection, sql: String, ftime: Date, fcotext: String): Unit = {
    try {
      val ps = con.prepareStatement(sql)
      try {
        ps.setString(1, ftime.toString)
        ps.setString(2, fcotext)
        ps.executeUpdate()
      } finally {
        // Release the statement even when binding or execution fails.
        ps.close()
      }
    } catch {
      case NonFatal(exception) =>
        logger.error("Error in execution of query " + exception.getMessage, exception)
    }
  }

  /**
   * Debug helper: runs `select * from SPATK_TEST t` and prints the first
   * column of every row to stdout. Failures are logged and swallowed.
   *
   * @param con     open JDBC connection
   * @param sql     ignored; the query is fixed (kept for signature compatibility)
   * @param ftime   ignored
   * @param fcotext ignored
   */
  def selectIntoMySQL(con: Connection, sql: String, ftime: Date, fcotext: String): Unit = {
    try {
      val query = "select * from SPATK_TEST t"
      val ps = con.prepareStatement(query)
      try {
        val res = ps.executeQuery()
        try {
          // Iterate over all returned rows.
          while (res.next()) {
            System.out.println("" + res.getString(1))
          }
        } finally {
          res.close()
        }
      } finally {
        ps.close()
      }
    } catch {
      case NonFatal(exception) =>
        logger.error("Error in execution of query " + exception.getMessage, exception)
    }
  }

}