sparkStreaming 與fafka直接方式 進行消費者偏移量的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題
阿新 • • 發佈:2019-03-08
create term tex ria streaming 保存 else config cal
import java.util import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} object KafkaDricteRedis { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("redis").setMaster("local[*]") val ssc = new StreamingContext(conf,newDuration(5000)) val groupid = "GB01" //組名 val topic = "topic_bc"//topic 名 //在redis中以 groupid/topic作為唯一標識 ,存儲分區偏移量 //在Reids 使用的時hash類型來存儲 val gtKey = groupid+"/"+topic //topic val topics = Set(topic) //zk地址 val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181" //brokerListval brokerList = "hadoop04:9092,hadoop05:9092,hadoop06:9092" val kafkaParams = Map( // metadata.broker.list "metadata.broker.list"->brokerList, "group.id"->groupid, "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString //從頭開始消費 ) //記錄topic 、分區對應的偏移量偏移量,在創建InputDStream時作為參數傳如 //從這個偏移量開始讀取 var fromOffset = Map[TopicAndPartition,Long]() var kafkaDStream :InputDStream[(String,String)] = null // 獲取一個jedis連接 val conn = getConnection() // conn.flushDB() //jd.hget(groupid+topic,"") //獲取全部的keys val values: util.Set[String] = conn.keys("*") //println(values) // [GB01/wordcount3] 分區數 偏移量 //如果keys中包含 GB01/wordcount3這樣的key,則表示以前讀取過 if(values.contains(gtKey)){ //獲取key 為GB01/wordcount3 下面所對應的(k,v) /** conn.hgetAll(gtKey) GB01/wordcount3: * 1 888 * 2 888 * 3 888 * 4 888 */ var allKey: util.Map[String, String] = conn.hgetAll(gtKey) //導入後,可以把Java中的集合轉換為Scala中的集合 import scala.collection.JavaConversions._ var list: List[(String, String)] = allKey.toList //循環得到的(k,v) //這裏面的 k 對應的是分區, v對應的是偏移量 for (key <- list){ //這裏的key是一個tuple類型 //new一個TopicAndPartition 把 topic 和分區數傳入 val tp = new TopicAndPartition(topic,key._1.toInt) //把每個topic 分區 對應的偏移量傳入 fromOffset += tp -> key._2.toLong println("分區"+key._1+"偏移量為"+key._2) } //這裏的是把數據(key ,value)是kafka 的key默認是null, //value 是kafka中的value val messageHandler =(mmd:MessageAndMetadata[String,String])=>{ ( mmd.key(),mmd.message()) } //創建一個InputDStream kafkaDStream= KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc, kafkaParams,fromOffset,messageHandler) }else{ //如果以前沒有讀取過,創建一個新的InputDStream kafkaDStream= KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder]( ssc,kafkaParams,topics ) } //用來更新偏移量,OffsetRange中可以獲取分區及偏移量 var OffsetRangs = Array[OffsetRange]() // kafkaDStream.foreachRDD(kafkaRDD=> { //這裏面的RDD是kafkaRDD ,可以轉換為HasOffsetRange val ranges = kafkaRDD.asInstanceOf[HasOffsetRanges] // 獲取分區信息的集合 OffsetRangs = ranges.offsetRanges //獲取value,(key 默認是null,沒有用) val map: RDD[String] = kafkaRDD.map(_._2) map.foreach(x=>print("")) //更新偏移量 for (o <- OffsetRangs){ //取出偏移量 val offset = o.untilOffset //取出分區 val partition = o.partition println("partition: "+partition) println("offset: "+offset) //把通過hset,把對應的partition和offset寫入到redis中 conn.hset(gtKey,partition.toString,offset.toString) } }) ssc.start() ssc.awaitTermination() } //Jedis連接池 def getConnection(): Jedis ={ //new 一個JedisPoolConfig,用來設定參數 val conf = new JedisPoolConfig() val pool = new JedisPool(conf,"192.168.121.12",6379) //最大連接數 conf.setMaxTotal(20) //最大空閑數 conf.setMaxIdle(20) val jedis = pool.getResource() //密碼 jedis.auth("test123") jedis }
sparkStreaming 與fafka直接方式 進行消費者偏移量的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題