SparkStreaming與kafka通過直連方式讀取資料
阿新 • • 發佈:2018-11-11
1、Spark-Streaming的receive的方式和直連方式有什麼區別:
Receive接收固定時間間隔的資料(放在記憶體中),達到固定的時間才進行處理,效率低並且容易丟失資料(Kafka高階API),自動維護偏移量
Direct直連方式,相當於直接連線到Kafka的分割槽上,相當於Kafka底層API,效率很高,需要自己維護偏移量,讀一條處理一條(把指定的時間間隔當做一個批次)。
2、直接連到kafka的分割槽上讀取,一個RDD的分割槽對應一個kafka的分割槽,一個分割槽會生成一個Task,這個Task不會消失,會一直盯著這個分割槽,不停的讀取資料。
3、在用Reciver方式,消費消費者時,不用指定broker,在直連的方式,需要指定broker,因為這種方式相當於直接練到Kafka的分割槽中,需要broker
4、zookeeper的作用,zookeeper中記錄的是,以組名和topic名作為唯一標識,不同的組可以讀取同一topic中的資料,記偏移量是從前面記錄
package day01.Dirctor
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org. apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object DrictorDemoV4 {
def main(args: Array[String]): Unit = {
val group = "groupTT"//指定組名
val conf = new SparkConf ().setAppName("kafkaWC").setMaster("local[2]")
val sc = new SparkContext(conf)
//建立SparkStreaming,並設定時間間隔
val ssc = new StreamingContext(conf,Duration(5000))
//指定消費的topic名字
val topic = "tt2"
//指定kafka的broker地址,Streaming的Task直連到kafka的分割槽上,用底層的API,效率更高
val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
//指定zk地址,後期更新消費的偏移量時使用(以後可以Redis、MySQL)
val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
//建立DStream時使用topic名字的集合,SparkStreaming可以同時消費多少topic
val topics:Set[String] = Set(topic)
//建立一個ZKGroupTopicDirs物件,其實就是指定往zk中寫入資料的目錄,用於儲存偏移量
val topicDirs = new ZKGroupTopicDirs(group,topic)
//獲取zookeeper中的路徑"groupTT/offsets/tt01"
val zkTopicPath:String = s"${topicDirs.consumerOffsetDir}"
//準備kafka引數
val kafkaParams = Map(
"metadata.broker.list"->brokerList,
"group.id"->group,
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
)
//zookeeper的host和ip,建立一個Client,用於更新偏移量
//是zookeeper的一個客戶端,可以從zk中讀取,偏移量的資料,並跟新偏移量
val zkClient: ZkClient = new ZkClient(zkQuorum)
//查詢該路徑下是否位元組點(預設有位元組點為我們自己儲存不同Partition生成的)
// /consumers/組名/offsets/topic名/分割槽名/偏移量, 可以zkClient.sh 插詢
val children = zkClient.countChildren(zkTopicPath)
//建立一個InputDStream, 要是var,因為不去定是不是以前讀過,要先判斷,再賦值
//key 是 kafka的Key,預設不設定是null,value是讀取的內容
var kafkaStream:InputDStream [(String,String)] = null
//如果zookeeper中儲存offset,我們會利用這個Offset作為kafkaStream的讀取位置
var fromOffsets:Map[TopicAndPartition,Long] = Map()
//如果儲存過Offset,以前讀取過
if(children >0){
for (i<- 0 until children){
//zkClient根據檔案位置讀取偏移量( /consumers/組名/offsets/topic名/分割槽名/偏移量,)
val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/${i}")
val tp: TopicAndPartition = TopicAndPartition(topic,i)
//將不同 Partition對應的Offset增加到fromOffset中(
//
fromOffsets += (tp-> partitionOffset.toLong)
//這個會將kafka的訊息進行transform,最終的kafka的資料會變成kafka的key,message)這樣的Tuple
val messageHandler = (mam:MessageAndMetadata[String,String])=>(mam.key(),mam.message())
//通過KafkaUtils建立直連的DStream,fromOffset引數的作用是按照之間計算好的偏移量繼續讀取
//[String,String,StringDecoder,StringDecoder,(String,String)]
// key value key的解碼, value的解碼
kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](
ssc,kafkaParams,fromOffsets,messageHandler
)
}
}else{
//從頭開始讀,之前沒有讀取過
kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
}
var offsetRanges = Array[OffsetRange]()
//從kafka中讀取訊息,DStream的Transform方法,可以將當前批次RDD取出來來
//該transform方法計算獲取當前批次RDD,然後將RDD的偏移量取出來,然後將RDD返回DStream
val transformed: DStream[(String, String)] = kafkaStream.transform(rdd => {
//得到該rdd對應的kafka的訊息的offset
//該RDD是個kafkaRDD,可以獲取偏移量的範圍
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
})
val messages: DStream[String] = transformed.map((_._2))
//一次迭代DStream中的RDD
messages.foreachRDD(rdd=>{
//對RDD進行操作,觸發Action
rdd.foreachPartition(partition=>{
partition.foreach(x=>{
println(x)
})
})
})
for(off <- offsetRanges){
//獲取zk 中記錄偏移量的目錄,
// /consumers/組名/offsets/topic名/分割槽名
val zkPath = s"${topicDirs.consumerOffsetDir}/${off.partition}"
//更新偏移量
ZkUtils.updatePersistentPath(zkClient,zkPath,off.untilOffset.toString)
}
ssc.start()
ssc.awaitTermination()
}
}