1. 程式人生 > >kafka+sparkstreaming 的offset管理

kafka+sparkstreaming 的offset管理

需求:

在使用sparkstreaming消費kafka的topic時,對offset進行管理

網上資料比較少,而且參差不齊

管理的方法也有很多,區別主要在於offset儲存在哪裡,不同的儲存位置意味著不同的儲存以及讀取方法

本篇部落格主要記錄一下

如何通過kafka.consumer.SimpleConsumer這個類對offset進行儲存和讀取

這個類是將offset儲存於kafka內部的一個特殊的topic: __consumer_offsets 中

程式碼如下

package main.scala

import kafka.api.{OffsetCommitRequest, OffsetFetchRequest, TopicMetadataRequest}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}


//需求:消費者自定義控制offset
//在這裡offset儲存到kafka內部的特殊topic:__consumer_offsets中,使用kafka.consumer.SimpleConsumer類來進行一系列操作
object kafka_offset_learning {
  val groupid="user3"
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo")
    val scc = new StreamingContext(sparkConf, Duration(5000)) //new一個spark-streaming的上下文
    val topics = Set("kafka_test4") //我們需要消費的kafka資料的topic
    val kafkaParam = Map(
      "metadata.broker.list" -> "localhost:9092",// kafka的broker list地址
      "groupid"->groupid
    )
    val topic="kafka_test4"
    //檢視當前topic:__consumer_offsets中已儲存的最新的offset
    val simpleConsumer = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024, "test")//new一個consumer並連線上kafka
    val topiclist=Seq("kafka_test4")
    val topicReq = new TopicMetadataRequest(topiclist,0)//定義一個topic請求,為了獲取相關topic的資訊(不包括offset,有partition)
    val res = simpleConsumer.send(topicReq)//傳送請求,得到kafka相應
    val topicMetaOption = res.topicsMetadata.headOption
    //定義一個Topicandpartition的格式,便於後面請求offset
    val topicAndPartition: Seq[TopicAndPartition] = topicMetaOption match {
      case Some(tm) => tm.partitionsMetadata.map(pm => TopicAndPartition("kafka_test4", pm.partitionId))
      case None => Seq[TopicAndPartition]()
    }
    val fetchRequest = OffsetFetchRequest("user3",topicAndPartition)//定義一個請求,傳遞的引數為groupid,topic,partitionid,這三個也正好能確定對應的offset的位置
    val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest).requestInfo//向kafka傳送請求並獲取返回的offset資訊
//    println(fetchRequest)
//    println(fetchResponse)
    val offsetl=fetchResponse.map{l=>
    val part_name=l._1.partition
    val offset_name=l._2.offset
  (topic,part_name,offset_name)
}
    println(offsetl.toList)
    //使用KafkaUtils.createDirectStream,使得kafka流從指定的offset開始
    val offsetList = offsetl.toList
//    val offsetList = List((topic, 0, 1L),(topic, 1, 1L),(topic, 2, 1L),(topic, 3, 1L))//在此只是用1做實驗,沒有變成動態的,實際情況應該是這裡的offset都是前面查出來已經儲存好的offset
    val fromOffsets = setFromOffsets(offsetList)//對List進行處理,變成需要的格式,即Map[TopicAndPartition, Long]
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //構建MessageAndMetadata,這個所有使用情況都是一樣的,就這麼寫

    //定義流.這種方法是不會在zookeeper的/consumers中建立一個新的groupid例項的
    val stream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](scc, kafkaParam, fromOffsets, messageHandler)

    stream.print()//為了放出時間戳
    //將已更新的offset存入topic:__consumer_offsets中,以便下次使用
    //另外,這裡涉及到與外部系統即kafka的連線,所以要使用一下結構
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        //配置說明
        val simpleConsumer2 = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024, "test-client")
        partitionOfRecords.foreach { record =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//這個語句可以返回當前rdd所更新到的offset值(OffsetRange(topic: 'kafka_test4', partition: 0, range: [1 -> 4]))
          for (o <- offsetRanges) {
            //在這裡o.untilOffset返回的是offset末態
            //而o.fromOffset返回的是offset初態
            //所以看需求進行儲存
            val topicAndPartition = TopicAndPartition(topic, o.partition)//定義一個格式
            val commitRequest = OffsetCommitRequest(groupid, Map(topicAndPartition -> OffsetAndMetadata(o.fromOffset)))//定義一個請求,注意,在這裡儲存的是fromOffset
            val commitResponse = simpleConsumer2.commitOffsets(commitRequest)//提交請求,完成offset儲存即更新
          }
        }

      }
    }
    scc.start() // 真正啟動程式
    scc.awaitTermination()
  }
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    for (offset <- list) {
      val tp = TopicAndPartition(offset._1, offset._2)//topic和分割槽數
      fromOffsets += (tp -> offset._3)           // offset位置
    }
    fromOffsets
  }
}

在這裡補充一點:

KafkaUtils.createDirectStream這個方法建立的流是不會在zookeeper中建立一個/consumer/groupid節點的

因此即使是groupid也要自己管理