
Spark StreamingContext fails to connect to Kafka: ERROR StreamingContext: Error starting the context, marking it as stopped

Tags: big data, kafka, spark, scala, zookeeper

Spark StreamingContext Kafka connection error

ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
	at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
	at org.apache.spark.streaming.DStreamGraph

1. First, check whether Kafka started successfully
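A quick way to check this from the command line (a sketch assuming the broker runs on the local machine with the default port 9092; adjust the host and port to match your setup):

```shell
# Probe the broker port; nc exits 0 when the port is open.
# 9092 is the Kafka default - change it if your broker listens elsewhere.
if nc -z -w 2 localhost 9092 2>/dev/null; then
  echo "broker port reachable"
else
  echo "broker port NOT reachable - start Kafka first"
fi
```

If the port is not reachable, start Kafka (and ZooKeeper, if your version requires it) before debugging the Spark job any further.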


2. Check Kafka's topic list
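The topic the job subscribes to must actually exist on the broker. A sketch of the check, assuming the Kafka `bin/` scripts are on your PATH (on Kafka 2.2+; older releases take `--zookeeper localhost:2181` instead of `--bootstrap-server`):

```shell
# List every topic the broker knows about and confirm yours is among them.
kafka-topics.sh --list --bootstrap-server localhost:9092
```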


3. Check the Kafka configuration in your program, paying special attention to bootstrap.servers and the topic name. A mistake here (an unresolvable host, wrong port, or a bad deserializer class) is a common cause of the "Failed to construct kafka consumer" exception above.

Option 1: literal config keys

  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "bigdata",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

Option 2: ConsumerConfig constants

  val kafkaParams: Map[String, String] = Map(
    ConsumerConfig.GROUP_ID_CONFIG -> "bigdata",
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
  )