Spark Streaming 接入 kafka 之 sasl配置
阿新 • • 發佈:2019-01-27
被kafka的新版配置折磨的死去活來的,終於搞定了。。。放鬆一下寫此篇部落格以記錄一下。
開發環境
- spark 2.2.0
- scala 2.11.8 (目前為止,高版本的scala貌似對kafka的支援還有坑。。。)
- sbt(目前為止,順便說一下,如果是mac 10.13 之後的系統,並且使用 IntelliJ IDEA的話,sbt的版本要選擇 1.0.3左右的,選擇1.1.0之後那又是會爽的死去活來的)
- IntelliJ IDEA 社群版 2017.2.16
- kafka的版本,由於設定了sasl認證,Kafka 的版本要1.0之後的,具體的對應關係可以檢視 此官網連結
為了方便大家配置貼出我的sbt配置:
name := "kstreaming"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)
當然了連結的普通例子還是以官網的為準,戳這裡
迴歸正題,接著說咱們的sasl配置,相對於官網的配置,只需要修改以下部位…
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka .common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"security.protocol" -> "SASL_PLAINTEXT", // 這兩項是我們自己的sasl設定,相對於官網的程式碼也只改了這兩行
"sasl.mechanism" -> "PLAIN" // 這兩項是我們自己的sasl設定,相對於官網的程式碼也只改了這兩行
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
接下來就是在 IntelliJ IDEA裡邊跑以下結果了,當然要順手設定以下IntelliJ IDEA了~
- 介面右上角執行的按鈕旁有個下拉框,下拉選擇 Edit Configurations
- Confirguration介面的 VM options選項的最右邊有個按鈕
- 在裡邊新增如下程式碼即可
-Dspark.master=local
-Djava.security.auth.login.config=kafka_client_jaas.conf
第一行是單機執行的意思
第二行是告訴spark執行的時候把認證資訊加到執行環境中
當然這裡邊設定了個檔案,這個檔案要放到工作目錄中~
檔案的內容呢,如下
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};
完成了這三步簡單的配置之後,跑起來就清爽了!!!
廢話不多說,敲程式碼去了 T_T