1. 程式人生 > >kafka遇到問題一:釋出者釋出訊息拋異常org.apache.kafka.clients.producer.internals.ErrorLoggingCallback

kafka遇到問題一:釋出者釋出訊息拋異常org.apache.kafka.clients.producer.internals.ErrorLoggingCallback

最近一個專案中使用到kafka作為分散式訊息系統,於是網上找了些資料學習下kafka,在學習過程中遇到了如題的異常,google了下,找到了解決方法

實驗環境

Tips:本機Mac系統

服務

  • zookeeper :單節點 程序ID:27318
  • kafka服務:叢集 程序ID:27329、27345、27342
  • 訊息釋出者(控制檯模式):程序ID:27373
  • 訊息訂閱者(控制檯模式):程序ID:27376

如:

kuangaiyongs-MacBook-Pro:bin kay$ jps -l
27376 kafka.tools.ConsoleConsumer
27329 kafka.Kafka 27345 kafka.Kafka 27397 sun.tools.jps.Jps 27318 org.apache.zookeeper.server.quorum.QuorumPeerMain 27373 kafka.tools.ConsoleProducer 25294 27342 kafka.Kafka

topic資訊

kuangaiyongs-MacBook-Pro:bin kay$ ./kafka-topics.sh --zookeeper localhost:2181 --describe
Topic:Multibrokerapplication
PartitionCount:1 ReplicationFactor:3 Configs: Topic: Multibrokerapplication Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2 Topic:hello-kafka PartitionCount:1 ReplicationFactor:1 Configs: Topic: hello-kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:my-replicated-topic
PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2 Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

啟動釋出者控制檯

kuangaiyongs-MacBook-Pro:bin kay$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

my first message

在控制檯輸入my first message,丟擲如下

[2018-06-25 09:52:05,348] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2018-06-25 09:52:05,349] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2018-06-25 09:52:05,349] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2018-06-25 09:52:05,349] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

啟動訂閱者控制檯

kuangaiyongs-MacBook-Pro:bin kay$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --from-beginning --topic test

確實也不能收到釋出者發出的訊息,並有如下異常丟擲:

[2018-06-25 09:52:29,920] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,10.200.20.95,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

原因排查

檢查服務啟動情況

正常

檢查埠

正常

檢查配置

正常

檢查輸入是否有誤

正常

topic

按照這個解決方法,檢查了三臺kafka服務的配置,配置原本是這樣的:

listeners=PLAINTEXT://:9092
# The port the socket server listens on
#port=9092
listeners=PLAINTEXT://:9093
# The port the socket server listens on
port=9093
listeners=PLAINTEXT://:9094
# The port the socket server listens on
port=9094

按照解決方案修改配置後,重啟各服務,驗證發現異常沒有了,釋出者釋出的訊息,訂閱者可以正常收到

附上修改後的部分配置內容:

listeners=PLAINTEXT://0.0.0.0:9092
# The port the socket server listens on
port=9092
listeners=PLAINTEXT://0.0.0.0:9093
# The port the socket server listens on
port=9093
listeners=PLAINTEXT://0.0.0.0:9094
# The port the socket server listens on
port=9094

原因分析

暫未分析為什麼如此配置就OK了

總結

為了避免不必要的報錯,如下兩項配置內容建議按照這個來:

  • 監聽配置加上對應的IP地址

  • port埠配置項前的#號去掉