1. 程式人生 > >SparkStreaming(SparkSQL)+Kafka+Oracle 使用SQL完成的實時累加統計

SparkStreaming(SparkSQL)+Kafka+Oracle 使用SQL完成的實時累加統計

Kafka+SparkStreaming已經發展為一個比較成熟的實時日誌收集與計算架構。利用Kafka,既可以支援將用於離線分析的資料流到HDFS,又可以同時支撐多個消費者實時消費資料,包括SparkStreaming。然而,在SparkStreaming程式中如果有複雜業務邏輯的統計,使用Scala程式碼實現起來比較困難,也不易於別人理解。但如果在SparkStreaming中也使用SQL來做統計分析,是不是就簡單得多呢?

本文介紹將SparkSQL與SparkStreaming結合起來,使用SQL完成實時的日誌資料統計。SparkStreaming程式以yarn-cluster模式執行在YARN上,不單獨部署Spark叢集。

環境部署 Hadoop-2.6.0-cdh5.8.0(YARN) spark-2.1.0-bin-hadoop2.6 kafka-0.10.2+kafka2.2.0

實時統計需求

以10秒為間隔,統計每10秒內各大區潛客的數量,並累加到之前批次的統計結果上。

pom.xml 依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${spark.artifact}</artifactId>
    <version>${spark.version}</version>
    <scope>${dependency.scope}</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>${dependency.scope}</scope>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>11.2.0.3</version>
    <scope>${dependency.scope}</scope>
</dependency>

SparkStreaming程式程式碼

package com.chumi.dac.sp.stream.sparksqlcount

import com.chumi.dac.sp.stream.jdbc.DBCustomerStream
import com.chumi.dac.sp.stream.utils.DateUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.dstream. InputDStream

/**
  * Created by LHX on 2018/8/24 14:37.
  */

object CustomerStreamRsscCount {
    /**
      * Holder for a single, replaceable `Broadcast[String]`.
      *
      * `getInstance` lazily creates the broadcast on first use (double-checked
      * locking on the `@volatile` field); `update` swaps it for a new value.
      * Both methods take the same monitor when they write `instance`, so a
      * concurrent `getInstance`/`update` cannot interleave half-way.
      */
    object BroadcastWrapper {
        // @volatile: the unsynchronized fast-path read in getInstance must observe
        // the reference published by a writer holding the lock.
        @volatile private var instance: Broadcast[String] = null

        /**
          * Returns the current broadcast, creating it on the first call.
          *
          * The initial value comes from `DateUtil.getPointTime()`, which is only
          * evaluated when the broadcast actually has to be created.
          *
          * @param sc SparkContext used to create the broadcast
          * @return the shared broadcast variable
          */
        def getInstance(sc: SparkContext): Broadcast[String] = {
            if (instance == null) {
                synchronized {
                    if (instance == null) {
                        val point_time: String = DateUtil.getPointTime()
                        instance = sc.broadcast(point_time)
                        println("==初始化全域性變數=="+point_time)
                    }
                }
            }
            instance
        }

        /**
          * Replaces the broadcast with a new one carrying `hv`.
          *
          * The new broadcast is published before the old one is unpersisted, so
          * readers never pick up a reference that has already been released, and
          * a failure in `sc.broadcast` leaves the previous value untouched.
          *
          * @param sc       SparkContext used to create the new broadcast
          * @param blocking passed to `unpersist` on the previous broadcast
          * @param hv       new value to broadcast
          * @return the newly created broadcast
          */
        def update(sc: SparkContext, blocking: Boolean = false,hv:String): Broadcast[String] = synchronized {
            val previous = instance
            val replacement = sc.broadcast(hv)
            instance = replacement
            if (previous != null)
                previous.unpersist(blocking)
            println("==更新=="+hv)
            replacement
        }
    }
    /**
      * Lazily instantiated, shared SQLContext.
      *
      * The first call to `getInstance` builds a SQLContext from the given
      * SparkContext; later calls return that same object and ignore the argument.
      */
    object SQLContextSingleton {
        @transient  private var instance: SQLContext = _

        /**
          * @param sparkContext SparkContext used only when no instance exists yet
          * @return the shared SQLContext
          */
        def getInstance(sparkContext: SparkContext): SQLContext =
            Option(instance).getOrElse {
                val created = new SQLContext(sparkContext)
                instance = created
                created
            }
    }
    /**
      * One record extracted from a comma-separated Kafka message.
      *
      * `main` fills the three fields from positions 0, 1 and 9 of the split
      * message. The field names become DataFrame column names when the records
      * are converted with `toDF()`, and the SQL query joins on `ENTITY_CODE`,
      * so they must not be renamed.
      */
    case class DapLog(
        CIM_ID: String,
        ENTITY_CODE: String,
        CARD_FOUND_TIME: String
    )

    /**
      * Single checkpoint location. It is used both when writing checkpoints
      * (`ssc.checkpoint`) and when recovering (`StreamingContext.getOrCreate`);
      * the two must agree or state is never restored after a restart.
      */
    private val CheckpointDir = "C:/tmp/checkPointPath"

    /** JDBC reader options for one Oracle table. */
    private def jdbcOptions(table: String): Map[String, String] = Map(
        "url" -> "jdbc:oracle:thin:@//IP:1521/test",
        "user" -> "user",
        "password" -> "password",
        "dbtable" -> table,
        "driver" -> "oracle.jdbc.driver.OracleDriver")

    /**
      * Registers the Oracle tables TM_SST and TM_RSSC as temp views unless they
      * are already present in the session catalog.
      *
      * This is also called from the per-batch `transform` function: when the
      * job is restored from a checkpoint the context factory is not executed,
      * so the views would otherwise be missing in the new driver.
      */
    private def ensureDimensionViews(sqlContext: SQLContext): Unit = {
        val catalog = sqlContext.sparkSession.catalog
        Seq("TM_SST", "TM_RSSC")
            .filterNot(name => catalog.tableExists(name))
            .foreach { name =>
                sqlContext.read.format("jdbc").options(jdbcOptions(name)).load()
                    .createOrReplaceTempView(name)
            }
    }

    /**
      * Entry point: consumes comma-separated records from Kafka in 10-second
      * batches, joins them with the Oracle tables via SQL to count records per
      * RSSC_NAME, and keeps a running total per name with `updateStateByKey`.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {

        def functionToCreateContext(): StreamingContext = {
            // setIfMissing keeps local[2] as the default for local runs while
            // letting spark-submit (e.g. --master yarn) choose the real master.
            val conf = new SparkConf()
                .setAppName("CustomerStreamRsscCount")
                .setIfMissing("spark.master", "local[2]")
            val ssc = new StreamingContext(conf, Seconds(10))

            // updateStateByKey requires a checkpoint directory.
            ssc.checkpoint(CheckpointDir)

            // Register the Oracle tables up front so JDBC problems surface at start-up.
            ensureDimensionViews(SQLContextSingleton.getInstance(ssc.sparkContext))

            val topics = "topic1" //stream_test01 topic1
            val topicsSet = topics.split(",").toSet
            val brokers = "IP:9095"

            val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
                , "auto.offset.reset" -> "latest"
                , "sasl.kerberos.service.name" -> "kafka"
                , "key.deserializer" -> classOf[StringDeserializer]
                , "value.deserializer" -> classOf[StringDeserializer]
                , "group.id" -> "testgroup"
            )

            val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

            val value = dStream.transform { rdd =>
                // Always go through the singleton (never a context captured from the
                // enclosing scope) so the closure stays valid after checkpoint recovery.
                val sqlC = SQLContextSingleton.getInstance(rdd.sparkContext)
                ensureDimensionViews(sqlC)
                import sqlC.implicits._

                // Skip null payloads and records with fewer than 10 fields instead of
                // failing the whole batch with an ArrayIndexOutOfBoundsException.
                val logDataFrame = rdd.flatMap { record =>
                    Option(record.value()).map(_.split(",")).collect {
                        case fields if fields.length > 9 => DapLog(fields(0), fields(1), fields(9))
                    }
                }.toDF()

                // Expose the current batch to SQL.
                logDataFrame.createOrReplaceTempView("TT_CUSTOMER")
                val sql = "select R.RSSC_ID,R.RSSC_NAME,COUNT(1) FROM TT_CUSTOMER  T join  TM_SST S on T.ENTITY_CODE = S.ENTITYCODE join TM_RSSC R ON S.RSSC_ID = R.RSSC_ID  GROUP BY R.RSSC_ID,R.RSSC_NAME"
                val counts: DataFrame = sqlC.sql(sql)
                // Column 1 is RSSC_NAME; column 2 is COUNT(1), which Spark SQL returns as a Long.
                counts.rdd.map(r => (r(1).toString, r.getLong(2).toInt))
            }

            // Add the counts of the latest 10-second batch to the running total.
            val addFunction = (currValues: Seq[Int], preValueState: Option[Int]) => {
                val currentSum = currValues.sum
                val previousSum = preValueState.getOrElse(0)
                Some(currentSum + previousSum)
            }
            val total = value.updateStateByKey[Int](addFunction)

            // Print the accumulated totals.
            total.print()
            ssc
        }

        // Restore the StreamingContext from the checkpoint if one exists,
        // otherwise build a fresh one.
        val context = StreamingContext.getOrCreate(CheckpointDir, functionToCreateContext _)
        context.start()
        context.awaitTermination()
    }
}

總結

其中廣播變數(BroadcastWrapper)是後期根據時間篩選時使用的,本文的程式碼中尚未用到。整體思路是先讀取Oracle資料並註冊成臨時表,再獲取Kafka資料,透過dStream.transform()方法把資料轉換成想要的結果,最後用updateStateByKey()方法累加上一批次的統計結果。對於初學者來說,很多SparkStreaming方法還不是很熟悉,所以寫程式碼時往往想不到去使用。如果本文對大家有所幫助,記得點贊哦~