kafka+sparkstreaming 的offset管理
阿新 • • 發佈:2019-01-24
需求:
在使用sparkstreaming消費kafka的topic時,對offset進行管理
網上資料比較少,而且參差不齊
管理的方法也有很多,區別主要在於offset儲存在哪裡,不同的儲存位置意味著不同的儲存以及讀取方法
本篇部落格主要記錄一下
如何通過kafka.consumer.SimpleConsumer這個類對offset進行儲存和讀取
這個類是將offset儲存於kafka內部的一個特殊的topic: __consumer_offsets 中
程式碼如下
package main.scala import kafka.api.{OffsetCommitRequest, OffsetFetchRequest, TopicMetadataRequest} import kafka.common.{OffsetAndMetadata, TopicAndPartition} import kafka.consumer.SimpleConsumer import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils} //需求:消費者自定義控制offset //在這裡offset儲存到kafka內部的特殊topic:__consumer_offsets中,使用kafka.consumer.SimpleConsumer類來進行一系列操作 object kafka_offset_learning { val groupid="user3" def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo") val scc = new StreamingContext(sparkConf, Duration(5000)) //new一個spark-streaming的上下文 val topics = Set("kafka_test4") //我們需要消費的kafka資料的topic val kafkaParam = Map( "metadata.broker.list" -> "localhost:9092",// kafka的broker list地址 "groupid"->groupid ) val topic="kafka_test4" //檢視當前topic:__consumer_offsets中已儲存的最新的offset val simpleConsumer = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024, "test")//new一個consumer並連線上kafka val topiclist=Seq("kafka_test4") val topicReq = new TopicMetadataRequest(topiclist,0)//定義一個topic請求,為了獲取相關topic的資訊(不包括offset,有partition) val res = simpleConsumer.send(topicReq)//傳送請求,得到kafka相應 val topicMetaOption = res.topicsMetadata.headOption //定義一個Topicandpartition的格式,便於後面請求offset val topicAndPartition: Seq[TopicAndPartition] = topicMetaOption match { case Some(tm) => tm.partitionsMetadata.map(pm => TopicAndPartition("kafka_test4", pm.partitionId)) case None => Seq[TopicAndPartition]() } val fetchRequest = OffsetFetchRequest("user3",topicAndPartition)//定義一個請求,傳遞的引數為groupid,topic,partitionid,這三個也正好能確定對應的offset的位置 val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest).requestInfo//向kafka傳送請求並獲取返回的offset資訊 // println(fetchRequest) // println(fetchResponse) val offsetl=fetchResponse.map{l=> val part_name=l._1.partition val offset_name=l._2.offset (topic,part_name,offset_name) } println(offsetl.toList) //使用KafkaUtils.createDirectStream,使得kafka流從指定的offset開始 val offsetList = offsetl.toList // val offsetList = List((topic, 0, 1L),(topic, 1, 1L),(topic, 2, 1L),(topic, 3, 1L))//在此只是用1做實驗,沒有變成動態的,實際情況應該是這裡的offset都是前面查出來已經儲存好的offset val fromOffsets = setFromOffsets(offsetList)//對List進行處理,變成需要的格式,即Map[TopicAndPartition, Long] val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //構建MessageAndMetadata,這個所有使用情況都是一樣的,就這麼寫 //定義流.這種方法是不會在zookeeper的/consumers中建立一個新的groupid例項的 val stream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](scc, kafkaParam, fromOffsets, messageHandler) stream.print()//為了放出時間戳 //將已更新的offset存入topic:__consumer_offsets中,以便下次使用 //另外,這裡涉及到與外部系統即kafka的連線,所以要使用一下結構 stream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => //配置說明 val simpleConsumer2 = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024, "test-client") partitionOfRecords.foreach { record => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//這個語句可以返回當前rdd所更新到的offset值(OffsetRange(topic: 'kafka_test4', partition: 0, range: [1 -> 4])) for (o <- offsetRanges) { //在這裡o.untilOffset返回的是offset末態 //而o.fromOffset返回的是offset初態 //所以看需求進行儲存 val topicAndPartition = TopicAndPartition(topic, o.partition)//定義一個格式 val commitRequest = OffsetCommitRequest(groupid, Map(topicAndPartition -> OffsetAndMetadata(o.fromOffset)))//定義一個請求,注意,在這裡儲存的是fromOffset val commitResponse = simpleConsumer2.commitOffsets(commitRequest)//提交請求,完成offset儲存即更新 } } } } scc.start() // 真正啟動程式 scc.awaitTermination() } def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = { var fromOffsets: Map[TopicAndPartition, Long] = Map() for (offset <- list) { val tp = TopicAndPartition(offset._1, offset._2)//topic和分割槽數 fromOffsets += (tp -> offset._3) // offset位置 } fromOffsets } }
在這裡補充一點:
KafkaUtils.createDirectStream這個方法建立的流是不會在zookeeper中建立一個/consumer/groupid節點的
因此即使是groupid也要自己管理