spark streaming整合kafka中聚合類運算如何和kafka保持exactly once一致性語義(redis方式,利用pipeline)
阿新 • • 發佈:2022-04-05
/** * 從Kafka讀取資料,實現ExactlyOnce,偏移量儲存到Redis中 * 1.將聚合好的資料,收集到Driver端, * 2.然後將計算好的資料和偏移量在一個pipeline中同時儲存到Redis中 * 3.成功了提交事物 * 4.失敗了廢棄原來的資料並讓這個任務重啟 */ object ExactlyOnceWordCountOffsetStoreInRedis { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb val Array(isLocal, appName, groupId, allTopics) = args val conf = new SparkConf() .setAppName(appName) if (isLocal.toBoolean) { conf.setMaster("local[*]") } //建立StreamingContext,並指定批次生成的時間 val ssc = new StreamingContext(conf, Milliseconds(5000)) //設定日誌級別 ssc.sparkContext.setLogLevel("WARN") //SparkStreaming 跟kafka進行整合 //1.匯入跟Kafka整合的依賴 //2.跟kafka整合,建立直連的DStream【使用底層的消費API,效率更高】 val topics = allTopics.split(",") //SparkSteaming跟kafka整合的引數 //kafka的消費者預設的引數就是每5秒鐘自動提交偏移量到Kafka特殊的topic中: __consumer_offsets val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "group.id" -> groupId, "auto.offset.reset" -> "earliest" //如果沒有記錄偏移量,第一次從最開始讀,有偏移量,接著偏移量讀 , "enable.auto.commit" -> (false: java.lang.Boolean) //消費者不自動提交偏移量 ) //在建立KafkaDStream之前要先讀取Redis資料庫,查詢歷史偏移量,沒有就從頭讀,有就接著讀 //offsets: collection.Map[TopicPartition, Long] val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis(appName, groupId) //跟Kafka進行整合,需要引入跟Kafka整合的依賴 //createDirectStream更加高效,使用的是Kafka底層的消費API,消費者直接連線到Kafka的Leader分割槽進行消費 //直連方式,RDD的分割槽數量和Kafka的分割槽數量是一一對應的【數目一樣】 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, //排程task到Kafka所在的節點 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) //指定訂閱Topic的規則 ) kafkaDStream.foreachRDD(rdd => { //判斷當前批次的RDD是否有資料 if (!rdd.isEmpty()) { //獲取RDD所有分割槽的偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //實現WordCount業務邏輯 val words: RDD[String] = rdd.flatMap(_.value().split(" ")) val wordsAndOne: RDD[(String, Int)] = words.map((_, 1)) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _) //將計算好的結果收集到Driver端再寫入到Redis中【保證資料和偏移量寫入在一個事物中】 //觸發Action,將資料收集到Driver段 val res: Array[(String, Int)] = reduced.collect() var jedis: Jedis = null var pipeline: Pipeline = null //建立一個Redis的連線【在Driver端建立】 try { jedis = JedisConnectionPool.getConnection() //使用pipeline pipeline = jedis.pipelined() pipeline.select(1) //開啟多個操作在一起執行 pipeline.multi() //寫入計算好的結果 for (tp <- res) { pipeline.hincrBy("WORD_COUNT", tp._1, tp._2) } //寫入偏移量 for (offsetRange <- offsetRanges) { val topic = offsetRange.topic val partition = offsetRange.partition val untilOffset = offsetRange.untilOffset //將原來的偏移量覆蓋 pipeline.hset(appName +"_" + groupId, topic + "_" + partition, untilOffset.toString) } //類似提交事物 pipeline.exec() pipeline.sync() } catch { case e: Exception => { pipeline.discard() e.printStackTrace() ssc.stop() } } finally { pipeline.close() jedis.close() } } }) ssc.start() ssc.awaitTermination() } }