1. 程式人生 > >獲取kafka最新offset-scala

獲取kafka最新offset-scala

com ora each main 通過 最新 intval sim exist

無論是在 Spark Streaming 消費 Kafka,或是監控 Kafka 的數據時,我們經常需要知道各分區 offset 的最新情況。

Kafka 的 topic 由多個分區(partition)組成,通過每個 partition 的 leader(主副本)所在的 broker,可以獲取該分區 offset 的最新情況。

GetOffsetShellWrap

/**
 * Extension of the offset shell tool bundled with Kafka.
 *
 * Instead of only printing the offsets, the looked-up offsets are appended to
 * a caller-supplied buffer so that they can be consumed programmatically.
 */
object GetOffsetShellWrap {

  /**
   * Parses the command-line style arguments, looks up the offsets of the
   * requested topic partitions and stores the result in `map`.
   *
   * Recognised options: `--broker-list` (required), `--topic` (required),
   * `--time` (required), `--partitions`, `--offsets`, `--max-wait-ms`.
   *
   * The process is terminated via `System.exit(1)` when no valid metadata is
   * returned for the topic. Partitions that do not exist or have no leader are
   * reported on stderr and skipped.
   *
   * @param args command-line style arguments, e.g. `--broker-list=host:9092`
   * @param map  output buffer; one `"topic:partitionId:offset[,offset...]"`
   *             entry is appended for every partition that has a leader
   */
  def main(args: Array[String], map: ArrayBuffer[String]): Unit = {
    // Definition of the accepted options.
    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
      .withRequiredArg
      .describedAs("hostname:port,...,hostname:port")
      .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
      .withRequiredArg
      .describedAs("topic")
      .ofType(classOf[String])
    val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
      .withRequiredArg
      .describedAs("partition ids")
      .ofType(classOf[String])
      .defaultsTo("")
    val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
      .withRequiredArg
      .describedAs("timestamp/-1(latest)/-2(earliest)")
      .ofType(classOf[java.lang.Long])
    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
      .withRequiredArg
      .describedAs("count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1)
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
      .withRequiredArg
      .describedAs("ms")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1000)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")

    val options = parser.parse(args: _*)

    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)

    // Extract the option values.
    val clientId = "GetOffsetShell"
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser, brokerList)
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topic = options.valueOf(topicOpt)
    val partitionList = options.valueOf(partitionOpt)
    val time = options.valueOf(timeOpt).longValue
    val nOffsets = options.valueOf(nOffsetsOpt).intValue
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
        "kafka-list-topic.sh to verify")
      System.exit(1)
    }

    // No explicit partition list means "every partition of the topic".
    val partitions =
      if (partitionList == "") {
        topicsMetadata.head.partitionsMetadata.map(_.partitionId)
      } else {
        // Trim each id so that input such as "0, 1" is accepted as well.
        partitionList.split(",").map(_.trim.toInt).toSeq
      }

    // Query the leader of every requested partition.
    partitions.foreach { partitionId =>
      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
      partitionMetadataOpt match {
        case Some(metadata) =>
          metadata.leader match {
            case Some(leader) =>
              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
              try {
                val topicAndPartition = TopicAndPartition(topic, partitionId)
                val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
                val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
                // Store the offsets that were obtained for this partition.
                map += "%s:%d:%s".format(topic, partitionId, offsets.mkString(","))
              } finally {
                // Release the connection to the leader even if the request fails;
                // one consumer is created per partition.
                consumer.close()
              }
            case None =>
              System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
          }
        case None =>
          System.err.println("Error: partition %d does not exist".format(partitionId))
      }
    }
  }
}

GetOffsetShellWrapScalaTest
/**
 * Small driver that shows how to obtain offsets through
 * `GetOffsetShellWrap` and print them.
 */
object GetOffsetShellWrapScalaTest {

  /**
   * Requests the offsets of topic `2017-11-6-test` from broker
   * `hadoop-01:9092` with `--time=-1` and prints every returned entry.
   *
   * @param args ignored; the arguments passed on are hard-coded below
   */
  def main(args: Array[String]): Unit = {
    // All options use the double-hyphen long-option form consistently.
    val offsetArgs = Array(
      "--broker-list=hadoop-01:9092",
      "--topic=2017-11-6-test",
      "--time=-1"
    )
    getOffset(offsetArgs).foreach { entry =>
      println("我自己獲取到的偏移量=> " + entry)
    }
  }

  /**
   * Runs `GetOffsetShellWrap.main` with the given arguments and returns the
   * entries it collected.
   *
   * @param args command-line style arguments forwarded to `GetOffsetShellWrap.main`
   * @return the entries appended by `GetOffsetShellWrap.main`, in insertion order
   */
  def getOffset(args: Array[String]): Array[String] = {
    val collected = new ArrayBuffer[String]()
    GetOffsetShellWrap.main(args, collected)
    collected.toArray
  }
}

結果輸出:

2017-11-6-test:2:16099
2017-11-6-test:1:15930
2017-11-6-test:0:16096

技術分享圖片

獲取kafka最新offset-scala