Spark Streaming + Kafka: reading Kafka partition data directly with the low-level API and manually updating offsets to the Zookeeper cluster
Spark Streaming + Kafka with the direct (low-level) API reads Kafka partition data directly; by default the offsets are stored in the checkpoint. That, however, keeps Kafka monitoring tools from seeing the consumer's progress, so here we manually update the offsets in the Zookeeper cluster as well.
A brief introduction to the relevant source code:
1. TopicAndPartition is a case class that wraps a topic name and a partition id:
case class TopicAndPartition(topic: String, partitionId: Int)
2. OffsetRange wraps a topic name, a partition id, a fromOffset (the inclusive starting offset of the current batch) and an untilOffset (the exclusive ending offset). So an OffsetRange carries: topic name, partition id, starting offset and ending offset.
/**
 * @param topic Kafka topic name
 * @param partition Kafka partition id
 * @param fromOffset inclusive starting offset
 * @param untilOffset exclusive ending offset
 */
final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable
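Below is a minimal, hypothetical sketch of these semantics (the topic name and offsets are made up; it assumes the spark-streaming-kafka 0.8 artifact is on the classpath): OffsetRange.create builds a range, and because fromOffset is inclusive while untilOffset is exclusive, the batch size is untilOffset - fromOffset.

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.OffsetRange

object OffsetRangeDemo extends App {
  // A range covering offsets [100, 150) of partition 0 of topic "iteblog".
  val range = OffsetRange.create("iteblog", 0, 100L, 150L)
  // fromOffset is inclusive and untilOffset is exclusive, so this batch holds 50 messages.
  val count = range.untilOffset - range.fromOffset
  println(s"${range.topic}-${range.partition}: [${range.fromOffset}, ${range.untilOffset}) -> $count messages")
  // The same (topic, partition) pair expressed as Kafka's TopicAndPartition case class.
  println(TopicAndPartition(range.topic, range.partition))
}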
3. Code implementation:
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}

object Iteblog {
  val brokerAddress = "http://www.iteblog.com:9092"
  val groupID = "testGroup"
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> brokerAddress,
    "group.id" -> "iteblog")

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test")
    sparkConf.set("spark.kryo.registrator", "utils.CpcKryoSerializer")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val topicsSet = Set("iteblog")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    messages.foreachRDD(rdd => {
      // Cast the RDD to HasOffsetRanges (KafkaRDD extends HasOffsetRanges).
      // OffsetRange: "Represents a range of offsets from a single Kafka TopicAndPartition."
      // "Instances of this class can be created with `OffsetRange.create()`."
      val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // The implementation of offsetRanges (inside KafkaRDD); tp: TopicAndPartition, fo: fromOffset
      // val offsetRanges = fromOffsets.map { case (tp, fo) =>
      //   val uo = untilOffsets(tp)
      //   OffsetRange(tp.topic, tp.partition, fo, uo.offset)
      // }.toArray
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        // The first constructor parameter of TopicAndPartition is the topic, the second is the partition id;
        // offsets.partition is the Kafka partition id.
        val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
        // offsets.untilOffset is the ending offset of this batch.
        val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset)))
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
Walkthrough of the core code:
KafkaUtils.createDirectStream invokes the low-level API to consume Kafka partition data directly (Kafka partitions and RDD partitions correspond one to one). createDirectStream returns a DStream, which is backed by RDDs.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
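As a hedged aside (not part of the original post): createDirectStream also has an overload that takes explicit starting offsets (a Map[TopicAndPartition, Long]) plus a messageHandler. That variant is the natural entry point if the offsets committed to Zookeeper here are later read back and used to resume consumption after a restart. The topic, partitions and starting offsets below are placeholders.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: resume from explicitly supplied offsets instead of auto.offset.reset.
def resumeFromOffsets(ssc: StreamingContext,
                      kafkaParams: Map[String, String]): InputDStream[(String, String)] = {
  // Placeholder starting offsets for partitions 0 and 1 of topic "iteblog"
  // (in practice these would be read back from Zookeeper).
  val fromOffsets = Map(
    TopicAndPartition("iteblog", 0) -> 1000L,
    TopicAndPartition("iteblog", 1) -> 1000L)
  // Turn each Kafka message into a (key, value) pair.
  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}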
messages.foreachRDD computes the offset range of each underlying RDD of messages.
The relationship between KafkaRDD and HasOffsetRanges (constructor parameters and type parameters omitted; see the source for details):
KafkaRDD extends RDD[R](sc, Nil) with Logging with HasOffsetRanges
//rdd is the variable bound in messages.foreachRDD. Its runtime type is KafkaRDD, but its static type is RDD (polymorphism), so it must be downcast to HasOffsetRanges before offsetRanges can be called. (Recall what OffsetRange wraps: topic name, partition id, start offset, end offset.)
val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
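One caveat worth spelling out (my addition, not from the original post): the cast only succeeds on the RDD handed directly to foreachRDD, because any transformation such as map returns a plain RDD that no longer implements HasOffsetRanges. A hedged sketch, assuming a (String, String) stream like the one created above:

import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Capture the offset ranges before applying any transformation.
def logRanges(messages: InputDStream[(String, String)]): Unit = {
  messages.foreachRDD { rdd =>
    // Works here: rdd is still the KafkaRDD produced by createDirectStream.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // rdd.map(...) returns a MapPartitionsRDD, on which this cast would fail.
    rdd.map(_._2).foreach(println)
    ranges.foreach { r =>
      println(s"${r.topic} partition ${r.partition}: [${r.fromOffset}, ${r.untilOffset})")
    }
  }
}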
The implementation of offsetRanges (inside KafkaRDD), where tp is a TopicAndPartition and fo is the fromOffset:
val offsetRanges = fromOffsets.map { case (tp, fo) =>
  val uo = untilOffsets(tp)
  OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}.toArray
KafkaCluster is the helper class that encapsulates committing the offsets to the Zookeeper cluster.
val kc = new KafkaCluster(kafkaParams)
Iterate over the offsetRanges array; setConsumerOffsets is a method of KafkaCluster.
To repeat: what does OffsetRange wrap? Answer: topic name, partition id, start offset, end offset.
for (offsets <- offsetsList) {
  // offsets.untilOffset is the ending offset
  val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset)))
  if (o.isLeft) {
    println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
  }
}
The code above commits the offsets of one underlying RDD of the DStream created by createDirectStream to the Zookeeper cluster, and foreachRDD applies it to every RDD. That completes computing and committing the offsets, but where is the commit actually implemented? In KafkaCluster, whose code comes next.
Straight to the code:
package org.apache.spark.streaming.kafka
//Why this code lives in package org.apache.spark.streaming.kafka: private[spark] object SimpleConsumerConfig is only visible inside the spark package!
import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  // Err is a type alias for ArrayBuffer[Throwable]
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      // Part of SimpleConsumerConfig.apply:
      // val brokers = kafkaParams.get("metadata.broker.list").orElse(kafkaParams.get("bootstrap.servers"))
      // so kafkaParams must contain a value for metadata.broker.list or bootstrap.servers.
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }
/**
*
* @param groupId: String
* @param offsets: Map[TopicAndPartition, Long]
* @return
*/
def setConsumerOffsets(groupId: String,
offsets: Map[TopicAndPartition, Long]
): Either[Err, Map[TopicAndPartition, Short]] = {
setConsumerOffsetMetadata(groupId, offsets.map { kv =>
kv._1 -> OffsetMetadataAndError(kv._2)
})
}
def setConsumerOffsetMetadata(groupId: String,
metadata: Map[TopicAndPartition, OffsetMetadataAndError]
): Either[Err, Map[TopicAndPartition, Short]] = {
var result = Map[TopicAndPartition, Short]()
val req = OffsetCommitRequest(groupId, metadata)
val errs = new Err
val topicAndPartitions = metadata.keySet
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.commitOffsets(req)
val respMap = resp.requestInfo
val needed = topicAndPartitions.diff(result.keySet)
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { err: Short =>
if (err == ErrorMapping.NoError) {
result += tp -> err
} else {
errs.append(ErrorMapping.exceptionFor(err))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
Left(errs)
}
private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
(fn: SimpleConsumer => Any): Unit = {
brokers.foreach { hp =>
var consumer: SimpleConsumer = null
try {
consumer = connect(hp._1, hp._2)
fn(consumer)
} catch {
case NonFatal(e) =>
errs.append(e)
} finally {
if (consumer != null) {
consumer.close()
}
}
}
}
def connect(host: String, port: Int): SimpleConsumer =
new SimpleConsumer(host, port, config.socketTimeoutMs,
config.socketReceiveBufferBytes, config.clientId)
}
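A minimal, hypothetical usage sketch of the class above (the broker address, topic, consumer group and offset are placeholders), showing how the Either result is handled:

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

object CommitOffsetDemo extends App {
  // kafkaParams must contain metadata.broker.list or bootstrap.servers (see SimpleConsumerConfig.apply below).
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val kc = new KafkaCluster(kafkaParams)
  val tp = TopicAndPartition("iteblog", 0)
  // Commit offset 42 for consumer group "testGroup" on that partition.
  kc.setConsumerOffsets("testGroup", Map(tp -> 42L)) match {
    case Right(ok)  => println(s"Committed: $ok")
    case Left(errs) => errs.foreach(e => println(s"Commit failed: ${e.getMessage}"))
  }
}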
A small aside:
Chained value lookups on a Map:
object MapDemo extends App {
val map = Map("1" -> "11", "2" -> "22", "3" -> "33")
map.map {
case (a, b) =>
println(a + " " + b)
}
//Semantics: first look up the value for key "4"; if it exists, return it; otherwise look up key "5"; if that exists, return it; if neither exists, throw the exception.
val result = map.get("4").orElse(map.get("5")).getOrElse(throw new Exception("exception"))
println(result)
}
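The same chained lookup applied to kafkaParams is exactly how SimpleConsumerConfig.apply (shown next) picks the broker list; a tiny sketch, with a made-up parameter map:

import org.apache.spark.SparkException

object BrokerLookupDemo extends App {
  // A made-up parameter map that only provides bootstrap.servers.
  val kafkaParams = Map("bootstrap.servers" -> "localhost:9092")
  // Prefer metadata.broker.list, fall back to bootstrap.servers, otherwise fail.
  val brokers = kafkaParams.get("metadata.broker.list")
    .orElse(kafkaParams.get("bootstrap.servers"))
    .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
  println(brokers) // prints localhost:9092
}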
Core-code walkthrough: the apply method of SimpleConsumerConfig:
/**
* Make a consumer config without requiring group.id or zookeeper.connect,
* since communicating with brokers also needs common settings such as timeout
*/
def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
// These keys come from other pre-existing Kafka configs for specifying brokers; accept either.
// (For the chained Map lookup below, see the example above.)
val brokers = kafkaParams.get("metadata.broker.list")
.orElse(kafkaParams.get("bootstrap.servers"))
.getOrElse(throw new SparkException(
"Must specify metadata.broker.list or bootstrap.servers"))
val props = new Properties()
kafkaParams.foreach { case (key, value) =>
// prevent warnings on parameters ConsumerConfig doesn't know about
if (key != "metadata.broker.list" && key != "bootstrap.servers") {
props.put(key, value)
}
}
// If zookeeper.connect or group.id is missing, set it to an empty string.
Seq("zookeeper.connect", "group.id").foreach { s =>
if (!props.containsKey(s)) {
props.setProperty(s, "")
}
}
new SimpleConsumerConfig(brokers, props)
}
The OffsetMetadataAndError case class:
case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
def offset = offsetMetadata.offset
def metadata = offsetMetadata.metadata
override def toString = "[%s,ErrorCode %d]".format(offsetMetadata, error)
}
As noted earlier, TopicAndPartition is a case class wrapping a topic name and a partition id. Here is setConsumerOffsets once more, with a fuller doc comment:
/**
 * @param groupId: String
 * @param offsets: Map[TopicAndPartition, Long]
 *   In the Map, the key (TopicAndPartition) wraps the topic and partition id, and the value (Long)
 *   is the ending offset of what has been consumed.
 * @return
 */
def setConsumerOffsets(groupId: String,
    offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]] = {
  setConsumerOffsetMetadata(groupId, offsets.map { kv =>
    kv._1 -> OffsetMetadataAndError(kv._2)
  })
}

For further details see the comments in the code; they are fairly thorough, so I won't repeat them all in this post.
GitHub repository: https://github.com/Dax1n/spark-kafka-directstream-zookeeper