SparkStreaming (SparkSQL) + Kafka + Oracle: real-time cumulative statistics with SQL (updated)
Posted by 阿新 on 2019-01-07
Kafka + SparkStreaming has matured into a common architecture for real-time log collection and computation. With Kafka, the same data stream can be landed in HDFS for offline analysis while multiple consumers, SparkStreaming included, consume it in real time. However, statistics with complex business logic are hard to implement in plain Scala code inside a SparkStreaming program, and the result is hard for others to understand. Wouldn't it be much simpler to use SQL for the statistics inside SparkStreaming as well?
This article shows how to combine SparkSQL with SparkStreaming and use SQL to compute real-time log statistics. The SparkStreaming program runs on YARN in yarn-cluster mode, with no standalone Spark cluster deployed.
Environment
Hadoop-2.6.0-cdh5.13.0(YARN)
spark-2.1.0-bin-hadoop2.6
kafka-0.9.0+kafka2.0.1 (the previous article used kafka-0.10.0)
Real-time statistics requirement
Every 10 seconds, count the number of potential customers in each region over that 10-second interval.
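The requirement boils down to a group-by count over each 10-second batch. As a minimal, Spark-free sketch of just the counting step (the region names here are made-up examples, not values from the real TM_RSSC table — the actual job derives regions by joining against Oracle dimension tables):

```scala
// Count occurrences of each region in one 10-second batch of records.
// Region strings are hypothetical placeholders for illustration only.
object RegionCountSketch {
  def countByRegion(batch: Seq[String]): Map[String, Int] =
    batch.groupBy(identity).map { case (region, recs) => region -> recs.size }

  def main(args: Array[String]): Unit = {
    val batch = Seq("North", "North", "South", "East")
    assert(countByRegion(batch) == Map("North" -> 2, "South" -> 1, "East" -> 1))
  }
}
```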
pom
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${spark.artifact}</artifactId>
<version>${spark.version}</version>
<scope>${dependency.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
<scope>${dependency.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
<scope>${dependency.scope}</scope>
</dependency>
SparkStreaming program code
package com.chumi.dac.sp.stream.sparksqlcount
import kafka.serializer.StringDecoder
import com.chumi.dac.sp.stream.jdbc.DBCustomerStreamRssc
import com.chumi.dac.sp.stream.utils.DateUtil
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
/**
 * Created by LHX on 2018/8/24 14:37.
 * Read data from Kafka and join it with Oracle data.
 */
object CustomerStreamRsscCount {
/**
 * BroadcastWrapper: registers and refreshes the broadcast variable
 */
object BroadcastWrapper {
@volatile private var instance:Broadcast[String]=null
def getInstance(sc: SparkContext): Broadcast[String] = {
val point_time: String = DateUtil.getPointTime()
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(point_time)
//initialize the database
DBCustomerStreamRssc.initializeHour(point_time)
println("==initialized global variable=="+point_time)
}
}
}
instance
}
def update(sc: SparkContext, blocking: Boolean = false,hv:String): Broadcast[String] = {
if (instance != null)
instance.unpersist(blocking)
instance = sc.broadcast(hv)
println("==updated=="+hv)
instance
}
}
/**
* SQLContextSingleton
*/
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
case class DapLog(CIM_ID:String, ENTITY_CODE:String, CARD_FOUND_TIME:String)
def main(args: Array[String]) {
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("CustomerStreamRsscCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)
//updateStateByKey requires a checkpoint directory
ssc.checkpoint("C:/tmp/checkPointPath")
//TM_SST
val jdbcMaps = Map("url" -> "jdbc:oracle:thin:@//IP:1521/DB",
"user" -> "user",
"password" -> "password",
"dbtable" -> "dbtable",
"driver" -> "oracle.jdbc.driver.OracleDriver")
val jdbcDFs = sqlContext.read.options(jdbcMaps).format("jdbc").load
jdbcDFs.createOrReplaceTempView("TM_SST")
//TM_RSSC
val jdbcMapc = Map("url" -> "jdbc:oracle:thin:@//IP:1521/DB",
"user" -> "user",
"password" -> "password",
"dbtable" -> "dbtable",
"driver" -> "oracle.jdbc.driver.OracleDriver")
val jdbcDFv = sqlContext.read.options(jdbcMapc).format("jdbc").load
jdbcDFv.createOrReplaceTempView("TM_RSSC")
val topics = "test01"
val topicsSet = topics.split(",").toSet
val brokers = "svldl072.test.com:9092,svldl073.test.com:9092,svldl077.test.com:9092"
val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
//register the broadcast variable
var broadcast: Broadcast[String] = BroadcastWrapper.getInstance(ssc.sparkContext)
//accumulate by hour
var hourValue = ""
var zeroTime = ""
val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val value = dStream.transform(rdd => {
//get the singleton SQLContext
val sqlC = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlC.implicits._
val logDataFrame = rdd.map(w => {
val m: Array[String] = w._2.split(",")
DapLog(m(0), m(1), m(9))
}).toDF()
// register as a temp view
logDataFrame.createOrReplaceTempView("TT_VW_POTENTIAL_CUSTOMER")
val sql = "select R.RSSC_ID,R.RSSC_NAME,COUNT(1) FROM TT_VW_POTENTIAL_CUSTOMER T join TM_SST S on T.ENTITY_CODE = S.ENTITYCODE join TM_RSSC R ON S.RSSC_ID = R.RSSC_ID GROUP BY R.RSSC_ID,R.RSSC_NAME"
val data1: DataFrame = sqlC.sql(sql)
val a =data1.rdd.map{r =>(r(1).toString,r(2).toString.toInt) }
a
})
//sum the previous state with the latest 10s batch
val addFunction = (currValues : Seq[Int],preValueState : Option[Int]) => {
val currentSum = currValues.sum
val previousSum = preValueState.getOrElse(0)
//if the broadcast hour has rolled over, reset the total
if(broadcast.value != hourValue){
Some(currentSum)
}else{
Some(currentSum + previousSum)
}
}
val total: DStream[(String, Int)] = value.updateStateByKey[Int](addFunction)
//output the running totals
total.foreachRDD(rdd=>{
hourValue = DateUtil.getPointTime()
zeroTime = DateUtil.getZeroTime()
// collect the results to the driver and iterate
val tuples: Array[(String, Int)] = rdd.collect()
//if the current hour != the broadcast hour
if(broadcast.value!=null && broadcast.value != hourValue){
//insert a new row for the new hour
DBCustomerStreamRssc.initializeHour(hourValue)
for (i <- 0 until tuples.length){
DBCustomerStreamRssc.updateTable(tuples(i)._1,tuples(i)._2,hourValue)
}
//update the broadcast hour
broadcast = BroadcastWrapper.update(rdd.sparkContext, true,hourValue)
println("==after update=="+broadcast.value)
}else{
//update the database
for (i <- 0 until tuples.length){
DBCustomerStreamRssc.updateTable(tuples(i)._1,tuples(i)._2,broadcast.value)
}
}
})
ssc
}
//recover the StreamingContext from checkpoint data if present, otherwise create a new one
val context = StreamingContext.getOrCreate("C:/tmp/checkPointPath", functionToCreateContext _)
context.start()
context.awaitTermination()
}
}
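The accumulate-or-reset decision inside addFunction can be exercised outside Spark. The following is a pure-Scala sketch of the same logic, where broadcastHour stands in for broadcast.value and currentHour for hourValue (the "10:00"/"11:00" values are illustrative, not the real output of DateUtil.getPointTime()):

```scala
// Mirrors the update function passed to updateStateByKey: when the hour
// recorded in the broadcast variable no longer matches the current hour,
// the running total resets to this batch's sum; otherwise it accumulates.
object AddFunctionSketch {
  def addFunction(broadcastHour: String, currentHour: String)
                 (currValues: Seq[Int], preValueState: Option[Int]): Option[Int] = {
    val currentSum  = currValues.sum
    val previousSum = preValueState.getOrElse(0)
    if (broadcastHour != currentHour) Some(currentSum)   // hour rolled over: reset
    else Some(currentSum + previousSum)                  // same hour: keep accumulating
  }

  def main(args: Array[String]): Unit = {
    // same hour: this batch's 3 + 4 is added onto the previous total of 10
    assert(addFunction("10:00", "10:00")(Seq(3, 4), Some(10)) == Some(17))
    // hour changed: previous state is discarded, only the new batch counts
    assert(addFunction("10:00", "11:00")(Seq(3, 4), Some(10)) == Some(7))
  }
}
```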
Problems encountered:
Error:
Error:(119, 71) type arguments [String,String,org.apache.commons.codec.StringDecoder,org.apache.commons.codec.StringDecoder] conform to the bounds of none of the overloaded alternatives of
value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String], topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V] <and> [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], topics: Set[String])(implicit evidence$19: scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V], implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22: scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
Solution: change the kafka_2.11 dependency version from 0.9.0.0 to 0.8.2.1.
Summary
The broadcast variable is used when filtering data by time. The overall flow: first read the Oracle tables and register them as temp views; then consume the Kafka data, transforming each batch into the desired result with dStream.transform(); finally, accumulate each batch's statistics onto the previous batch's totals with updateStateByKey(). Beginners are often unfamiliar with many of these SparkStreaming methods and don't think to use them when writing code. If this helped you, remember to give it a like~