spark streaming整合kafka中聚合類運算如何和kafka保持exactly once一致性語義(mysql方式,利用事務)
阿新 • • 發佈:2022-04-05
/** * 從Kafka讀取資料,實現ExactlyOnce,偏移量儲存到MySQL中 * 1.將聚合好的資料,收集到Driver端, * 2.然後建計算好的資料和偏移量在一個事物中同時儲存到MySQL中 * 3.成功了提交事物 * 4.失敗了讓這個任務重啟 * * MySQL資料庫中有兩張表:儲存計算好的結果、儲存偏移量 */ object ExactlyOnceWordCountOffsetStoreInMySQL { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb val Array(isLocal, appName, groupId, allTopics) = args val conf = new SparkConf() .setAppName(appName) if (isLocal.toBoolean) { conf.setMaster("local[*]") } //建立StreamingContext,並指定批次生成的時間 val ssc = new StreamingContext(conf, Milliseconds(5000)) //設定日誌級別 ssc.sparkContext.setLogLevel("WARN") //SparkStreaming 跟kafka進行整合 //1.匯入跟Kafka整合的依賴 //2.跟kafka整合,建立直連的DStream【使用底層的消費API,效率更高】 val topics = allTopics.split(",") //SparkSteaming跟kafka整合的引數 //kafka的消費者預設的引數就是每5秒鐘自動提交偏移量到Kafka特殊的topic中: __consumer_offsets val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "group.id" -> groupId, "auto.offset.reset" -> "earliest" //如果沒有記錄偏移量,第一次從最開始讀,有偏移量,接著偏移量讀 , "enable.auto.commit" -> (false: java.lang.Boolean) //消費者不自動提交偏移量 ) //在建立KafkaDStream之前要先讀取MySQL資料庫,查詢歷史偏移量,沒有就從頭讀,有就接著讀 //offsets: collection.Map[TopicPartition, Long] val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appName, groupId) //跟Kafka進行整合,需要引入跟Kafka整合的依賴 //createDirectStream更加高效,使用的是Kafka底層的消費API,消費者直接連線到Kafka的Leader分割槽進行消費 //直連方式,RDD的分割槽數量和Kafka的分割槽數量是一一對應的【數目一樣】 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, //排程task到Kafka所在的節點 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) //指定訂閱Topic的規則 ) kafkaDStream.foreachRDD(rdd => { //判斷當前批次的RDD是否有資料 if (!rdd.isEmpty()) { //獲取RDD所有分割槽的偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //實現WordCount業務邏輯 val words: RDD[String] = rdd.flatMap(_.value().split(" ")) val wordsAndOne: RDD[(String, Int)] = words.map((_, 1)) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _) //將計算好的結果收集到Driver端再寫入到MySQL中【保證資料和偏移量寫入在一個事物中】 //觸發Action,將資料收集到Driver段 val res: Array[(String, Int)] = reduced.collect() //建立一個MySQL的連線【在Driver端建立】 //預設MySQL自動提交事物 var connection: Connection = null var ps1: PreparedStatement = null var ps2: PreparedStatement = null try { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456") //不要自動提交事物 connection.setAutoCommit(false) ps1 = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?") //將計算好的WordCount結果寫入資料庫表中,但是沒有提交事物 for (tp <- res) { ps1.setString(1, tp._1) ps1.setLong(2, tp._2) ps1.setLong(3, tp._2) ps1.executeUpdate() //沒有提交事物,不會講資料真正寫入到MySQL } //(app1_g001, wc_0) -> 1000 ps2 = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?") //將偏移量寫入到MySQL的另外一個表中,也沒有提交事物 for (offsetRange <- offsetRanges) { //topic名稱 val topic = offsetRange.topic //topic分割槽編號 val partition = offsetRange.partition //獲取結束偏移量 val untilOffset = offsetRange.untilOffset //將結果寫入MySQL ps2.setString(1, appName + "_" + groupId) ps2.setString(2, topic + "_" + partition) ps2.setLong(3, untilOffset) ps2.setLong(4, untilOffset) ps2.executeUpdate() } //提交事物 connection.commit() } catch { case e: Exception => { //回滾事物 connection.rollback() //讓任務停掉 ssc.stop() } } finally { if(ps2 != null) { ps2.close() } if(ps1 != null) { ps1.close() } if(connection != null) { connection.close() } } } }) ssc.start() ssc.awaitTermination() } }