1. 程式人生 > >Kafka整合SparkStreaming2.2的一下午(奇葩錯誤一堆)

Kafka整合SparkStreaming2.2的一下午(奇葩錯誤一堆)

叢集環境:CDH 5.13.0Spark 2.2.0Scala 2.11.0

ps:沒營養的錯誤。。有些難受。還是要多看看原始碼啊。。

1.重頭:Kafka整合SparkStreaming

此處選擇kafka 0.10版本
點進去首先就能看到關鍵東西 - Maven座標,我們選擇:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version
>
2.2.0</version> </dependency>

這個整合依賴包包含很多元件,可以ctrl點選進去檢視
但除了這個,我還加了兩個包,便於本地測試:

<!--spark的核心包在spark-core在spark上已經有了-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version
>
<scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> <scope>provided</scope> </dependency>

Scope設定為provided

,打包時就不用加進去了,因為叢集環境本地存在這些包。

注:打包外掛:

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.yj.test1</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

通過Spark叢集執行程式碼:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


def main(args: Array[String]): Unit = {

    //配置spqrkconf引數
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_kafka_direct").setMaster("local[2]")
    //構建sparkContext物件
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //構建StreamingContext物件,每個批處理的時間間隔
    val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(3000))
    //設定checkpoint
//    ssc.checkpoint("/user/spark/sparkstreaming/checkpoint")

    //設定broker的地址資訊,有多個可以寫多個,用逗號隔開
    val brokers = "cdh1:9092,cdh2:9092,cdh3:9092"

    //設定引數資訊
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fodr_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("test")

    // 建立一個DirectStream
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )


    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    val words: DStream[String] = stream.map(_.value()).flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//    val total: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    val windows: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((v1: Int, v2: Int) =>
      v1 + v2, Seconds(3), Seconds(6))
    windows.print()
//    total.print()

    ssc.start()
    ssc.awaitTermination()
  }

注:若想使用 updateStateByKey 則必須設定 checkpoint

叢集伺服器提交job:

spark2-submit   --master yarn \
--deploy-mode client \
--num-executors 16 \
--executor-cores 2 \
--executor-memory 8g \
--class com.yj.test1 \
--driver-memory 2g \
/root/scala_test_bak-1.0-SNAPSHOT-jar-with-dependencies.jar

基本OK了。。

2.各種錯誤

錯誤1:首當其衝的一個bug:NoSuchMethodError

Exception in thread "streaming-start" java.lang.NoSuchMethodError:
 org.apache.kafka.clients.consumer.KafkaConsumer.subscribe

這個錯誤真的太經典了,在整合Kafka和SparkStreaming時。。百度谷歌過一堆,很常見,但真正有營養的沒幾個

總結:發生這種錯誤,99%都是因為版本沒整合好!
比如:
Spark版本是不是2.2.0
Scala版本是不是2.11.0
kafka版本是不是高於0.10.0
我的問題就在於Spark預設的kafka版本為0.9,根本不是0.10!
這裡寫圖片描述
改為0.10就ok了

仔細看看你的叢集環境,版本吧

錯誤2:檢查checkpoint,檔案不存在
這裡寫圖片描述
以為是checkpoint那行程式碼有問題,hdfs路徑、許可權、需要提前建立什麼的,看了下原始碼:

def checkpoint(directory: String) {
    if (directory != null) {
      val path = new Path(directory)
      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
      fs.mkdirs(path)
      val fullPath = fs.getFileStatus(path).getPath().toString
      sc.setCheckpointDir(fullPath)
      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
  }

並沒有問題啊!(叢集預設路徑為hdfs)
排查發現:spark提交命令沒寫executor。。

錯誤3
這裡寫圖片描述
task數瘋狂彈跳上漲,感覺是checkpoint有問題,寫執行緒被無限拒絕,task無限重啟失敗
解決:換了一個topic就好了。。

錯誤4:Xshell5終端消費訊息看不到,無論怎麼生產訊息,就是看不到被消費
解決:換了一個SSH渠道就看到訊息了(xshell顯示問題)。。。

還有一些小問題夾雜其中,都有些莫名其妙。。日後使用時出現問題再總結吧