Spark Streaming連線Kafka入門教程
前言
首先要安裝好kafka,這裡不做kafka安裝的介紹,本文是Spark Streaming入門教程,只是簡單的介紹如何利用spark 連線kafka,並消費資料,由於博主也是才學,所以其中程式碼以實現為主,可能並不是最好的實現方式。
1、對應依賴
根據kafka版本選擇對應的依賴,我的kafka版本為0.10.1,spark版本2.2.1,然後在maven倉庫找到對應的依賴。
(Kafka專案在版本0.8和0.10之間引入了新的消費者API,因此有兩個獨立的相應Spark Streaming軟體包可用)
<dependency>
<groupId> org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
我用的是sbt,對應的依賴:
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.1"
2、下載依賴
在命令列執行
sbt eclipse
(我用的是eclipse sbt,具體可看我的其他部落格,具體命令根據自己的實際情況)
3、建立topic
建立測試用topic top1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic top1
4、啟動程式
下好依賴之後,根據官方文件提供的示例進行程式碼測試
下面的程式碼示例,主要實現spark 連線kafka,並將接收的資料打印出來,沒有實現複雜的功能。
package com.dkl.leanring.spark.kafka
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object KafaDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("KafaDemo")
//重新整理時間設定為1秒
val ssc = new StreamingContext(conf, Seconds(1))
//消費者配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.180.29.180:6667", //kafka叢集地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group", //消費者組名
"auto.offset.reset" -> "latest", //latest自動重置偏移量為最新的偏移量
"enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,則這個消費者的偏移量會在後臺自動提交
val topics = Array("top1") //消費主題,可以同時消費多個
//建立DStream,返回接收到的輸入資料
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
//列印獲取到的資料,因為1秒重新整理一次,所以資料長度大於0時才打印
stream.foreachRDD(f => {
if (f.count > 0)
f.foreach(f => println(f.value()))
})
ssc.start();
ssc.awaitTermination();
}
}
啟動上面的程式(本地eclipse啟動即可)
需要記住的要點
當在本地執行一個 Spark Streaming 程式的時候,不要使用 “local” 或者 “local[1]” 作為 master 的 URL 。這兩種方法中的任何一個都意味著只有一個執行緒將用於執行本地任務。如果你正在使用一個基於接收器(receiver)的輸入離散流(input DStream)(例如, sockets ,Kafka ,Flume 等),則該單獨的執行緒將用於執行接收器(receiver),而沒有留下任何的執行緒用於處理接收到的資料。因此,在本地執行時,總是用 “local[n]” 作為 master URL ,其中的 n > 執行接收器的數量。
將邏輯擴充套件到叢集上去執行,分配給 Spark Streaming 應用程式的核心(core)的核心數必須大於接收器(receiver)的數量。否則系統將接收資料,但是無法處理它。
我一開始沒有看到官網提醒的這一點,將示例中的local[2]改為local,現在已經在程式碼裡改回local[2]了,但是下面的截圖沒有替換,注意下。
5、傳送訊息
執行producer
bin/kafka-console-producer.sh --broker-list localhost:6667 --topic top1
然後依次傳送下面幾個訊息
hadoop
spark
kafka
中文測試
6、結果
然後在eclipse console就可以看到對應的資料了。
hadoop
spark
kafka
中文測試
為了直觀的展示和理解,附上截圖:
傳送訊息
結果