Spark整合Kafka原始碼分析——SparkStreaming從kafak中接收資料
要實現SparkStreaming從kafak中接收資料分為以下幾步(其中涉及的類在包org.apache.spark.streaming.kafka中):
1.建立createStream()函式,返回型別為ReceiverInputDStream物件,在createStream()函式中最後返回構造的KafkaInputDStream類物件
2.KafkaInputDStream類要繼承ReceiverInputDStream,來實現ReceiverInputDStream中的getReceiver()函式,在getReceiver()函式中構造KafkaReceiver類物件
3.KafkaReceiver類是真正幹活的類了,前邊的一些工作都沒啥實質工作,就是在扯皮,就跟工作中某些情況似的,專案中有很多人,一層層的領導們指揮規劃任務,但具體幹活的就是最底層的幾個,不過還是要有這些工作的,這樣整體脈絡比較清晰。
a.設定kafka相關引數
b.設定儲存kafka元資料的zookeeper的地址,連線zookeeper
c.設定kafka中資料的反序列化相關類
d.呼叫kafka消費者api來獲取資料
e.建立執行緒池來
f.關閉執行緒池
SparkStreaming從kafak中接收資料的主要工作就是:
1.在Receiver中做:
a.消費訊息佇列中的資料,得到一條條資料。
b.呼叫Receiver中store函式將資料儲存到Spark記憶體
2.將createStream、ReceiverInputDStream、KafkaInputDStream、KafkaReceiver、Receiver這些類的關係對應好。
具體邏輯分析:
1.spark官網KafkaWordCount示例:
2.主要分析KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)中的createStream()函式:object KafkaWordCount { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination() } }
函式最後返回了KafkaInputDStream物件,跟進KafkaInputDStream。/** * Create an input stream that pulls messages from a Kafka Broker. * @param ssc StreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ def createStream( ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[(String, String)] = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000") createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics, storageLevel) } 跟進createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel): def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel ): ReceiverInputDStream[(K, V)] = { val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false) new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) }
3.KafkaInputDStream類中程式碼
/**
* Input stream that pulls messages from a Kafka Broker.
*
* @param kafkaParams Map of kafka configuration parameters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
private[streaming]
class KafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
}
該類主要有兩個工作:a.KafkaInputDStream類繼承了ReceiverInputDStream[(K, V)](ssc_)。
b.實現了ReceiverInputDStream中的getReceiver()函式,getReceiver()返回兩個Recceiver,原理一樣檢視KafkaReceiver即可。
4.檢視構造的KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel):
private[streaming]
class KafkaReceiver[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends Receiver[(K, V)](storageLevel) with Logging {
// Connection to Kafka
var consumerConnector: ConsumerConnector = null
def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}
}
def onStart() {
logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
// Create threads for each topic/message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
} finally {
executorPool.shutdown() // Just causes threads to terminate after work is done
}
// Handles Kafka messages
private class MessageHandler(stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
store((msgAndMetadata.key, msgAndMetadata.message)) //Store a single item of received data to Spark's memory.
}
} catch {
case e: Throwable => logError("Error handling message; exiting", e)
}
}
}
}
在構造的KafkaReceiver物件中做了最主要的工作。繼承了Receiver[(K, V)](storageLevel),要實現Receiver中的onStart()、onStop()函式。
在onStart()函式中要做的工作就是把kafka中的資料放到kafka中。
a.設定kafka相關引數
b.設定儲存kafka元資料的zookeeper的地址,連線zookeeper
c.設定kafka中資料的反序列化相關類
d.呼叫kafka消費者api來獲取資料
e.建立執行緒池來將獲取的流資料儲存到spark,store((msgAndMetadata.key, msgAndMetadata.message))該函式在Receiver類中,就是把該條訊息以鍵值對的形式儲存到spark記憶體中,正因為這種鍵值儲存導致呼叫 KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)時返回的是鍵值對的物件,之前用java寫spark接收kafak資料呼叫這個埠時返回這個鍵值對的物件,我就對此有些疑問,現在明白是在這做的處理導致返回的是鍵值對物件。
f.關閉執行緒池
onStop()函式就是關閉消費者與kafka連線了
然後就一層層返回,最後createStream函式的返回物件中就可以得到資料了。
至此spark接收消費kafak資料的工作流程結束了。
相關推薦
Spark整合Kafka原始碼分析——SparkStreaming從kafak中接收資料
整體概括:要實現SparkStreaming從kafak中接收資料分為以下幾步(其中涉及的類在包org.apache.spark.streaming.kafka中): 1.建立createStream()函式,返回型別為ReceiverInputDStream物件,在cre
spark 1.6.0 core原始碼分析9 從簡單例子看action
這一節以reduce為例講解action操作 首先看submitJob方法,它將我們reduce中寫的處理函式隨JobSubmitted訊息傳遞出去,因為每個分割槽都需要呼叫它進行計算;而resultHandler是指最後合併的方法,在每個task完成後,需要呼叫resul
【Spring Boot】(29)、SpringBoot整合Mybatis原始碼分析
在【Spring Boot】(23)、Spring Boot整合Mybatis的章節中講述了SpringBoot整合Mybatis的過程,以及一些配置說明,這節主要講解一下整合的原始碼。 廢話不多說,直接進入今天的主題。 閱讀過我之前寫的文章的童靴,肯定知道SpringBoot整合第三方
Kafka 原始碼分析之LogSegment
這裡分析kafka LogSegment原始碼 通過一步步分析LogManager,Log原始碼之後就會發現,最終的log操作都在LogSegment上實現.LogSegment負責分片的讀寫恢復重新整理刪除等動作都在這裡實現.LogSegment程式碼同樣在原始碼目錄log下. LogSe
Spark Executor啟動原始碼分析
Spark CoarseGrainedExecutorBackend啟動原始碼分析 更多資源 github: https://github.com/opensourceteams/spark-scala-maven csdn(彙總視訊線上看): https://blog
Spark Worker啟動原始碼分析
Spark Worker啟動原始碼分析 更多資源 github: https://github.com/opensourceteams/spark-scala-maven csdn(彙總視訊線上看): https://blog.csdn.net/thinktothing
Spark Master啟動原始碼分析
Spark Master啟動原始碼分析 更多資源 github: https://github.com/opensourceteams/spark-scala-maven csdn(彙總視訊線上看): https://blog.csdn.net/thinktothing
【Go 原始碼分析】從 sort.go 看排序演算法的工程實踐
go version go1.11 darwin/amd64file: src/sort/sort.go 排序演算法有很多種類,比如快排、堆排、插入排序等。各種排序演算法各有其優劣性,在實際生產過程中用到的排序演算法(或者說 Sort 函式)通常是由幾種排序演算法組
從0開始的jieba分詞原始碼分析_1_從cut開始
從一個函式入口逐步分析分詞的整個過程,最後對關鍵函式做了簡化實現,附在最後供大家參考 分析 尋根 import jieba jieba.cut("sentence") 查詢cut的引用: dt =
spark 2.3原始碼分析之ShuffleInMemorySorter
PackedRecordPointer 概述 PackedRecordPointer物件用一個64bit的long型變數來記錄record資訊: [24 bit partition number][13 bit memory page number][27 bit
spark 2.3原始碼分析之SortShuffleWriter
SortShuffleWriter 概述 SortShuffleWriter它主要是判斷在Map端是否需要本地進行combine操作。如果需要聚合,則使用PartitionedAppendOnlyMap;如果不進行combine操作,則使用PartitionedPairB
spark 2.3原始碼分析之ShuffleDependency
ShuffleDependency 成員變數 - ShuffleHandle 在ShuffleDependency中建立ShuffleHandle. 如前面的部落格所述,有以下三種ShuffleHandle: BypassMergeSortShuffleHandle
dubbo原始碼分析(一)-從xml到我們認識的Java物件
專案中用的dubbo的挺多的,然後隨著自己對dubbo的慢慢深入,自己也希望能夠了解dubbo的底層實現,這半年來一直在看dubbo的原始碼,有點斷斷續續的,於是準備寫一個dubbo原始碼系列的分析文章,一來方便自己總結,二來也能夠讓自己的學習有輸出分享。 整個系列會從dubbo的xml到bean到生
挺過最艱難的2018,我終將長大 沉澱一年,我想推薦這些書給你 dubbo原始碼分析(一)-從xml到我們認識的Java物件
2017年結束之後一直沒有更新部落格,2017年的年終總結也沒有寫,一直覺的自己欠自己一個2017。這一年沒有繼續寫部落格,一個原因是自己一整年在瞎忙,忙的暈頭轉向且感覺也沒有收穫多少,同事家裡也發生變故;二是感覺個人沉澱的不夠,需要靜下心沉澱技術與人生。終於渾渾噩噩的2018的過去了,要迎來嶄新的2
Kafka原始碼分析--生產者
ps.本文所有原始碼都基於kafka-0.10.0.1Kafka提供了Java版本的生產者實現--KafkaProducer,使用KafkaProducer的API可以輕鬆實現同步/非同步傳送訊息、批量傳送、超時重發等複雜的功能,KafkaProducer是執行緒安全的,多個
kafka原始碼分析之consumer的原始碼
Consumer的client端 示例程式碼 Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SER
【spring】原始碼分析 一 從ContextLoaderListener開始·
原始碼環境 : idea + spring 4.3.4 +tomcat7 + gradle附 : 基於 java 註解的 配置元資料 的 web.xml 配置做參考(spring 3.0 後支援)<?xml version="1.0" encoding="UTF-8"
kafka原始碼分析
原文地址:http://www.aboutyun.com/thread-9938-1-1.html 問題導讀1.Kafka提供了Producer類作為java producer的api,此類有幾種傳送方式?2.總結呼叫producer.send方法包含哪些流程?3.Producer難以理解的在什麼地方? p
Spark SQL Catalyst原始碼分析之Physical Plan
前面幾篇文章主要介紹的是spark sql包裡的的spark sql執行流程,以及Catalyst包內的SqlParser,Analyzer和Optimizer,最後要介紹一下Catalyst裡最後的一個Plan了,即Physical Plan。物理計劃是Spark SQ
spark-core_04: org.apache.spark.deploy.SparkSubmit原始碼分析:
SparkSubmitArgumentsParser的父類就SparkSubmitOptionParser,在launcher.Main方法執行時用到OptionParser 它的父類也是SparkSubmitOptionParser。並且這個父類有一個方法parser。作用