1. 程式人生 > 實用技巧 >實時電商數倉(二十三)之實時計算(二)日活處理模組(二)精確一次消費

實時電商數倉(二十三)之實時計算(二)日活處理模組(二)精確一次消費

精確一次消費

1 定義

精確一次消費(Exactly-once是指訊息一定會被處理且只會被處理一次。不多不少就一次處理。

如果達不到精確一次消費,可能會達到另外兩種情況:

至少一次消費at least once,主要是保證資料不會丟失,但有可能存在資料重複問題。

最多一次消費 at most once,主要是保證資料不會重複,但有可能存在資料丟失問題。

如果同時解決了資料丟失和資料重複的問題,那麼就實現了精確一次消費的語義了。

2 問題如何產生

資料何時會丟失 比如實時計算任務進行計算,到資料結果存檔之前,程序崩潰,假設在程序崩潰前kafka調整了偏移量,那麼kafka就會認為資料已經被處理過,即使程序重啟,

kafka也會從新的偏移量開始,所以之前沒有儲存的資料就被丟失掉了。

資料何時會重複 如果資料計算結果已經存檔了,在kafka調整偏移量之前,程序崩潰,那麼kafka會認為資料沒有被消費,程序重啟,會重新從舊的偏移量開始,那麼資料就會被2次消費,又會被存檔,資料就被存了2遍,造成資料重複。

3 如何解決

策略一:利用關係型資料庫的事務進行處理。

出現丟失或者重複的問題,核心就是偏移量的提交與資料的儲存,不是原子性的。如果能做成要麼資料儲存和偏移量都成功,要麼兩個失敗。那麼就不會出現丟失或者重複了。

這樣的話可以把存資料和偏移量放到一個事務裡。這樣就做到前面的成功,如果後面做失敗了,就回滾前面那麼就達成了原子性。

問題與限制

但是這種方式有限制就是資料必須都要放在某一個關係型資料庫中,無法使用其他功能強大的nosql資料庫。如果儲存的資料量較大一個數據庫節點不夠,多個節點的話,還要考慮分散式事務的問題。

策略二:手動提交偏移量+冪等性處理

咱們知道如果能夠同時解決資料丟失和資料重複問題,就等於做到了精確一次消費。

那咱們就各個擊破。

首先解決資料丟失問題,辦法就是要等資料儲存成功後再提交偏移量,所以就必須手工來控制偏移量的提交時機。

但是如果資料儲存了,沒等偏移量提交程序掛了,資料會被重複消費。怎麼辦?那就要把資料的儲存做成冪等性儲存。即同一批資料反覆儲存多次,資料不會翻倍,儲存一次和儲存一百次的效果是一樣的。如果能做到這個,就達到了冪等性儲存,就不用擔心資料會重複了。

難點

話雖如此,在實際的開發中手動提交偏移量其實不難,難的是冪等性的儲存,有的時候並不一定能保證。所以有的時候只能優先保證的資料不丟失。資料重複難以避免。即只保證了至少一次消費的語義

4 手動提交偏移

流程

為什麼用redis儲存偏移量:

本身kafka 0.9版本以後consumer的偏移量是儲存在kafka的__consumer_offsets主題中。但是如果用這種方式管理偏移量,有一個限制就是在提交偏移量時,資料流的元素結構不能發生轉變,即提交偏移量時資料流,必須是InputDStream[ConsumerRecord[String, String]]這種結構。但是在實際計算中,資料難免發生轉變,或聚合,或關聯,一旦發生轉變,就無法在利用以下語句進行偏移量的提交

xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)。

所以實際生產中通常會利用zookeeper,redis,mysql等工具對偏移量進行儲存。

偏移量管理類

package com.atguigu.gmall0105.realtime.util

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis



object OffsetManager {


  //從redis中讀取偏移量
  def  getOffset(topicName:String,groupId:String): Map[TopicPartition,Long]  ={
    // Redis 中偏移量的儲存格式   type?  hash   key ?  "offset:[topic]:[groupid]"    field ?  partition_id  value ?  offset     expire
    val jedis: Jedis = RedisUtil.getJedisClient
    val offsetKey="offset:"+topicName+":"+groupId
    val offsetMap: util.Map[String, String] = jedis.hgetAll(offsetKey)
    jedis.close()
    import scala.collection.JavaConversions._
    val kafkaOffsetMap:  Map[TopicPartition, Long] = offsetMap.map { case (patitionId, offset) =>
      println("載入分割槽偏移量:"+patitionId +":"+offset  )
      (new TopicPartition(topicName, patitionId.toInt), offset.toLong)
    }.toMap
    kafkaOffsetMap

  }

    def saveOffset(topicName:String ,groupId:String ,offsetRanges: Array[OffsetRange]): Unit ={
      //redis偏移量的寫入
      // Redis 中偏移量的儲存格式   type?  hash   key ?  "offset:[topic]:[groupid]"    field ?  partition_id  value ?  offset     expire

      val offsetKey="offset:"+topicName+":"+groupId
      val offsetMap:util.Map[String,String]=new util.HashMap()
      //轉換結構 offsetRanges -> offsetMap
      for (offset <- offsetRanges) {
        val partition: Int = offset.partition
        val untilOffset: Long = offset.untilOffset
         offsetMap.put(partition+"",untilOffset+"")
         //println("寫入分割槽:"+partition +":"+offset.fromOffset+"-->"+offset.untilOffset)
      }
      //寫入redis
      if(offsetMap!=null&&offsetMap.size()>0){
        val jedis: Jedis = RedisUtil.getJedisClient
        jedis.hmset(offsetKey,offsetMap)
        jedis.close()
      }


  }



  //把偏移量寫入redis

}

根據自定義偏移量載入的讀取kafka資料

def main(args: Array[String]): Unit = {
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  val groupId = "GMALL_DAU_CONSUMER"
  val topic = "GMALL_START"

  //從redis讀取偏移量
  val startupOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId,topic)

  //根據偏移起始點獲得資料
  val startupInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc,startupOffsets,groupId)


  //獲得偏移結束點
  var startupOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
  val startupInputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = startupInputDstream.transform { rdd =>
    startupOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }


  val startLogInfoDStream: DStream[JSONObject] = startupInputGetOffsetDstream.map { record =>
    val startupJson: String = record.value()
    val startupJSONObj: JSONObject = JSON.parseObject(startupJson)
    val ts: lang.Long = startupJSONObj.getLong("ts")
    startupJSONObj
   
  }
  startLogInfoDStream.print(100)
…..
…..
…..
…..

dauDstream.foreachRDD{rdd=>
  rdd.foreachPartition{dauInfoItr=>

///可以觀察偏移量
if(startupOffsetRanges!=null&&startupOffsetRanges.size>0){
  val offsetRange: OffsetRange = startupOffsetRanges(TaskContext.get().partitionId())
  println("from:"+offsetRange.fromOffset +" --- to:"+offsetRange.untilOffset)
}


    val dauInfoWithIdList: List[(String, DauInfo)] = dauInfoItr.toList.map(dauInfo=>(dauInfo.dt+  "_"+dauInfo.mid,dauInfo))
    val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date())
    MyEsUtil.bulkInsert(dauInfoWithIdList,"gmall_dau_info_"+dateStr)


  }
//在儲存時最後提交偏移量

  OffsetManager.saveOffset(groupId ,topic, startupOffsetRanges)

//如果流發生了轉換,無法用以下方法提交偏移量
//dauDstream.asInstanceOf[CanCommitOffsets].commitAsync(startupOffsetRanges)


}

…..
…..

5 最終處理日活程式碼

package com.atguigu.gmall0105.realtime.app

import java.lang
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.gmall0105.realtime.bean.DauInfo
import com.atguigu.gmall0105.realtime.util.{MyEsUtil, MyKafkaUtil, OffsetManager, RedisUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ListBuffer

object DauApp {


  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("dau_app").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    val topic="GMALL_STARTUP_0105"
    val groupId="DAU_GROUP"
    val kafkaOffsetMap: Map[TopicPartition, Long] = OffsetManager.getOffset(topic,groupId)
    var recordInputStream: InputDStream[ConsumerRecord[String, String]]=null
    if(kafkaOffsetMap!=null&&kafkaOffsetMap.size>0){
         recordInputStream = MyKafkaUtil.getKafkaStream(topic,ssc,kafkaOffsetMap,groupId)
    }else{
         recordInputStream = MyKafkaUtil.getKafkaStream(topic,ssc)
    }

    //得到本批次的偏移量的結束位置,用於更新redis中的偏移量
    var  offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val  inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = recordInputStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  //driver? executor?  //週期性的執行
      rdd
    }



    //   recordInputStream.map(_.value()).print()

    val jsonObjDstream: DStream[JSONObject] = inputGetOffsetDstream.map { record =>
      val jsonString: String = record.value()
      val jsonObj: JSONObject = JSON.parseObject(jsonString)
      val ts: lang.Long = jsonObj.getLong("ts")
      val datehourString: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts))
      val dateHour: Array[String] = datehourString.split(" ")

      jsonObj.put("dt", dateHour(0))
      jsonObj.put("hr", dateHour(1))

      jsonObj
    }

    //去重思路: 利用redis儲存今天訪問過系統的使用者清單
    //清單在redis中儲存
    //redis :  type set   string hash list set zset       key ? dau:2020-06-17        value?  mid   (field? score?)  (expire?) 24小時

   // println("過濾前:::"+jsonObjDstream.count())
    val filteredDstream: DStream[JSONObject] = jsonObjDstream.mapPartitions { jsonObjItr =>
      val jedis: Jedis = RedisUtil.getJedisClient //一個分割槽只申請一次連線
      val filteredList=new ListBuffer[JSONObject]()
      //  Iterator 只能迭代一次 包括取size   所以要取size 要把迭代器轉為別的容器
      val jsonList: List[JSONObject] = jsonObjItr.toList
    //   println("過濾前:"+jsonList.size)
      for (jsonObj <- jsonList) {
          val dt: String = jsonObj.getString("dt")
          val mid: String = jsonObj.getJSONObject("common").getString("mid")
          val dauKey = "dau:" + dt
          val isNew: lang.Long = jedis.sadd(dauKey, mid) //如果未存在則儲存 返回1  如果已經存在則不儲存 返回0
          jedis.expire(dauKey,3600*24)
        if (isNew == 1L) {
            filteredList+=jsonObj
          }
      }
      jedis.close()
     // println("過濾後:"+filteredList.size)
      filteredList.toIterator
    }

    filteredDstream.foreachRDD{rdd=>
      rdd.foreachPartition{jsonItr=>
          val list: List[JSONObject] = jsonItr.toList
        //把源資料 轉換成為要儲存的資料格式
        val dauList: List[(String,DauInfo)] = list.map { jsonObj =>
          val commonJSONObj: JSONObject = jsonObj.getJSONObject("common")
          val dauInfo = DauInfo(commonJSONObj.getString("mid"),
            commonJSONObj.getString("uid"),
            commonJSONObj.getString("ar"),
            commonJSONObj.getString("ch"),
            commonJSONObj.getString("vc"),
            jsonObj.getString("dt"),
            jsonObj.getString("hr"),
            "00",
            jsonObj.getLong("ts")
          )

          (dauInfo.mid,dauInfo)

        }
          val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
          MyEsUtil.bulkDoc(dauList,"gmall0105_dau_info_"+dt)

      }
      ///
      // 偏移量提交區
      OffsetManager.saveOffset(topic,groupId,offsetRanges)
      ///

    }

    ssc.start()
    ssc.awaitTermination()

  }
}