Kafka:無丟失提取kafka的值,詳解kafka的消費過程
阿新 • • 發佈:2018-12-04
目錄:
1、需求
2、程式碼步驟
3、程式碼展現
4、pom.xml檔案
5、結果展現
——————————————————————————————————–
1、需求
前提:將org.apache.spark.streaming.kafka.KafkaCluster這個類抽出來變成KafkaClusterHelper
**需求:**
1、將kafka中的資料無丟失提取,且存到本地
2、詳解Kafka讀取資料步驟
3、詳解Zookeeper儲存TopicAndPartition和對應的Offset
2、程式碼步驟
步驟:
1、將org.apache.spark.streaming.kafka.KafkaCluster這個類抽出來變成KafkaClusterHelper
2、編寫ZookeeperHelper類便於將TopicAndPartition和對應的Offset儲存到Zookeeper中
3、將類變成物件kafkaHelper
4、通過kafkaHelper.getFromOffsets獲取開始的Offset,如果不是第一次則從Zookeeper中獲取TopicAndPartition和對應的Offset
5、通過kafkaHelper.getLatestLeaderOffsets獲取最後的Offset
6、通過org.apache.spark.streaming.kafka.OffsetRange為每個TopicAndPartition指定要讀取的Offset範圍(開始的Offset到結束的Offset)
7、通過org.apache.spark.streaming.kafka.KafkaUtils.createRDD方法建立RDD
8、將RDD儲存本地
9、最後ZookeeperHelper類將TopicAndPartition和對應的Offset儲存到Zookeeper中
3、程式碼展現
總計三個類:kafkaConsumer.scala KafkaClusterHelper.scala ZookeeperHelper.scala
kafkaConsumer.scala
package com.donews.localspark

import com.donews.util.{KafkaClusterHelper, ZookeeperHelper}
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by yuhui on 2016/11/17.
 *
 * Batch job that copies one slice of a Kafka topic to a local text file.
 *
 * For every partition of [[topicsSet]] it reads from the offset stored in
 * ZooKeeper (or, when nothing is stored, from the offset selected by
 * `auto.offset.reset`) up to the partition's latest offset, capped at
 * `maxMessagesPerPartition` messages. The end offsets are written back to
 * ZooKeeper only after the data has been saved, so a failed run is simply
 * re-read on the next run.
 */
object kafkaConsumer extends Serializable {

  val topicsSet = Set("donews_website_nginx_log")
  val filePath = "E:\\web_nginx_log"

  /**
   * Upper bound on the number of messages read per partition in one run
   * (the original note estimates this at roughly 500 MB of data).
   */
  private val maxMessagesPerPartition: Long = 1024L * 1024L

  /**
   * Runs one extraction pass.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("App_Name")
      .setMaster("local[4]")
      .set("spark.driver.port", "18080")
    val sc = new SparkContext(conf)

    try {
      val blockSize = 1024 * 1024 * 128 // 128MB
      sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize)

      val kafkaParams = Map[String, String](
        "metadata.broker.list" ->
          "tagtic-master:9092,tagtic-slave01:9092,tagtic-slave02:9092,tagtic-slave03:9092",
        "auto.offset.reset" -> "smallest"
      )
      val kafkaHelper = new KafkaClusterHelper(kafkaParams)

      // Start offsets: the values stored in ZooKeeper win; partitions that have
      // no stored value fall back to what Kafka reports for auto.offset.reset.
      val offsets = ZookeeperHelper.loadOffsets(
        topicsSet, kafkaHelper.getFromOffsets(kafkaParams, topicsSet))

      // Newest offset currently available in Kafka for each of those partitions.
      val latestOffsets =
        KafkaClusterHelper.checkErrors(kafkaHelper.getLatestLeaderOffsets(offsets.keySet))

      val offsetRanges = offsets.map { case (tp, fromOffset) =>
        val latestOffset = latestOffsets(tp).offset
        println("topicName和partition====>"+tp+ " fromOffset====>"+fromOffset+" latestOffset====>"+latestOffset)
        // Read from where the previous run stopped, never past the latest offset.
        OffsetRange(tp, fromOffset, math.min(fromOffset + maxMessagesPerPartition, latestOffset))
      }.toArray

      val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, offsetRanges)
      println("rdd.count()====================》"+rdd.count())

      // Save to local disk. The lambda must evaluate to the extracted text;
      // ending it with a `val` definition would write "()" for every record.
      // NOTE(review): the slice assumes each record's string form is longer than
      // 38 characters and ends with one trailing character to drop - confirm
      // against the real log format.
      // NOTE(review): saveAsTextFile normally refuses to write into an existing
      // directory, so filePath may need clearing between runs - verify.
      rdd.map { line =>
        val text = line.toString
        text.substring(38, text.length - 1)
      }.coalesce(1, shuffle = true).saveAsTextFile(filePath)

      val nextOffsets = offsetRanges.map { range =>
        TopicAndPartition(range.topic, range.partition) -> range.untilOffset
      }.toMap

      // Persist the end offsets in ZooKeeper. The ZooKeeper path can be deleted
      // to make the job re-read the data from the beginning.
      ZookeeperHelper.storeOffsets(nextOffsets)
    } finally {
      // Release the local Spark context whether or not the run succeeded.
      sc.stop()
    }
  }
}
ZookeeperHelper.scala
package com.donews.util

import java.nio.charset.StandardCharsets

import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

/**
 * Created by yuhui on 16-6-8.
 *
 * Stores and loads Kafka offsets in ZooKeeper, one node per partition at
 * `/kafkaOffsets/<topic>/<partition>` inside the client's "statistic"
 * namespace. The node data is the offset written as decimal text.
 */
object ZookeeperHelper {

  val LOG = LoggerFactory.getLogger(ZookeeperHelper.getClass)

  /** Root node, relative to the client namespace, under which offsets are kept. */
  private val KafkaOffsetPath = "/kafkaOffsets"

  /** Curator client, started as soon as this object is initialised. */
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString(WebConfig.ZOOKEEPER_CONNECT)
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("statistic")
      .build()
    client.start()
    client
  }

  /**
   * Creates `path`, including any missing parent nodes, if it does not exist yet.
   *
   * @param path node path relative to the client namespace
   */
  def ensurePathExists(path: String): Unit = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  /**
   * Loads the stored offsets for the given topics.
   *
   * @param topicSet      topics whose stored offsets should be read
   * @param defaultOffset offsets to use for partitions that have no stored value
   * @return `defaultOffset` with every offset found in ZooKeeper overriding the
   *         default for the same partition
   */
  def loadOffsets(topicSet: Set[String],
                  defaultOffset: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
    ensurePathExists(KafkaOffsetPath)
    val stored = for {
      // each child of the root node is a topic name
      topic <- client.getChildren.forPath(KafkaOffsetPath).asScala
      if topicSet.contains(topic)
      // each child of a topic node is a partition id
      partition <- client.getChildren.forPath(s"$KafkaOffsetPath/$topic").asScala
    } yield {
      val data = client.getData.forPath(s"$KafkaOffsetPath/$topic/$partition")
      // decode with an explicit charset so the result does not depend on the JVM default
      val offset = new String(data, StandardCharsets.UTF_8).toLong
      TopicAndPartition(topic, partition.toInt) -> offset
    }
    defaultOffset ++ stored.toMap
  }

  /**
   * Writes the given offsets to ZooKeeper, creating nodes as needed.
   *
   * @param offsets offset to store for each partition
   */
  def storeOffsets(offsets: Map[TopicAndPartition, Long]): Unit = {
    ensurePathExists(KafkaOffsetPath)
    for ((tp, offset) <- offsets) {
      val data = String.valueOf(offset).getBytes(StandardCharsets.UTF_8)
      val path = s"$KafkaOffsetPath/${tp.topic}/${tp.partition}"
      ensurePathExists(path)
      client.setData().forPath(path, data)
    }
  }
}
KafkaClusterHelper.scala
package com.donews.util

/**
 * Created by yuhui on 16-6-29.
 * copy from spark-kafka source
 */

import java.util.Properties

import kafka.api._
import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * Convenience methods for interacting with a Kafka cluster.
 *
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>.
 *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
 *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
 */
class KafkaClusterHelper(val kafkaParams: Map[String, String]) extends Serializable {
  import KafkaClusterHelper.{Err, LeaderOffset, SimpleConsumerConfig}

  // ConsumerConfig isn't serializable
  @transient private var _config: SimpleConsumerConfig = null

  /** Consumer configuration, built from `kafkaParams` on first use. */
  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  /** Opens a `SimpleConsumer` to one broker; the caller is responsible for closing it. */
  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

  /**
   * Finds the leader broker of every requested partition.
   *
   * @return Right(partition -> (host, port)) when a leader was found for every
   *         requested partition, otherwise Left with the collected errors
   */
  def findLeaders(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    val topics = topicAndPartitions.map(_.topic)
    val response = getPartitionMetadata(topics).right
    val answer = response.flatMap { tms: Set[TopicMetadata] =>
      val leaderMap = tms.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
          val tp = TopicAndPartition(tm.topic, pm.partitionId)
          if (topicAndPartitions(tp)) {
            pm.leader.map { l =>
              tp -> (l.host -> l.port)
            }
          } else {
            None
          }
        }
      }.toMap

      if (leaderMap.keys.size == topicAndPartitions.size) {
        Right(leaderMap)
      } else {
        val missing = topicAndPartitions.diff(leaderMap.keySet)
        val err = new Err
        err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
        Left(err)
      }
    }
    answer
  }

  /** Lists every partition of the given topics. */
  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
    getPartitionMetadata(topics).right.map { r =>
      r.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.map { pm: PartitionMetadata =>
          TopicAndPartition(tm.topic, pm.partitionId)
        }
      }
    }
  }

  /**
   * Requests topic metadata from the seed brokers, tried in random order.
   *
   * @return Right with the metadata from the first broker whose response has no
   *         per-topic error, otherwise Left with every error collected
   */
  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
    val req = TopicMetadataRequest(
      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)

      if (respErrs.isEmpty) {
        // non-local return: leaves getPartitionMetadata as soon as one broker answers cleanly
        return Right(resp.topicsMetadata.toSet)
      } else {
        respErrs.foreach { m =>
          val cause = ErrorMapping.exceptionFor(m.errorCode)
          val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
          errs.append(new SparkException(msg, cause))
        }
      }
    }
    Left(errs)
  }

  /** Latest offset of each partition, as reported by the partition's leader. */
  def getLatestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)

  /** Earliest offset of each partition, as reported by the partition's leader. */
  def getEarliestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)

  /**
   * Single offset per partition for the given `before` time.
   *
   * @param before passed to the offset request; the callers in this class use
   *               `OffsetRequest.LatestTime` and `OffsetRequest.EarliestTime`
   */
  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isnt serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }

  /** Inverts a map: each value is mapped to the keys that pointed to it. */
  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
    m.groupBy(_._2).map { kv =>
      kv._1 -> kv._2.keys.toSeq
    }

  /**
   * Up to `maxNumOffsets` offsets per partition, requested from each partition's leader.
   *
   * @return Right once offsets are known for every requested partition,
   *         otherwise Left with the collected errors
   */
  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long,
      maxNumOffsets: Int
    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs.append(new SparkException(
                  s"Empty offsets for ${tp}, is ${before} before log beginning?"))
              }
            } else {
              errs.append(ErrorMapping.exceptionFor(por.error))
            }
          }
        }
        if (result.keys.size == topicAndPartitions.size) {
          // non-local return: every partition is answered, skip the remaining leaders
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
      Left(errs)
    }
  }

  // Consumer offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  // scalastyle:on

  // this 0 here indicates api version, in this case the original ZK backed api.
  private def defaultConsumerApiVersion: Short = 0

  // Try a call against potentially multiple brokers, accumulating errors
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
                         (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  /**
   * Starting offset for every partition of the given topics.
   *
   * Uses the earliest offsets when "auto.offset.reset" is "smallest"
   * (compared case-insensitively) and the latest offsets otherwise.
   *
   * @param kafkaParams parameters whose "auto.offset.reset" entry selects the start position
   * @param topics      topics to look up
   * @throws SparkException if the partitions or their offsets cannot be fetched
   */
  def getFromOffsets(kafkaParams: Map[String, String],
                     topics: Set[String]): Map[TopicAndPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val result = for {
      topicPartitions <- getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        getEarliestLeaderOffsets(topicPartitions)
      } else {
        getLatestLeaderOffsets(topicPartitions)
      }).right
    } yield {
      leaderOffsets.map { case (tp, lo) =>
        (tp, lo.offset)
      }
    }
    KafkaClusterHelper.checkErrors(result)
  }
}

object KafkaClusterHelper {
  type Err = ArrayBuffer[Throwable]

  /** If the result is right, return it, otherwise throw SparkException */
  def checkErrors[T](result: Either[Err, T]): T = {
    result.fold(
      errs => throw new SparkException(errs.mkString("\n")),
      ok => ok
    )
  }

  case class LeaderOffset(host: String, port: Int, offset: Long)

  /**
   * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
   * Simple consumers connect directly to brokers, but need many of the same configs.
   * This subclass won't warn about missing ZK params, or presence of broker params.
   */
  class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
    extends ConsumerConfig(originalProps) {

    /**
     * Seed brokers parsed from the comma-separated host:port list.
     * Whitespace around each entry and around host and port is ignored, so
     * "h1:9092, h2:9092" yields the host "h2" rather than " h2".
     */
    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val hpa = hp.trim.split(":").map(_.trim)
      if (hpa.size == 1) {
        throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)
    }
  }

  object SimpleConsumerConfig {
    /**
     * Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
      // These keys are from other pre-existing kafka configs for specifying brokers, accept either
      val brokers = kafkaParams.get("metadata.broker.list")
        .orElse(kafkaParams.get("bootstrap.servers"))
        .getOrElse(throw new SparkException(
          "Must specify metadata.broker.list or bootstrap.servers"))

      val props = new Properties()
      kafkaParams.foreach { case (key, value) =>
        // prevent warnings on parameters ConsumerConfig doesn't know about
        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
          props.put(key, value)
        }
      }

      // placeholders so ConsumerConfig does not reject the missing ZK settings
      Seq("zookeeper.connect", "group.id").foreach { s =>
        if (!props.containsKey(s)) {
          props.setProperty(s, "")
        }
      }

      new SimpleConsumerConfig(brokers, props)
    }
  }
}
4、pom.xml檔案
<!--
  Dependencies for the three Scala classes shown in this article.
  NOTE(review): ZookeeperHelper imports org.apache.curator and org.slf4j, which are
  not declared here; presumably they arrive transitively through the Spark
  artifacts - verify, or declare them explicitly.
-->
<dependencies>
    <!-- NOTE(review): not referenced by any of the three classes shown; confirm it is still needed -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
    <!-- Kafka integration; kafkaConsumer.scala imports KafkaUtils and OffsetRange from org.apache.spark.streaming.kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>
</dependencies>