An Afternoon Spent Integrating Kafka with Spark Streaming 2.2 (a Pile of Bizarre Errors)
Cluster environment: CDH 5.13.0, Spark 2.2.0, Scala 2.11.0
PS: These were low-value mistakes... somewhat painful. The lesson, as always: read more source code.
1. The main part: integrating Kafka with Spark Streaming
We use the Kafka 0.10 integration here. Opening its integration guide, the first key item is the Maven coordinate; we pick:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
This integration dependency pulls in quite a few components; you can Ctrl-click into it to see them.
Besides it, I added two more dependencies to make local testing easier:
<!-- spark-core is already present on the Spark cluster -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
The scope is set to provided, since these jars are already available on the cluster at runtime.
Note: the packaging plugin:
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.yj.test1</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The code run on the Spark cluster:
package com.yj

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object test1 {
  def main(args: Array[String]): Unit = {
    // configure the SparkConf parameters
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_kafka_direct").setMaster("local[2]")
    // build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // build the StreamingContext with the batch interval (3000 ms)
    val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(3000))
    // set the checkpoint directory
    // ssc.checkpoint("/user/spark/sparkstreaming/checkpoint")
    // broker addresses; list several separated by commas
    val brokers = "cdh1:9092,cdh2:9092,cdh3:9092"
    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fodr_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test")
    // create a direct stream
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )
    // record each batch's offset ranges and commit them back to Kafka asynchronously
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    val words: DStream[String] = stream.map(_.value()).flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    // val total: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    // arguments: reduce function, window duration (3 s), slide duration (6 s)
    val windows: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((v1: Int, v2: Int) =>
      v1 + v2, Seconds(3), Seconds(6))
    windows.print()
    // total.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
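Note that the code above commits offsets in one foreachRDD, while the windowed counts are printed by a separate output operation, so a commit does not wait for that batch's results to be printed. If you want the commit to happen only after the batch's output has actually completed, one option (just a sketch reusing the stream above, not part of the original job) is to do both inside the same foreachRDD:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // per-batch word count, collected and printed on the driver
  rdd.map(_.value())
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .collect()
    .foreach(println)
  // commit only after the output above has completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}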
Note: if you want to use updateStateByKey, you must set the checkpoint directory first.
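A minimal sketch of what that looks like, using the checkpoint path commented out above and a hypothetical updateFunc that keeps a running count per word:

// must be set before any stateful operation is used
ssc.checkpoint("/user/spark/sparkstreaming/checkpoint")

// add this batch's counts to the running count for each word
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (newValues, runningCount) => Some(newValues.sum + runningCount.getOrElse(0))

val total: DStream[(String, Int)] = wordAndOne.updateStateByKey[Int](updateFunc)
total.print()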
Submitting the job on the cluster:
spark2-submit --master yarn \
--deploy-mode client \
--num-executors 16 \
--executor-cores 2 \
--executor-memory 8g \
--class com.yj.test1 \
--driver-memory 2g \
/root/scala_test_bak-1.0-SNAPSHOT-jar-with-dependencies.jar
And that's basically it.
2. Assorted errors
Error 1: the first bug to hit: NoSuchMethodError
Exception in thread "streaming-start" java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe
This error is an absolute classic when integrating Kafka with Spark Streaming. I went through piles of Baidu and Google results; it is very common, but very few of the answers are actually useful.
Bottom line: when this error shows up, 99% of the time the versions are not lined up properly.
For example, check:
Is your Spark version 2.2.0?
Is your Scala version 2.11.0?
Is your Kafka version 0.10.0 or higher?
My problem was that the Kafka version Spark picked up by default was 0.9, not 0.10 at all!
Switching it to 0.10 fixed the error.
So take a close look at your cluster environment and the versions involved.
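If you are not sure which kafka-clients jar your job actually loads at runtime, one quick JDK-only check (a hypothetical diagnostic snippet, not part of the job above) is to print where the KafkaConsumer class was loaded from; the jar name in the path shows the client version:

import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaClientVersionCheck {
  def main(args: Array[String]): Unit = {
    // prints something like .../kafka-clients-0.10.x.x.jar;
    // a 0.9.x jar here would explain the NoSuchMethodError above
    val location = classOf[KafkaConsumer[_, _]].getProtectionDomain.getCodeSource.getLocation
    println(s"KafkaConsumer loaded from: $location")
  }
}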
Error 2: checkpoint check failed, file does not exist
I assumed the problem was in the checkpoint line itself: the HDFS path, permissions, maybe needing to create the directory in advance. So I looked at the source:
def checkpoint(directory: String) {
  if (directory != null) {
    val path = new Path(directory)
    val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
    fs.mkdirs(path)
    val fullPath = fs.getFileStatus(path).getPath().toString
    sc.setCheckpointDir(fullPath)
    checkpointDir = fullPath
  } else {
    checkpointDir = null
  }
}
Nothing wrong there at all! (The cluster's default file system is HDFS.)
After more digging, the real cause turned out to be that my spark-submit command was missing the executor options...
Error 3:
The task count kept spiking wildly. It looked like a checkpoint problem: write threads were endlessly rejected, and tasks kept restarting and failing.
Fix: switching to a different topic made it go away...
Error 4: in the Xshell 5 terminal the console consumer showed nothing; no matter how many messages I produced, I never saw them being consumed.
Fix: opening a different SSH session made the messages appear (an Xshell display issue)...
A few other small issues came up along the way, all rather baffling. I'll write them up if they show up again in future use.