kafka: Java producer fails to send data to a Kafka topic
阿新 • Published: 2019-01-02
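While using a Kafka producer written in Java to send data to a Kafka topic, I hit an error: the producer retried the send, gave up after three tries, and the data was never delivered.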
2016-09-08 10:32:49,695- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 0 for 1 topic(s) Set(test09026)
2016-09-08 10:32:49,936- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:49,962- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:51,056- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:51,057- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:51,060- [WARN] Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:51,084- [INFO] Back off for 100 ms before retrying send. Remaining retries = 3
2016-09-08 10:32:51,186- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 3 for 1 topic(s) Set(test09026)
2016-09-08 10:32:51,188- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:51,194- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:51,195- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:52,207- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:52,207- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:52,207- [WARN] Failed to send producer request with correlation id 5 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:52,210- [INFO] Back off for 100 ms before retrying send. Remaining retries = 2
2016-09-08 10:32:52,311- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 6 for 1 topic(s) Set(test09026)
2016-09-08 10:32:52,313- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:52,321- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:52,339- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:53,346- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:53,346- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:53,347- [WARN] Failed to send producer request with correlation id 8 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:53,348- [INFO] Back off for 100 ms before retrying send. Remaining retries = 1
2016-09-08 10:32:53,450- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 9 for 1 topic(s) Set(test09026)
2016-09-08 10:32:53,453- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:53,456- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:53,457- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,462- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:54,462- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,466- [WARN] Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:54,468- [INFO] Back off for 100 ms before retrying send. Remaining retries = 0
2016-09-08 10:32:54,582- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 12 for 1 topic(s) Set(test09026)
2016-09-08 10:32:54,585- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:54,588- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:54,588- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,591- [ERROR] Failed to send requests for topics test09026 with correlation ids in [0,12]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at ProducerDemo.run(ProducerDemo.java:32)
I was sending from a Windows server to a Linux server.
Solution:
1. In the Windows hosts file, add a mapping between the Linux server's IP and its hostname.
2. In server.properties, change the following setting:
#advertised.listeners=PLAINTEXT://your.host.name:9092
Uncomment it and set it to an IP address that the client can reach, for example:
advertised.listeners=PLAINTEXT://172.16.128.208:9092
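The log above shows the producer fetching metadata from 172.17.17.98 and then connecting to localhost:9092, which suggests that the address the broker advertised was not usable from the Windows client. The following is a minimal sketch for checking an advertised.listeners value from the client machine before restarting the broker. The class AdvertisedListenerCheck and its methods are hypothetical helpers written for this note, not part of Kafka, and it only handles a single listener entry.

```java
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical helper (not a Kafka API): inspects one advertised.listeners entry
// such as "PLAINTEXT://172.16.128.208:9092" from the client's point of view.
class AdvertisedListenerCheck {

    // Host part of the listener, e.g. "172.16.128.208".
    static String hostOf(String listener) {
        return URI.create(listener.trim()).getHost();
    }

    // Port part of the listener, e.g. 9092.
    static int portOf(String listener) {
        return URI.create(listener.trim()).getPort();
    }

    // True if, on THIS machine, the host resolves to a loopback address.
    // A producer on another machine cannot reach the broker through such an address.
    static boolean resolvesToLoopback(String host) {
        try {
            return InetAddress.getByName(host).isLoopbackAddress();
        } catch (UnknownHostException e) {
            // The host name is unknown here, e.g. the hosts file entry is missing.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default value taken from the setting above; pass another one as an argument.
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://172.16.128.208:9092";
        String host = hostOf(listener);
        System.out.println("host = " + host + ", port = " + portOf(listener));
        System.out.println("resolves to loopback = " + resolvesToLoopback(host));
    }
}
```

Run it on the machine where the producer runs. If the host name cannot be resolved there, or resolves to a loopback address, the hosts file mapping from step 1 or the advertised.listeners value from step 2 probably still needs fixing.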