
Spark Streaming with Kafka 0.8: connects to the broker but cannot consume

Without further ado, here is the code:

package cn.sparkstreaming.kafka

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2018/11/22.
  */
object SparkStreamDirectDemo {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    conf.setAppName("spark_streaming")
    //conf.setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.setCheckpointDir("file:///segment2/Alarm_data/checkpoints")
    //sc.setCheckpointDir("checkpoints")
    sc.setLogLevel("ERROR")
    // batch interval: pull a batch from Kafka every 60 seconds
    val ssc = new StreamingContext(sc, Seconds(60))

    // topic -> receiver thread count; only needed by the receiver-based createStream variant below
    val topics = Map("WXALARM" -> 2)

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "10.216.5.152:9093,10.216.5.153:9093,10.216.5.154:9093",
      "group.id" -> "WYWX_123",
      "auto.offset.reset" -> "smallest"
    )

    // Direct approach: offsets are not committed to ZooKeeper automatically;
    // outside the checkpoint you must track and update them manually
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("WXALARM")).map(_._2)
    //val lines = KafkaUtils.createStream(ssc, "10.216.5.152:2183,10.216.5.153:2183,10.216.5.154:2183", "WYWX", topics).map(_._2)

    //lines.print()
    lines.saveAsTextFiles("file:///segment2/Alarm_data/Direct.txt")
    ssc.start()
    ssc.awaitTermination()

  }
}
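
That comment on the direct stream is worth spelling out. Every RDD produced by createDirectStream implements HasOffsetRanges, so you can read the consumed offset ranges before any transformation (the map(_._2) above discards that metadata) and persist them yourself. A minimal sketch of that pattern; ssc and kafkaParams are the values defined above, and the println is a placeholder for whatever external store (ZooKeeper, a database, ...) you actually use:

import org.apache.spark.streaming.kafka.HasOffsetRanges

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("WXALARM"))

stream.foreachRDD { rdd =>
  // Offsets must be read from the KafkaRDD itself, before map/filter
  // turn it into a plain RDD that no longer carries offset metadata.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    // placeholder: persist o.fromOffset / o.untilOffset instead of printing
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
  // then process the payloads as before
  rdd.map(_._2).foreach(println)
}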

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>esb_kafka</groupId>
    <artifactId>esb_kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.1</version>
        </dependency>
        <dependency>
            <groupId>io.spray</groupId>
            <artifactId>spray-json_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.1-901.jdbc4</version>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-ftp</artifactId>
            <version>2.13.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The same job consumes data fine on my own cluster, but in the production environment it connects yet consumes nothing. My guess was a compatibility problem: either the production Kafka version is too old or the CDH Spark version too new. The java.lang.IllegalArgumentException thrown from kafka.api.FetchResponsePartitionData.readFrom in the log below also suggests the old consumer client cannot even parse the broker's fetch response.

The exception in production:

18/11/21 17:44:07 INFO spark.SparkContext: Running Spark version 2.3.0.cloudera4
18/11/21 17:44:07 INFO spark.SparkContext: Submitted application: spark_streaming
18/11/21 17:44:07 INFO spark.SecurityManager: Changing view acls to: root
18/11/21 17:44:07 INFO spark.SecurityManager: Changing modify acls to: root
18/11/21 17:44:07 INFO spark.SecurityManager: Changing view acls groups to: 
18/11/21 17:44:07 INFO spark.SecurityManager: Changing modify acls groups to: 
18/11/21 17:44:07 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/11/21 17:44:08 INFO util.Utils: Successfully started service 'sparkDriver' on port 7355.
18/11/21 17:44:08 INFO spark.SparkEnv: Registering MapOutputTracker
18/11/21 17:44:08 INFO spark.SparkEnv: Registering BlockManagerMaster
18/11/21 17:44:08 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/11/21 17:44:08 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/11/21 17:44:08 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-78ed315e-aa29-40b1-a44c-b1e05b0aea67
18/11/21 17:44:08 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
18/11/21 17:44:08 INFO spark.SparkEnv: Registering OutputCommitCoordinator
18/11/21 17:44:08 INFO util.log: Logging initialized @2256ms
18/11/21 17:44:08 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
18/11/21 17:44:08 INFO server.Server: Started @2371ms
18/11/21 17:44:08 INFO server.AbstractConnector: Started ServerConnector@...{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/11/21 17:44:08 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
18/11/21 17:44:08 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/jobs,null,AVAILABLE,@Spark}
... (similar ContextHandler lines for the remaining Spark UI endpoints)
18/11/21 17:44:08 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hbwy37:4040
18/11/21 17:44:08 INFO spark.SparkContext: Added JAR file:/segment2/Alarm_data/esb_kafka-1.0-SNAPSHOT3.jar at spark://hbwy37:7355/jars/esb_kafka-1.0-SNAPSHOT3.jar with timestamp 1542793448794
18/11/21 17:44:08 INFO executor.Executor: Starting executor ID driver on host localhost
18/11/21 17:44:08 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 15748.
18/11/21 17:44:08 INFO netty.NettyBlockTransferService: Server created on hbwy37:15748
18/11/21 17:44:08 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/11/21 17:44:08 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hbwy37, 15748, None)
18/11/21 17:44:08 INFO storage.BlockManagerMasterEndpoint: Registering block manager hbwy37:15748 with 366.3 MB RAM, BlockManagerId(driver, hbwy37, 15748, None)
18/11/21 17:44:08 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hbwy37, 15748, None)
18/11/21 17:44:08 INFO storage.BlockManager: external shuffle service port = 7337
18/11/21 17:44:08 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, hbwy37, 15748, None)
18/11/21 17:44:09 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@...{/metrics/json,null,AVAILABLE,@Spark}
18/11/21 17:44:10 INFO scheduler.EventLoggingListener: Logging events to hdfs://nameservice1/user/spark/spark2ApplicationHistory/local-1542793448839
18/11/21 17:44:10 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
18/11/21 17:44:16 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:196)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:212)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/11/21 17:44:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
18/11/21 17:44:16 ERROR scheduler.JobScheduler: Error running job streaming job 1542793455000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException
    ... (same IllegalArgumentException stack trace as above)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
    ... (same IllegalArgumentException stack trace as above)
    ... 3 more
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException
    ... (identical exception, driver stacktrace, and cause as the JobScheduler error above)

After a long search turned up no fix, and considering that the CDH Spark packages differ somewhat from vanilla Spark, I went to check the official documentation.

Sure enough:

 

spark-streaming-kafka-0-8-assembly_2.11.jar needs to be placed in Spark's jars directory.

Our CDH installation runs Spark 2.3.0, the newest version it currently ships (CDH does not offer Spark 2.4.0 yet),

while the official docs recommend Spark 2.4.0. In practice I found that both the 2.4.0 and the 2.3.0 assembly jars work:

 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.4.0
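
If copying the assembly into Spark's jars directory is not convenient, another option is to ship it at submit time with --jars. A sketch only; the jar path and file name below are illustrative and assume the 2.4.0 assembly was downloaded next to the application jar:

spark-submit \
  --class cn.sparkstreaming.kafka.SparkStreamDirectDemo \
  --jars /segment2/Alarm_data/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar \
  /segment2/Alarm_data/esb_kafka-1.0-SNAPSHOT3.jar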

I hope this pitfall I ran into helps you solve your own problem.