
Spark Streaming: consuming Kafka data, computing, and writing the results to Redis (an example)

package com.lg.blgdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer

import java.lang.Long
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import scala.collection.mutable

import redis.clients.jedis.{Jedis, Pipeline}
import com.lg.blgdata.utils.JedisConnectionPool
import com.lg.bigdata.utils.JZWUtil

/**
 * 1. Create the Driver (stateless).
 *    Kafka pushes real-time 5-minute traffic and daily traffic counts to Redis.
 */
object KafkaRedis {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH")
  val daysdf = new SimpleDateFormat("yyyy-MM-dd")
  val fmtScornd = new SimpleDateFormat("ss")

  def main(args: Array[String]): Unit = {
    val groupId = "jwz"

    // 1. Create the SparkConf and initialize the StreamingContext (SSC)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CarCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN")

    /* 2. Define the Kafka parameters and wrap them in a Map.
     * earliest: when a partition has a committed offset, consume from it; otherwise consume from the beginning
     * latest:   when a partition has a committed offset, consume from it; otherwise consume only newly produced data
     * none:     when every partition has a committed offset, consume from it; if any partition has none, throw an exception
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092",             // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer],   // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> groupId,                                // consumer group id
      "auto.offset.reset" -> "latest",                      // earliest / latest
      "enable.auto.commit" -> (true: java.lang.Boolean)     // let the consumer commit offsets automatically
    )

    val topics = Array("car")

    // 3. Create the Kafka DStream via KafkaUtils.
    //    This is the officially recommended direct approach: it uses Kafka's low-level API and is more efficient.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Schema of the incoming records
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // 4. Write the computed results to Redis
    var jedis: Jedis = null
    // Redis pipeline, used as a transaction
    var pipeline: Pipeline = null

    var spark: SparkSession = null

    /**
     * Parse each JSON string into a Row with JZWUtil.handlerMessage2Row, build an RDD[Row],
     * then create a DataFrame from it with the schema (a sketch of this helper is given after the listing).
     * Mainline left:  V158
     * Mainline right: V005
     */
    kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
      if (!rdd.isEmpty()) { // skip empty batches
        if (spark == null) {
          spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        }
        val df: DataFrame = spark.createDataFrame(rdd, schema)
        val map = getTime
        // Mainline left: filter by camera id and by the start of the current 5-second window
        val dfV158 = df.filter(" cameraId =='V158' and time >" + map.get("sdate").get).toDF()
        // keep the two columns we need and count the rows
        val countV158 = dfV158.select("time", "cameraId").count()

        // Mainline right: same time window, camera V005
        val dfV005 = df.filter(" cameraId =='V005' and time >" + map.get("sdate").get).toDF()
        val countV005 = dfV005.select("time", "cameraId").count()

        // Camera V024: same time window
        val dfV024 = df.filter(" cameraId =='V024' and time >" + map.get("sdate").get).toDF()
        val countV024 = dfV024.select("time", "cameraId").count()

        try {
          // Get a connection from the Jedis connection pool
          if (jedis == null) {
            jedis = JedisConnectionPool.getConnections()
          }
          jedis.select(3) // use db 3 (Redis ships with 16 databases by default)

          // open a pipeline
          pipeline = jedis.pipelined()
          // start a MULTI transaction on the pipeline
          pipeline.multi()

          // Write the computed results.
          /*
           * pipeline.hset(key, field, value) would overwrite the field;
           * hincrBy(key, field, increment) adds to the field if it exists and creates it otherwise.
           * Outer key = counter name, inner field = formatted time bucket.
           */
          // 5-second real-time counter
          pipeline.hincrBy("SV158", format.format(map.get("edate").get), countV158)

          // per-minute counter
          pipeline.hincrBy("MV158", sdf.format(map.get("edate").get), countV158)

          // per-hour counter
          pipeline.hincrBy("HV158", hourSdf.format(map.get("edate").get), countV158)

          // per-day counter
          pipeline.hincrBy("DV158", daysdf.format(map.get("edate").get), countV158)

          // whole-line per-minute counter
          pipeline.hincrBy("allM", sdf.format(map.get("edate").get), countV158)

          // V005
          pipeline.hincrBy("SV005", format.format(map.get("edate").get), countV005)
          pipeline.hincrBy("MV005", sdf.format(map.get("edate").get), countV005)
          pipeline.hincrBy("HV005", hourSdf.format(map.get("edate").get), countV005)
          pipeline.hincrBy("DV005", daysdf.format(map.get("edate").get), countV005)

          // whole-line per-minute counter
          pipeline.hincrBy("allM", sdf.format(map.get("edate").get), countV005)

          // V024
          pipeline.hincrBy("HV024", hourSdf.format(map.get("edate").get), countV024)
          pipeline.hincrBy("DV024", daysdf.format(map.get("edate").get), countV024)

          // commit the transaction, then flush the pipeline and read the replies
          pipeline.exec()
          pipeline.sync()

        } catch {
          case e: Exception => {
            e.printStackTrace()
            if (pipeline != null) {
              pipeline.discard() // abandon the queued commands
            }
            ssc.stop(true) // graceful shutdown
          }
        } finally {
          if (pipeline != null) {
            pipeline.close()
            pipeline = null
          }
          if (jedis != null) {
            jedis.close() // returns the connection to the pool
            jedis = null  // so the next batch borrows a fresh one
          }
        }
      }
    })
    // start the streaming collector
    ssc.start()

    // the Driver waits for the collector to run; when the collector terminates, the Driver terminates too
    ssc.awaitTermination()
  }
  def getTime(): mutable.Map[String, Long] = {
    // Work out the next 5-second time boundary from the current time
    val date: Calendar = Calendar.getInstance()
    val indexMinute = format.format(date.getTime())
    var dt: String = null
    val scornd = fmtScornd.format(date.getTime)
    if (Integer.valueOf(scornd) % 5 != 0) {
      val rs: Int = Integer.valueOf(scornd) / 5
      // round the seconds up to the next multiple of 5, zero-padded so the two-character replace stays aligned
      val min = "%02d".format(rs * 5 + 5)
      val builderDate = new StringBuilder(indexMinute).replace(17, 19, min)
      dt = builderDate.toString()
    } else {
      dt = indexMinute
    }

    // The previous 5-second boundary ends 5 seconds before dt
    val time: Date = format.parse(dt)
    val sdate: Calendar = Calendar.getInstance()
    sdate.setTime(time)
    sdate.add(Calendar.SECOND, -5)

    var map: mutable.Map[String, Long] = mutable.Map()
    map("sdate") = sdate.getTimeInMillis.toLong // epoch millis, used for the time comparison in the filters
    map("edate") = format.parse(dt).getTime().longValue() // formatted before being written to Redis
    (map)
  }

}
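
The listing leans on two helper classes that are only imported, not shown. JZWUtil.handlerMessage2Row turns each Kafka message into a Row that matches the schema above. Assuming the messages are flat JSON objects with those six string fields, a minimal sketch (using json4s, which ships with Spark; the field handling is an assumption, not the original implementation) could look like this:

package com.lg.bigdata.utils

import org.apache.spark.sql.Row
import org.json4s._
import org.json4s.jackson.JsonMethods

object JZWUtil {
  implicit val formats: Formats = DefaultFormats

  // Parse one Kafka message (a JSON string) into a Row with the fields
  // cameraId, time, lane_position, carType, speed, space (all strings).
  def handlerMessage2Row(message: String): Row = {
    val json = JsonMethods.parse(message)
    def field(name: String): String = (json \ name).extractOrElse[String]("")
    Row(field("cameraId"), field("time"), field("lane_position"),
      field("carType"), field("speed"), field("space"))
  }
}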
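
JedisConnectionPool.getConnections() is also not shown in the post. A common way to implement it is a lazily created JedisPool; the host, port, and pool sizes below are placeholders, so point them at your own Redis instance:

package com.lg.blgdata.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnectionPool {
  private val config = new JedisPoolConfig()
  config.setMaxTotal(20) // maximum connections handed out by the pool
  config.setMaxIdle(10)  // maximum idle connections kept around

  // Host, port and timeout are assumptions; the post never shows the Redis address.
  private lazy val pool = new JedisPool(config, "hadoop104", 6379, 10000)

  // Borrow a connection; the caller returns it to the pool by calling jedis.close()
  def getConnections(): Jedis = pool.getResource
}

With a pool like this, jedis.close() in the finally block returns the connection rather than tearing it down, which is why the job nulls the reference and borrows a fresh one on the next batch.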
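
To see what the job produces: each counter is a Redis hash in db 3 whose fields are formatted time buckets. A small, self-contained reader (host and port are again assumptions) that prints the per-minute counts for camera V158 might look like this:

import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

object ReadCounters {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop104", 6379) // Redis host/port assumed
    try {
      jedis.select(3) // same db the streaming job writes to
      jedis.hgetAll("MV158").asScala.toSeq.sortBy(_._1).foreach {
        case (minute, count) => println(s"$minute -> $count")
      }
    } finally {
      jedis.close()
    }
  }
}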