1. 程式人生 > >sparkstreaming多consumer消費kafka報錯問題

sparkstreaming多consumer消費kafka報錯問題

交流QQ: 824203453
 

版本: sparkstreaming 2.2      kafka  0.10

sparkstreaming 整合kafka後(Direct模式),同一個groupid下的多個spark-streaming consumer消費kafka中的資料時,會報錯。(如何重現問題:把sparkstreaming集合kafka的程式,執行兩次即可)

報錯如下:

該主題下的資料不能訪問。

問題剖析:

假定消費主題為helloTopic8 , 該主題設定的分割槽數量為3個。

當使用kafka的javaAPI消費資料時,如果啟動同一個組下的多個consumer,程式不會報錯,但是真正能消費到資料的consumer數量,只能和消費的主題的partition的個數一致(這裡為3個)。其他的consumer不能消費到資料。

而當spark streaming 程式整合kafka後,DStream中的rdd的分割槽數和消費的helloTopic8的partition數量一致(也就是3個),所以執行spark streaming程式,也就相當於啟動了3個task來讀取kafka中helloTopic8的資料。

當重複再啟動一個程式時,會重新消費到helloTopic8中的3歌分割槽的數量,導致原來的程式讀取不到相應的資料,隨即報錯。

示例程式碼:

    val conf = new SparkConf()
    .setAppName(this.getClass.getSimpleName)
    .setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp-02:9092,hdp-03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer], 
      "group.id" -> "group_hello",
      "auto.offset.reset" -> "earliest" // 最早的
//      "enable.auto.commit" -> (false: java.lang.Boolean)
     ) // 是否自動提交offset

    // 通過呼叫KafkaUtils API 來建立一個DStream
    val topics = Array("helloTopic8")
    val directStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      // 訂閱主題 注意需要給定訊息的型別
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    directStream.map(_.value()).map((_,1)).reduceByKey(_+_).print()
    // 啟動  阻塞
    ssc.start()
    ssc.awaitTermination()

該問題 類似於下面這個問題:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

可參考:https://blog.csdn.net/qq_21439395/article/details/80412688

交流QQ: 824203453

相關推薦

sparkstreamingconsumer消費kafka問題

交流QQ: 824203453   版本: sparkstreaming 2.2      kafka  0.10 sparkstreaming 整合kafka後(Direct模式),同一個groupid下的多個spark-streaming consumer

Streaming消費kafka:java.lang.NoClassDefFoundError: net/jpountz/util/SafeUtils

1.問題描述 Streaming消費kafka訊息,報以下錯誤: 18/11/22 18:14:55 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 9) java.lang.NoClassDefFound

java 連接Kafkajava.nio.channels.ClosedChannelExcep

img 由於 客戶端 客戶端訪問 主機名 ava 技術分享 gin color Java 客戶端連接Kafka報如下錯誤 java.nio.channels.ClosedChannelExcep 是由於Kafka server.properties中的advertised.

flume連線kafka Excessively large list allocation request detected: 1818583411 items! Connection clos

1.問題描述: flume然後sinkTokafka,flume是1.6.0然後kafka是0.8.2.1。flume和kafka自測都沒有問題,但是flumeTokafka就會: 2018-11-21 01:09:16,119 (lifecycleSupervisor-1-1) [INFO

kafkaClosedChannelException

● 開啟kafka生產者寫入資料時報錯 報錯如下: [[email protected] bin]# ./kafka-console-producer.sh --broker-list 192.168.0.21:9092 --topic ws [201

tensorflow1.12 GPU協同訓練tensorflow.python.framework.errors_impl.NotFoundError: libnccl.so.2

       tensroflow為了提高多模型訓練速度,需要多個GPU同時工作,而且我們一般使用的工作站都是8塊tesla K80,如果能將8塊顯示卡的計算力充分利用起來,將會大大提高模型訓練的速度,縮短模型訓練時間。        這幾天看到tensorflow的mor

遇到問題--mongodb---個criteria.orOperator或者個criteria.andOperator

報錯在使用java驅動構造mongodb查詢過程中發現多個criteria.orOperator或者多個criteria.andOperator報錯。報錯資訊如下:you can't add a second '$or' expression specified $or 解決

python執行緒操作:No handlers could be found for logger "websocket"

用python模擬對講機線上工具,在伺服器上成功連線的連線上限是1020個,需要確定遇到這個瓶頸的原因 1.連線失敗時的報錯資訊為 No handlers could be found for logger "websocket" 解決方法: import logging

storm整合kafkaorg.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for ...

寫了一個storm整合kfaka的程式,kafkaSpout消費的資料作為storm的資料來源。執行報錯如下: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.Keep

maven模組打包編譯 無法載入core包

1.偶爾會出現這樣的情況,之前沒有特別的在乎,後來發現不得不解決了,找了許久才發現了 2.解決如下:註釋父級的maven-plugin外掛,只保留view子模組的maven-plugin外掛,這樣就不會報錯了,具體原理還不太明白,有知道的可以給我留言哈~

kafka

1.Error while executing topic command : replication factor: 1 larger than available brokers: 017/04/06 09:47:15 ERROR admin.TopicCommand$

Keras+Django次load model

最近在做一個文字分類工具,功能包括上傳樣本,使用樣本訓練model,save訓練好的model並且使用model對文字進行分類。用到框架有Keras和Django。訓練階段將訓練好的模型儲存到指定目錄。

vue中使用axios.all() 方法發起個請求控制檯的解決方法

今天在專案中使用axios時發現axios.all() 方法可以執行但是控制檯報錯,後來在論壇中看到是由於axios.all() 方法並沒有掛載到 axios物件上,需要我們手動去新增 == 只需要在

kafka 建立消費者 consumer zookeeper is not a recognized option

kafka 建立消費者報錯 consumer zookeeper is not a recognized option 2018年08月08日 22:30:48 csdn_sunlighting 閱讀數:9064 在做kafka測試的時候,使用命令bin/kafka-co

SparkStreaming程式執行SparkStreaming-Kafka- Couldn't find leaders for Set

 這個異常意思是Spark找不到partition的Leader。檢視監控後發現,在異常發生的時間點,有一個Broker掛掉了。可是對應Topic的replica設定的2,就算掛掉一個,應該有replica頂上啊。後來發現,這是由於存在Partition的Replica沒有跟Leader保持同步更新,也就是

java客戶端進行kafka測試時,生產者不能生產資訊,消費者不能消費資訊,而且沒有,但是在虛擬機器中沒有任何錯誤

我的kafka在虛擬機器中都正常啟動,而且在虛擬機器中生產和消費資訊都沒有錯誤,但是使用window客戶端生產和消費時不成功,而且也不報任何錯誤,生產和消費訊息時都停留一段時間然後結束。以下是我提供的一種思路,但不一定適合所有這樣的錯誤:1.先看一下虛擬機器系統中kafka安

模組專案spring整合dubbo,服務方正常啟動,消費方啟動解決記錄

服務方寫法: <dubbo:application name="babasport-service-product"/> <!--2.設定註冊中心的地址 zookeeper--> <!-- 叢集式配置:&l

wampserver 安裝個php版本號之關鍵問題

amp rec released detail receive duang 之前 報錯 onf 近期喜歡上用wampserver來搭建php本地執行環境 主要是一鍵安裝 特easy 之前一直用的是 appserv 也挺好用的 用了wamp後 才發現wamp

yum :保護庫版本

yum  報錯在執行yum update更新時,報錯“保護多庫版本”解決:yum update --setopt=protected_multilib=false本文出自 “一萬年太久,只爭朝夕” 博客,請務必保留此出處http://zengwj1949.blog.51cto.com/10747365/196

MyBatis個接口參數:Available parameters are [0, 1, param1, param2], 及解決方法

pan 解決 而且 crm ger int mybatis 添加 為什麽 1. sql語句如下:  SELECT * FROM tb_crm_user WHERE id = #{userId, jdbcType=INTEGER} AND