1. 程式人生 > >Spark Streaming 接入 kafka 之 sasl配置

Spark Streaming 接入 kafka 之 sasl配置

被kafka的新版配置折磨的死去活來的,終於搞定了。。。放鬆一下寫此篇部落格以記錄一下。

開發環境

  • spark 2.2.0
  • scala 2.11.8 (目前為止,高版本的scala貌似對kafka的支援還有坑。。。)
  • sbt(目前為止,順便說一下,如果是mac 10.13 之後的系統,並且使用 IntelliJ IDEA的話,sbt的版本要選擇 1.0.3左右的,選擇1.1.0之後那又是會爽的死去活來的)
  • IntelliJ IDEA 社群版 2017.2.16
  • kafka的版本,由於設定了sasl認證,Kafka 的版本要1.0之後的,具體的對應關係可以檢視 此官網連結

為了方便大家配置貼出我的sbt配置:

name := "kstreaming"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark"
%% "spark-streaming" % sparkVersion, "org.apache.spark" %% "spark-hive" % sparkVersion, "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion )

當然了連結的普通例子還是以官網的為準,戳這裡
迴歸正題,接著說咱們的sasl配置,相對於官網的配置,只需要修改以下部位…

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka
.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092,anotherhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean), "security.protocol" -> "SASL_PLAINTEXT", // 這兩項是我們自己的sasl設定,相對於官網的程式碼也只改了這兩行 "sasl.mechanism" -> "PLAIN" // 這兩項是我們自己的sasl設定,相對於官網的程式碼也只改了這兩行 ) val topics = Array("topicA", "topicB") val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) stream.map(record => (record.key, record.value))

接下來就是在 IntelliJ IDEA裡邊跑以下結果了,當然要順手設定以下IntelliJ IDEA了~

  • 介面右上角執行的按鈕旁有個下拉框,下拉選擇 Edit Configurations
  • Confirguration介面的 VM options選項的最右邊有個按鈕
  • 在裡邊新增如下程式碼即可
-Dspark.master=local
-Djava.security.auth.login.config=kafka_client_jaas.conf

第一行是單機執行的意思
第二行是告訴spark執行的時候把認證資訊加到執行環境中
當然這裡邊設定了個檔案,這個檔案要放到工作目錄中~
檔案的內容呢,如下

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

完成了這三步簡單的配置之後,跑起來就清爽了!!!
廢話不多說,敲程式碼去了 T_T