SparkStreaming消費Kafka資料並計算後往Redis寫資料案例
阿新 • • 發佈:2021-01-21
package com.lg.blgdata.streaming import org.apache.spark.streaming.StreamingContext import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.storage.StorageLevel import org.apache.kafka.common.serialization.StringDeserializer import kafka.serializer.StringDecoder import kafka.serializer.StringDecoder import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies import org.apache.spark.streaming.kafka010.PerPartitionConfig import org.apache.spark.streaming.kafka010.PreferConsistent import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategy import org.apache.spark.streaming.Seconds import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.Row import org.apache.spark.sql.DataFrame import java.text.SimpleDateFormat import java.util.Calendar import org.apache.spark.sql.Dataset import org.apache.spark.sql.types.LongType import java.util.Date import scala.collection.mutable import java.lang.Long import org.apache.kafka.common.TopicPartition import redis.clients.jedis.Jedis import redis.clients.jedis.Pipeline import com.lg.blgdata.utils.JedisConnectionPool import com.lg.bigdata.utils.JZWUtil /** * 1. 
Create the driver (stateless).
 * 2. Consume vehicle records from Kafka and push real-time traffic counts to
 *    Redis, bucketed per 5-second window, per minute, per hour and per day.
 */
object KafkaRedis {
  // Formatters that turn the window-end timestamp into the Redis hash field of each bucket.
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // 5-second window bucket
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm") // minute bucket
  val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH") // hour bucket
  val daysdf = new SimpleDateFormat("yyyy-MM-dd") // day bucket
  // No longer used inside this object; kept because it is a public member.
  val fmtScornd = new SimpleDateFormat("ss")

  /** Length of the real-time counting window, in milliseconds. */
  private val WindowMillis = 5000L

  /** Redis logical database that receives the counters. */
  private val RedisDbIndex = 3

  /** Cameras counted in every bucket and in the "allM" total (V158: left line, V005: right line). */
  private val MainlineCameras = Seq("V158", "V005")

  /** Camera that is only counted in the hour and day buckets. */
  private val HourDayOnlyCamera = "V024"

  /**
   * Entry point: builds a 1-second-batch streaming job that reads topic "car",
   * counts the records of the monitored cameras that fall after the start of the
   * current 5-second window, and adds those counts to Redis hashes.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val groupId = "jwz"

    // 1. Create the SparkConf and initialise the StreamingContext (1-second batches).
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CarCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN")

    /* 2. Kafka consumer parameters.
     * auto.offset.reset:
     *   earliest - resume from the committed offset; without one, consume from the beginning
     *   latest   - resume from the committed offset; without one, consume only new records
     *   none     - resume from the committed offsets; fail if any partition has none
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092", // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer], // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> groupId, // consumer group
      "auto.offset.reset" -> "latest", // earliest | latest | none
      "enable.auto.commit" -> (true: java.lang.Boolean) // let the consumer commit offsets itself
    )
    val topics = Array("car")

    // 3. Create the Kafka direct stream.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Schema of the rows produced by JZWUtil.handlerMessage2Row (every column is a string).
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // 4. Turn each JSON message into a Row, count per camera and write the result to Redis.
    kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // getOrCreate() hands back the already-built session on every batch after the first.
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        val df: DataFrame = spark.createDataFrame(rdd, schema)

        val window = getTime()
        val windowStart: scala.Long = window("sdate")
        val windowEnd: scala.Long = window("edate")

        val counts = countByCamera(df, windowStart)

        try {
          writeCounts(new Date(windowEnd), counts)
        } catch {
          case e: Exception =>
            e.printStackTrace()
            // stop(true) also stops the SparkContext and stops immediately; it is
            // NOT a graceful shutdown (that is the two-argument overload).
            ssc.stop(true)
        }
      }
    }

    // Start the job; the driver blocks until the streaming context terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Counts, in a single Spark job, the rows of each monitored camera whose `time`
   * is greater than `since`.
   *
   * @param df    batch rows following the schema built in `main`
   * @param since exclusive lower bound for the `time` column (epoch milliseconds)
   * @return camera id -> row count; cameras without matching rows yield 0
   */
  private def countByCamera(df: DataFrame, since: scala.Long): Map[String, scala.Long] = {
    val cameraList = (MainlineCameras :+ HourDayOnlyCamera).map(id => s"'$id'").mkString(", ")
    df.filter(s"cameraId in ($cameraList) and time > $since")
      .groupBy("cameraId")
      .count()
      .collect()
      .map(row => row.getString(0) -> row.getLong(1))
      .toMap
      .withDefaultValue(0L)
  }

  /**
   * Adds the per-camera counts to the Redis hashes inside one MULTI/EXEC transaction.
   *
   * Hashes written: S/M/H/D + camera id for each mainline camera (second, minute,
   * hour and day buckets), "allM" (minute total over the mainline cameras) and
   * HV024 / DV024. HINCRBY creates a missing field and otherwise adds to it.
   *
   * A connection is borrowed for this call only and always handed back, so a
   * closed connection is never reused by a later batch.
   *
   * @param windowEnd end of the current window; formatted into the hash field names
   * @param counts    camera id -> count, as returned by `countByCamera`
   * @throws Exception whatever the Redis client raises; an open transaction is discarded first
   */
  private def writeCounts(windowEnd: Date, counts: Map[String, scala.Long]): Unit = {
    val secondField = format.format(windowEnd)
    val minuteField = sdf.format(windowEnd)
    val hourField = hourSdf.format(windowEnd)
    val dayField = daysdf.format(windowEnd)

    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(RedisDbIndex)
      val pipeline: Pipeline = jedis.pipelined()
      var inTransaction = false
      try {
        pipeline.multi()
        inTransaction = true

        MainlineCameras.foreach { camera =>
          val count = counts(camera)
          pipeline.hincrBy("S" + camera, secondField, count)
          pipeline.hincrBy("M" + camera, minuteField, count)
          pipeline.hincrBy("H" + camera, hourField, count)
          pipeline.hincrBy("D" + camera, dayField, count)
          pipeline.hincrBy("allM", minuteField, count)
        }

        val hourDayCount = counts(HourDayOnlyCamera)
        pipeline.hincrBy("H" + HourDayOnlyCamera, hourField, hourDayCount)
        pipeline.hincrBy("D" + HourDayOnlyCamera, dayField, hourDayCount)

        // Queue EXEC first, then flush: sync() sends everything and reads the replies.
        pipeline.exec()
        inTransaction = false
        pipeline.sync()
      } catch {
        case e: Exception =>
          // Only discard while MULTI is still open; DISCARD after EXEC is itself an error.
          if (inTransaction) pipeline.discard()
          throw e
      } finally {
        pipeline.close()
      }
    } finally {
      jedis.close()
    }
  }

  /**
   * Computes the current 5-second window from the system clock.
   *
   * The window end is the current time truncated to whole seconds and rounded up
   * to the next multiple of five seconds (unchanged when already on a multiple);
   * the window start is five seconds earlier.
   *
   * @return a map with "sdate" (window start) and "edate" (window end), both in
   *         epoch milliseconds
   */
  def getTime(): mutable.Map[String, Long] = {
    val windowSeconds = WindowMillis / 1000L
    val nowSeconds = System.currentTimeMillis() / 1000L
    val endMillis = ((nowSeconds + windowSeconds - 1) / windowSeconds) * windowSeconds * 1000L
    val startMillis = endMillis - WindowMillis

    val map: mutable.Map[String, Long] = mutable.Map()
    map("sdate") = startMillis // used as the lower bound when filtering by time
    map("edate") = endMillis // formatted into the Redis hash field names
    map
  }
}