1. 程式人生 > 其它 >spark streaming整合kafka中聚合類運算如何和kafka保持exactly once一致性語義(mysql方式,利用事務)

spark streaming整合kafka中聚合類運算如何和kafka保持exactly once一致性語義(mysql方式,利用事務)

/**
  * 從Kafka讀取資料,實現ExactlyOnce,偏移量儲存到MySQL中
  * 1.將聚合好的資料,收集到Driver端,
  * 2.然後建計算好的資料和偏移量在一個事物中同時儲存到MySQL中
  * 3.成功了提交事物
  * 4.失敗了讓這個任務重啟
  *
  * MySQL資料庫中有兩張表:儲存計算好的結果、儲存偏移量
  */
object ExactlyOnceWordCountOffsetStoreInMySQL {

  def main(args: Array[String]): Unit = {

    //true a1 g1 ta,tb
    val Array(isLocal, appName, groupId, allTopics) = args


    val conf = new SparkConf()
      .setAppName(appName)

    if (isLocal.toBoolean) {
      conf.setMaster("local[*]")
    }


    //建立StreamingContext,並指定批次生成的時間
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    //設定日誌級別
    ssc.sparkContext.setLogLevel("WARN")

    //SparkStreaming 跟kafka進行整合
    //1.匯入跟Kafka整合的依賴
    //2.跟kafka整合,建立直連的DStream【使用底層的消費API,效率更高】

    val topics = allTopics.split(",")

    //SparkSteaming跟kafka整合的引數
    //kafka的消費者預設的引數就是每5秒鐘自動提交偏移量到Kafka特殊的topic中: __consumer_offsets
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest" //如果沒有記錄偏移量,第一次從最開始讀,有偏移量,接著偏移量讀
      , "enable.auto.commit" -> (false: java.lang.Boolean) //消費者不自動提交偏移量
    )

    //在建立KafkaDStream之前要先讀取MySQL資料庫,查詢歷史偏移量,沒有就從頭讀,有就接著讀
    //offsets: collection.Map[TopicPartition, Long]
    val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appName, groupId)

    //跟Kafka進行整合,需要引入跟Kafka整合的依賴
    //createDirectStream更加高效,使用的是Kafka底層的消費API,消費者直接連線到Kafka的Leader分割槽進行消費
    //直連方式,RDD的分割槽數量和Kafka的分割槽數量是一一對應的【數目一樣】
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, //排程task到Kafka所在的節點
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) //指定訂閱Topic的規則
    )

    kafkaDStream.foreachRDD(rdd => {

      //判斷當前批次的RDD是否有資料
      if (!rdd.isEmpty()) {

        //獲取RDD所有分割槽的偏移量
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        //實現WordCount業務邏輯
        val words: RDD[String] = rdd.flatMap(_.value().split(" "))
        val wordsAndOne: RDD[(String, Int)] = words.map((_, 1))
        val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _)
        //將計算好的結果收集到Driver端再寫入到MySQL中【保證資料和偏移量寫入在一個事物中】
        //觸發Action,將資料收集到Driver段
        val res: Array[(String, Int)] = reduced.collect()

        //建立一個MySQL的連線【在Driver端建立】
        //預設MySQL自動提交事物

        var connection: Connection = null
        var ps1: PreparedStatement = null
        var ps2: PreparedStatement = null
        try {
          connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
          //不要自動提交事物
          connection.setAutoCommit(false)

          ps1 = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?")
          //將計算好的WordCount結果寫入資料庫表中,但是沒有提交事物
          for (tp <- res) {
            ps1.setString(1, tp._1)
            ps1.setLong(2, tp._2)
            ps1.setLong(3, tp._2)
            ps1.executeUpdate() //沒有提交事物,不會講資料真正寫入到MySQL
          }

          //(app1_g001, wc_0) ->  1000
          ps2 = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?")
          //將偏移量寫入到MySQL的另外一個表中,也沒有提交事物
          for (offsetRange <- offsetRanges) {
            //topic名稱
            val topic = offsetRange.topic
            //topic分割槽編號
            val partition = offsetRange.partition
            //獲取結束偏移量
            val untilOffset = offsetRange.untilOffset
            //將結果寫入MySQL
            ps2.setString(1, appName + "_" + groupId)
            ps2.setString(2, topic + "_" + partition)
            ps2.setLong(3, untilOffset)
            ps2.setLong(4, untilOffset)
            ps2.executeUpdate()
          }

          //提交事物
          connection.commit()

        } catch {
          case e: Exception => {
            //回滾事物
            connection.rollback()
            //讓任務停掉
            ssc.stop()
          }
        } finally {
          if(ps2 != null) {
            ps2.close()
          }
          if(ps1 != null) {
            ps1.close()
          }
          if(connection != null) {
            connection.close()
          }
        }
      }
    })


    ssc.start()

    ssc.awaitTermination()


  }
}