spark筆記之Spark Streaming整合kafka實戰
kafka作為一個實時的分散式訊息佇列,實時的生產和消費訊息,這裡我們可以利用SparkStreaming實時地讀取kafka中的資料,然後進行相關計算。
在Spark1.3版本後,KafkaUtils裡面提供了兩個建立dstream的方法,一種為KafkaUtils.createDstream,另一種為KafkaUtils.createDirectStream。
7.1 KafkaUtils.createDstream方式
KafkaUtils.createDstream(ssc, [zk], [group id], [per-topic,partitions] ) 使用了receivers接收器來接收資料,利用的是Kafka高層次的消費者api,對於所有的receivers接收到的資料將會儲存在
A、建立一個receiver接收器來對kafka進行定時拉取資料,這裡產生的dstream中rdd分割槽和kafka的topic分割槽不是一個概念,故如果增加特定主體分割槽數僅僅是增加一個receiver中消費topic的執行緒數,並沒有增加spark的並行處理的資料量。 B、對於不同的group和topic可以使用多個receivers建立不同的DStream C、如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)
同時需要設定儲存級別(預設StorageLevel.MEMORY_AND_DISK_SER_2),
7.1.1 KafkaUtils.createDstream實戰
(1)新增kafka的pom依賴
[AppleScript] 純文字檢視 複製程式碼
|
(2)啟動zookeeper叢集
zkServer.sh start
(3)啟動kafka叢集
kafka-server-start.sh /export/servers/kafka/config/server.properties
(4) 建立topic
kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
(5) 向topic中生產資料
通過shell命令向topic傳送訊息
kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic kafka_spark
(6)編寫Spark Streaming應用程式
[AppleScript] 純文字檢視 複製程式碼
|
(7)執行程式碼,檢視控制檯結果資料
總結:
通過這種方式實現,剛開始的時候系統正常執行,沒有發現問題,但是如果系統異常重新啟動sparkstreaming程式後,發現程式會重複處理已經處理過的資料,這種基於receiver的方式,是使用Kafka的高階API,topic的offset偏移量在ZooKeeper中。這是消費Kafka資料的傳統方式。這種方式配合著WAL機制可以保證資料零丟失的高可靠性,但是卻無法保證資料只被處理一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。官方現在也已經不推薦這種整合方式,我們使用官網推薦的第二種方式kafkaUtils的createDirectStream()方式。
7.2 KafkaUtils.createDirectStream方式
這種方式不同於Receiver接收資料,它定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量範圍在每個batch裡面處理資料,Spark通過呼叫kafka簡單的消費者Api(低階api)讀取一定範圍的資料。 相比基於Receiver方式有幾個優點: A、簡化並行
不需要建立多個kafka輸入流,然後union它們,sparkStreaming將會建立和kafka分割槽數相同的rdd的分割槽數,而且會從kafka中並行讀取資料,spark中RDD的分割槽數和kafka中的topic分割槽數是一一對應的關係。
B、高效,
第一種實現資料的零丟失是將資料預先儲存在WAL中,會複製一遍資料,會導致資料被拷貝兩次,第一次是接受kafka中topic的資料,另一次是寫到WAL中。而沒有receiver的這種方式消除了這個問題。 C、恰好一次語義(Exactly-once-semantics)
Receiver讀取kafka資料是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過資料儲存在WAL中保證資料不丟失,但是可能會因為sparkStreaming和ZK中儲存的偏移量不一致而導致資料被消費了多次。EOS通過實現kafka低層次api,偏移量僅僅被ssc儲存在checkpoint中,消除了zk和ssc偏移量不一致的問題。缺點是無法使用基於zookeeper的kafka監控工具。
7.2.1 KafkaUtils.createDirectStream實戰
[AppleScript] 純文字檢視 複製程式碼
|
(2)檢視對應的效果
向topic中新增資料
檢視控制檯的輸出: