kafka資料快取到redis的全路徑操作流程
第一步:配置redis客戶端
spark中配置redis客戶端的程式碼參考:
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
object RedisClient extends Serializable {
val redisHost = "192.168.16.100"
val redisPort = 6379
val redisTimeout = 30000
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
lazy val hook = new Thread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}
若出錯可能缺少jar包,需要引入common-pool2-2.2.jar 和 jedis-2.6.jar
第二步:資料輸入到kafka中,本列使用sparkstream
①Kafka生產資料
package Traffic
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jettison.json.JSONObject
/**
* Created by Administrator on 2017/10/14.
* 功能:SparkStream作為kafka的生產者,將制定檔案資料打到kafka中
*
*/
object KafkaEventProducer {
def main(args: Array[String]): Unit = {
//建立topic
val topic="car_event"
val brokers="192.168.17.108:9092"
val props=new Properties()
//把broker put進去
props.put("metadata.broker.list",brokers)
//把kafka編譯器放進去
props.put("serializer.class","kafka.serializer.StringEncoder")
//配置kafka的config(配置)
val kafkaconfig=new ProducerConfig(props)
val producer=new Producer[String,String](kafkaconfig)
//配置spark的config
val conf=new SparkConf().setAppName("KafkaEventProducer").setMaster("local[2]")
val sc=new SparkContext(conf)
//從path中載入資料
// val filePath="data/shuju.txt"
val filePath="c://test//shuju.txt"
//載入資料並進行切分
val records=sc.textFile(filePath)
.filter(!_.startsWith(";"))
.map(_.split(",")).collect()
//對資料進行預處理形成Json形式
for(temp <-records)
{
val event=new JSONObject()
//因為要put很多資料,這樣看起來很規範
event
.put("camer_id",temp(0)) //相機編號
.put("car_id",temp(2)) //車牌號
.put("event_time",temp(4)) //時間
.put("car_speed",temp(6)) //速度
.put("car_speed",temp(13)) //車道編號
//生產event資訊 topic 是往哪個topic中生產資料 event.toString是生產的真正的內容
producer.send(new KeyedMessage[String,String](topic,event.toString))
println("Message Sent: "+event)
Thread.sleep(200) //休息200微秒
}
sc.stop()
}
}
需要commons-pool2-2.2.jar,jedis-2.6.1.jar和json-lib-2.3-jdk15.jar
②啟動kafka 建立car_event 和 topic
start-kafka.sh
kafka-topics.sh --create --zookeeper hadoop:2181 --topic car_event --partitions 1 --replication-factor 1
Created topic "car_event".
第三小步:啟動car_event的topic的消費者,此步僅僅是為了驗證資料的
Kafka-console-consumer.sh --topic car_event --zookeeper hadoop:2181
第三步:idea中部署kafka打入redis的程式碼,如下所示:
package Traffic
import java.text.SimpleDateFormat
import java.util.Calendar
import kafka.serializer.{StringDecoder, StringEncoder}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import net.sf.json.JSONObject
/**
* Created by Administrator on 2017/10/14.
* 功能: 從kafka中獲取資料寫入到redis中
*
*/
object CarEventAnalysis {
def main(args: Array[String]): Unit = {
//配置SparkStrteaming
val conf=new SparkConf().setAppName("CarEventAnalysis").setMaster("local[2]")
val sc=new SparkContext(conf)
val ssc=new StreamingContext(sc,Seconds(5))
val dbindex=1 //指定是用哪個資料庫進行連線
//從kafka中讀取資料(用直連的方法)
val topics=Set("car_event")
// 只要和brokers相關的都要寫全
val brokers="192.168.17.108:9092"
//配置kafka引數
val kafkaParams=Map[String,String](
"metadata.broker.list"->brokers,
"serializer.class"->"kafka.serializer.StringEncoder"
)
//建立一個流 這是一個模板程式碼 引數中的兩個String代表的是kafka的鍵值對的資料,及key和value
val kafkaStream=KafkaUtils.createDirectStream[String,String,
StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//從kafka中將資料讀出
val events=kafkaStream.flatMap(line=>{
//轉換為object
val data=JSONObject.fromObject(line._2) // ._2是真正的資料
// println(data)
//必須用Some修飾data option有兩個子類 none 代表無值 some代表有值
// 加上some表示一定有值,後面有x.getString和x.getInt,保證程式能知道有值
Some(data)
})
//從kafka中取出卡口編號和速度資料
val carspeed=events.map(x=>(x.getString("camer_id"),x.getInt("car_speed")))
//把資料變成(camer_id,(car_speed,1))
.mapValues((x:Int)=>(x,1.toInt))
//每隔10秒計算一次前20秒的速度(4個rdd) Tuple2表示兩個引數
// (速度,數量) (速度,數量)
.reduceByKeyAndWindow((a:Tuple2[Int,Int], b:Tuple2[Int,Int]) =>
{(a._1 + b._1,a._2 + b._2)},Seconds(20),Seconds(10))
// carspeed 速度之和 數量之和
// carspeed.map{case(key,value)=>(key,value._1/value._2.toFloat)}
carspeed.foreachRDD(rdd=>{
rdd.foreachPartition(partitionofRecords=>{
//得到連線池的一個資源
val jedis=RedisClient.pool.getResource
// camer_id 卡口以及總的速度
partitionofRecords.foreach(pair=>{
val camer_id=pair._1 //卡口
val total_speed=pair._2._1 //總的速度
val count=pair._2._2 //總的數量
val now=Calendar.getInstance().getTime() //獲取當前的時間
val minuteFormat=new SimpleDateFormat("HHmm") //獲取分鐘格式
val dayFormat=new SimpleDateFormat("yyyyMMdd") //獲取天格式
val time = minuteFormat.format(now) //獲取分鐘
val day = dayFormat.format(now) //獲取天
//開始往redis中插入資料
if(count!=0){
jedis.select(dbindex) //用選擇的資料庫
// set進去一個map
jedis.hset(day + "_" + camer_id, time ,total_speed + "_" + count)
// 從redis中取資料
val foreachdata=jedis.hget(day + "_" + camer_id, time)
println(foreachdata)
}
})
RedisClient.pool.returnResource(jedis)
})
})
println("----------計算開始---------------------------")
ssc.start()
ssc.awaitTermination()
}
}
第四步: idea中執行第三步部署好的kafka打入redis的程式碼
記得要引入ezmorph-1.0.6.jar, commons-collections-3.2.jar ,
commons-lang-2.3.jar,commons-pool2-2.2.jar, 共四個jar
第五步:執行第二步的往kafka中打資料的程式
第六步:登入到redis的客戶端,驗證資料是否存入redis中
redis-cli -p 12002
Select 資料庫名稱