實時電商數倉(二十三)之實時計算(二)日活處理模組(二)精確一次消費
精確一次消費
1 定義
精確一次消費(Exactly-once)是指訊息一定會被處理且只會被處理一次。不多不少就一次處理。
如果達不到精確一次消費,可能會達到另外兩種情況:
至少一次消費(at least once),主要是保證資料不會丟失,但有可能存在資料重複問題。
最多一次消費 (at most once),主要是保證資料不會重複,但有可能存在資料丟失問題。
如果同時解決了資料丟失和資料重複的問題,那麼就實現了精確一次消費的語義了。
2 問題如何產生
資料何時會丟失: 比如實時計算任務進行計算,到資料結果存檔之前,程序崩潰,假設在程序崩潰前kafka調整了偏移量,那麼kafka就會認為資料已經被處理過,即使程序重啟,
資料何時會重複: 如果資料計算結果已經存檔了,在kafka調整偏移量之前,程序崩潰,那麼kafka會認為資料沒有被消費,程序重啟,會重新從舊的偏移量開始,那麼資料就會被2次消費,又會被存檔,資料就被存了2遍,造成資料重複。
3 如何解決
策略一:利用關係型資料庫的事務進行處理。
出現丟失或者重複的問題,核心就是偏移量的提交與資料的儲存,不是原子性的。如果能做成要麼資料儲存和偏移量都成功,要麼兩個失敗。那麼就不會出現丟失或者重複了。
這樣的話可以把存資料和偏移量放到一個事務裡。這樣就做到前面的成功,如果後面做失敗了,就回滾前面那麼就達成了原子性。
問題與限制
但是這種方式有限制就是資料必須都要放在某一個關係型資料庫中,無法使用其他功能強大的nosql資料庫。如果儲存的資料量較大一個數據庫節點不夠,多個節點的話,還要考慮分散式事務的問題。
策略二:手動提交偏移量+冪等性處理
咱們知道如果能夠同時解決資料丟失和資料重複問題,就等於做到了精確一次消費。
那咱們就各個擊破。
首先解決資料丟失問題,辦法就是要等資料儲存成功後再提交偏移量,所以就必須手工來控制偏移量的提交時機。
但是如果資料儲存了,沒等偏移量提交程序掛了,資料會被重複消費。怎麼辦?那就要把資料的儲存做成冪等性儲存。即同一批資料反覆儲存多次,資料不會翻倍,儲存一次和儲存一百次的效果是一樣的。如果能做到這個,就達到了冪等性儲存,就不用擔心資料會重複了。
難點
話雖如此,在實際的開發中手動提交偏移量其實不難,難的是冪等性的儲存,有的時候並不一定能保證。所以有的時候只能優先保證的資料不丟失。資料重複難以避免。即只保證了至少一次消費的語義。
4 手動提交偏移
流程
為什麼用redis儲存偏移量:
本身kafka 0.9版本以後consumer的偏移量是儲存在kafka的__consumer_offsets主題中。但是如果用這種方式管理偏移量,有一個限制就是在提交偏移量時,資料流的元素結構不能發生轉變,即提交偏移量時資料流,必須是InputDStream[ConsumerRecord[String, String]]這種結構。但是在實際計算中,資料難免發生轉變,或聚合,或關聯,一旦發生轉變,就無法在利用以下語句進行偏移量的提交:
xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)。
所以實際生產中通常會利用zookeeper,redis,mysql等工具對偏移量進行儲存。
偏移量管理類
package com.atguigu.gmall0105.realtime.util import java.util import org.apache.kafka.common.TopicPartition import org.apache.spark.streaming.kafka010.OffsetRange import redis.clients.jedis.Jedis object OffsetManager { //從redis中讀取偏移量 def getOffset(topicName:String,groupId:String): Map[TopicPartition,Long] ={ // Redis 中偏移量的儲存格式 type? hash key ? "offset:[topic]:[groupid]" field ? partition_id value ? offset expire val jedis: Jedis = RedisUtil.getJedisClient val offsetKey="offset:"+topicName+":"+groupId val offsetMap: util.Map[String, String] = jedis.hgetAll(offsetKey) jedis.close() import scala.collection.JavaConversions._ val kafkaOffsetMap: Map[TopicPartition, Long] = offsetMap.map { case (patitionId, offset) => println("載入分割槽偏移量:"+patitionId +":"+offset ) (new TopicPartition(topicName, patitionId.toInt), offset.toLong) }.toMap kafkaOffsetMap } def saveOffset(topicName:String ,groupId:String ,offsetRanges: Array[OffsetRange]): Unit ={ //redis偏移量的寫入 // Redis 中偏移量的儲存格式 type? hash key ? "offset:[topic]:[groupid]" field ? partition_id value ? offset expire val offsetKey="offset:"+topicName+":"+groupId val offsetMap:util.Map[String,String]=new util.HashMap() //轉換結構 offsetRanges -> offsetMap for (offset <- offsetRanges) { val partition: Int = offset.partition val untilOffset: Long = offset.untilOffset offsetMap.put(partition+"",untilOffset+"") //println("寫入分割槽:"+partition +":"+offset.fromOffset+"-->"+offset.untilOffset) } //寫入redis if(offsetMap!=null&&offsetMap.size()>0){ val jedis: Jedis = RedisUtil.getJedisClient jedis.hmset(offsetKey,offsetMap) jedis.close() } } //把偏移量寫入redis }
根據自定義偏移量載入的讀取kafka資料
def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app") val ssc = new StreamingContext(sparkConf, Seconds(5)) val groupId = "GMALL_DAU_CONSUMER" val topic = "GMALL_START" //從redis讀取偏移量 val startupOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId,topic) //根據偏移起始點獲得資料 val startupInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc,startupOffsets,groupId) //獲得偏移結束點 var startupOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] val startupInputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = startupInputDstream.transform { rdd => startupOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd } val startLogInfoDStream: DStream[JSONObject] = startupInputGetOffsetDstream.map { record => val startupJson: String = record.value() val startupJSONObj: JSONObject = JSON.parseObject(startupJson) val ts: lang.Long = startupJSONObj.getLong("ts") startupJSONObj } startLogInfoDStream.print(100) ….. ….. ….. ….. dauDstream.foreachRDD{rdd=> rdd.foreachPartition{dauInfoItr=> ///可以觀察偏移量 if(startupOffsetRanges!=null&&startupOffsetRanges.size>0){ val offsetRange: OffsetRange = startupOffsetRanges(TaskContext.get().partitionId()) println("from:"+offsetRange.fromOffset +" --- to:"+offsetRange.untilOffset) } val dauInfoWithIdList: List[(String, DauInfo)] = dauInfoItr.toList.map(dauInfo=>(dauInfo.dt+ "_"+dauInfo.mid,dauInfo)) val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date()) MyEsUtil.bulkInsert(dauInfoWithIdList,"gmall_dau_info_"+dateStr) } //在儲存時最後提交偏移量 OffsetManager.saveOffset(groupId ,topic, startupOffsetRanges) //如果流發生了轉換,無法用以下方法提交偏移量 //dauDstream.asInstanceOf[CanCommitOffsets].commitAsync(startupOffsetRanges) } ….. …..
5 最終處理日活程式碼
package com.atguigu.gmall0105.realtime.app import java.lang import java.text.SimpleDateFormat import java.util.Date import com.alibaba.fastjson.{JSON, JSONObject} import com.atguigu.gmall0105.realtime.bean.DauInfo import com.atguigu.gmall0105.realtime.util.{MyEsUtil, MyKafkaUtil, OffsetManager, RedisUtil} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import redis.clients.jedis.Jedis import scala.collection.mutable.ListBuffer object DauApp { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("dau_app").setMaster("local[4]") val ssc = new StreamingContext(sparkConf,Seconds(5)) val topic="GMALL_STARTUP_0105" val groupId="DAU_GROUP" val kafkaOffsetMap: Map[TopicPartition, Long] = OffsetManager.getOffset(topic,groupId) var recordInputStream: InputDStream[ConsumerRecord[String, String]]=null if(kafkaOffsetMap!=null&&kafkaOffsetMap.size>0){ recordInputStream = MyKafkaUtil.getKafkaStream(topic,ssc,kafkaOffsetMap,groupId) }else{ recordInputStream = MyKafkaUtil.getKafkaStream(topic,ssc) } //得到本批次的偏移量的結束位置,用於更新redis中的偏移量 var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = recordInputStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //driver? executor? //週期性的執行 rdd } // recordInputStream.map(_.value()).print() val jsonObjDstream: DStream[JSONObject] = inputGetOffsetDstream.map { record => val jsonString: String = record.value() val jsonObj: JSONObject = JSON.parseObject(jsonString) val ts: lang.Long = jsonObj.getLong("ts") val datehourString: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts)) val dateHour: Array[String] = datehourString.split(" ") jsonObj.put("dt", dateHour(0)) jsonObj.put("hr", dateHour(1)) jsonObj } //去重思路: 利用redis儲存今天訪問過系統的使用者清單 //清單在redis中儲存 //redis : type set string hash list set zset key ? dau:2020-06-17 value? mid (field? score?) (expire?) 24小時 // println("過濾前:::"+jsonObjDstream.count()) val filteredDstream: DStream[JSONObject] = jsonObjDstream.mapPartitions { jsonObjItr => val jedis: Jedis = RedisUtil.getJedisClient //一個分割槽只申請一次連線 val filteredList=new ListBuffer[JSONObject]() // Iterator 只能迭代一次 包括取size 所以要取size 要把迭代器轉為別的容器 val jsonList: List[JSONObject] = jsonObjItr.toList // println("過濾前:"+jsonList.size) for (jsonObj <- jsonList) { val dt: String = jsonObj.getString("dt") val mid: String = jsonObj.getJSONObject("common").getString("mid") val dauKey = "dau:" + dt val isNew: lang.Long = jedis.sadd(dauKey, mid) //如果未存在則儲存 返回1 如果已經存在則不儲存 返回0 jedis.expire(dauKey,3600*24) if (isNew == 1L) { filteredList+=jsonObj } } jedis.close() // println("過濾後:"+filteredList.size) filteredList.toIterator } filteredDstream.foreachRDD{rdd=> rdd.foreachPartition{jsonItr=> val list: List[JSONObject] = jsonItr.toList //把源資料 轉換成為要儲存的資料格式 val dauList: List[(String,DauInfo)] = list.map { jsonObj => val commonJSONObj: JSONObject = jsonObj.getJSONObject("common") val dauInfo = DauInfo(commonJSONObj.getString("mid"), commonJSONObj.getString("uid"), commonJSONObj.getString("ar"), commonJSONObj.getString("ch"), commonJSONObj.getString("vc"), jsonObj.getString("dt"), jsonObj.getString("hr"), "00", jsonObj.getLong("ts") ) (dauInfo.mid,dauInfo) } val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) MyEsUtil.bulkDoc(dauList,"gmall0105_dau_info_"+dt) } /// // 偏移量提交區 OffsetManager.saveOffset(topic,groupId,offsetRanges) /// } ssc.start() ssc.awaitTermination() } }