Kafka SASL/PLAIN 環境構建(Docker版)
前言
近來,老有朋友詢問Kafka SASL配置 & Demo測試
這篇文章的相關內容。近來無事,準備將所以的元件打成映象。此處,便講的為 Kafka SASL/PLAIN
許可權驗證映象模組的構建。
First Try
由於Docker化的Kafka存在一定的缺陷和問題。第一次嘗試的目的是在Docker內部將整個認證流程打通。操作細節概不細敘,按照上面的文章。配置ZK、配置Kafka、建立Topic、設定許可權即可。
### 使用映象 docker pull yanxml/kafka-acls:0.0.2 docker run yanxml/kafka-acls:0.0.2 /bin/bash # 啟動zk source /etc/profile /opt/apps/zookeeper/bin/start.sh # 啟動一個節點(暫時未完工 先做一個節點) /opt/apps/kafka/kafka1/app/bin/kafka-server-start.sh /opt/apps/kafka/kafka1/app/config/server.properties ### 許可權操作(詳細見上面2篇文章) ### 產生日誌 [
[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:9093 --topic sean-security --producer.config ../config/consumer.properties ### 消費 [[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --new-consumer
Second Try (To be continue)
設定對外對映的監聽埠,更新映象。打通測試用例。
主要設定的對映配置:
listeners=SASL_PLAINTEXT://:29092
advertised.linsteners=SASL_PLAINTEXT://192.168.1.16:29092
上述問題,我已經打成docker映象,你們去kafka-aclas,即可獲得。
### 使用映象 docker run -d -p 2181:2181 -p 2182:2182 -p 2183:2183 -p 29092:29092 -p 29093:29093 -p 29094:29094 --env advertised_host_name=192.168.1.16 yanxml/kafka-acls:0.0.8 /opt/apps/bin/start.sh <注意 advertised_host_name 要改成本地ip 並且不要使用localhost 和 127.0.0.1 > ### consumer & provider docker exec -it <container-id> /bin/bash source /etc/profile cd /opt/apps/kafka/kafka1/app/bin ### consumer ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --from-beginning ### provider ./kafka-console-producer.sh --broker-list localhost:29092 --topic sean-security --producer.config ../config/producer.properties ### create Topic & acls ./kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.3:2183 --describe --topic test ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --replication-factor 1 --partitions 1 --topic sean-security ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --list --topic sean-security
執行demos
除了上述進入docker映象使用kafka-console-producer.sh
指令碼使用,還可以在外部通過api使用。映象內內建建立了一個sean-security
的Topic,可以直接使用。Demos地址還是這個git地址https://github.com/SeanYanxml/bigdata。
執行後預期結果如下:
# provider
16:32:16,663 INFO ProducerConfig:223 - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.1.16:29092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
16:32:17,037 INFO AbstractLogin:53 - Successfully logged in.
16:32:17,094 INFO AppInfoParser:109 - Kafka version : 1.0.0
16:32:17,095 INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:17 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:18 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:19 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:20 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:21 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:22 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:23 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:24 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:25 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:26 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:27 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:28 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:29 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:30 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:31 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:32 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:33 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:34 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:35 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:36 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:37 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:38 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:39 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:40 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:41 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:42 CST 2018 Data: Hello
#consumer
16:32:19,979 INFO ConsumerConfig:223 - ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [192.168.1.16:29092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:32:20,196 INFO AbstractLogin:53 - Successfully logged in.
16:32:20,285 INFO AppInfoParser:109 - Kafka version : 1.0.0
16:32:20,285 INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
16:32:50,582 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:33:20,594 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions []
16:33:20,594 INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
16:33:20,731 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 1
16:33:20,733 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [sean-security-0]
16:33:50,769 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Marking the coordinator 192.168.1.16:29092 (id: 2147483647 rack: null) dead
16:33:50,811 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:33:50,812 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Marking the coordinator 192.168.1.16:29092 (id: 2147483647 rack: null) dead
16:33:50,926 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:34:20,971 ERROR ConsumerCoordinator:301 - [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition sean-security-0 at offset 46: The coordinator is not aware of this member.
16:34:20,972 WARN ConsumerCoordinator:246 - [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=46, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
16:34:20,973 WARN ConsumerCoordinator:246 - [Consumer clientId=consumer-1, groupId=test] Synchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=46, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
16:34:20,975 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions [sean-security-0]
16:34:20,976 INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
16:34:20,988 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 3
16:34:20,989 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [sean-security-0]
offset = 76, key = null, value = Hello
offset = 77, key = null, value = Hello
offset = 78, key = null, value = Hello
offset = 79, key = null, value = Hello
PS: 前面會出現一段時間連結不上的情況,我不知道是因為卡頓導致還是其它原因。不過,在生產環境,還是不建議使用docker來處理這個量級的資料。因為很可能就會卡死。需要後期的優化和網路的優化等等。不過本地的除錯還是可以的。
Q&A
- 部分時間連結不上,感覺是因為本地配置太差,造成一定的卡頓。
[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:29092 --topic sean-security --producer.config ../config/producer.properties
>ll
>pp
>oo
>[2018-11-22 08:18:37,328] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[2018-11-22 08:18:37,351] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[2018-11-22 08:18:37,355] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --from-beginning
ddasd
dada
dasd
adda
da
adadasd
dasda
dasda
adasd
dasda
dasd
dasd
[2018-11-22 08:23:11,471] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-80036] Offset commit failed on partition sean-security-0 at offset 0: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-11-22 08:23:11,476] WARN [Consumer clientId=consumer-1, groupId=console-consumer-80036] Asynchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-11-22 08:23:11,477] WARN [Consumer clientId=consumer-1, groupId=console-consumer-80036] Synchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=12, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
ddasd
- Kafka TOPIC_AUTHORIZATION_FAILED
- linsteners 和 advertised.linsteners配置錯誤
# 表現為broker連結不上
[2018-11-21 04:58:55,550] WARN [Controller-0-to-broker-0-send-thread]: Controller 0's connection to broker 192.168.1.16:29092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 192.168.1.16:29092 (id: 0 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:269)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:223)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-11-21 04:58:55,550] WARN [Controller-0-to-broker-0-send-thread]: Controller 0's connection to broker 192.168.1.16:29092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 192.168.1.16:29092 (id: 0 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:269)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:223)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
- Connection to node 0 could not be established. Broker may not be available.
同上
- 目錄衝突 2個broker共享目錄
[2018-11-21 10:25:06,236] FATAL [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/apps/kafka/kafka1/data. A Kafka instance in another process or thread is using this directory.
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:203)
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:199)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at kafka.log.LogManager.lockLogDirs(LogManager.scala:199)
at kafka.log.LogManager.<init>(LogManager.scala:85)
at kafka.log.LogManager$.apply(LogManager.scala:799)
at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:92)
at kafka.Kafka.main(Kafka.scala)
- linsteners配置錯誤
[2018-11-21 05:14:34,098] FATAL (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are PLAINTEXT
at scala.Predef$.require(Predef.scala:224)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1194)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
[2018-11-22 05:56:40,693] FATAL (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different name, listeners: SASL_PLAINTEXT://:9092,SASL_PLAINTEXT://:29092
at scala.Predef$.require(Predef.scala:224)
at kafka.utils.CoreUtils$.validate$1(CoreUtils.scala:270)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:280)
at kafka.server.KafkaConfig$$anonfun$getListeners$1.apply(KafkaConfig.scala:1120)
at kafka.server.KafkaConfig$$anonfun$getListeners$1.apply(KafkaConfig.scala:1119)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaConfig.getListeners(KafkaConfig.scala:1119)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1089)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
15:02:21,358 INFO VerifiableProperties:72 - Verifying properties
15:02:21,403 INFO VerifiableProperties:72 - Property metadata.broker.list is overridden to 127.0.0.1:9092
15:02:21,406 INFO VerifiableProperties:72 - Property serializer.class is overridden to kafka.serializer.StringEncoder
15:02:21,406 WARN VerifiableProperties:87 - Property zookeeper.connect is not valid
15:02:21,629 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 0 for 1 topic(s) Set(test3)
15:02:21,730 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:21,805 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:21,812 WARN BrokerPartitionInfo:87 - Error while fetching metadata [{TopicMetadata for topic test3 ->
No partition metadata for topic test3 due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [test3]: class org.apache.kafka.common.errors.LeaderNotAvailableException
15:02:21,816 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 1 for 1 topic(s) Set(test3)
15:02:21,817 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:21,853 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:21,854 WARN BrokerPartitionInfo:87 - Error while fetching metadata [{TopicMetadata for topic test3 ->
No partition metadata for topic test3 due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [test3]: class org.apache.kafka.common.errors.LeaderNotAvailableException
15:02:21,855 ERROR DefaultEventHandler:101 - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3
15:02:21,856 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 3
15:02:21,958 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 2 for 1 topic(s) Set(test3)
15:02:21,959 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:21,973 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,042 INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing
15:02:52,043 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,049 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 4 to broker 0 with data for partitions test3-0
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
15:02:52,052 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 2
15:02:52,156 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 5 for 1 topic(s) Set(test3)
15:02:52,156 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:52,166 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,167 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,169 INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing
15:02:52,169 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,170 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 7 to broker 0 with data for partitions test3-0
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
15:02:52,171 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 1
15:02:52,273 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 8 for 1 topic(s) Set(test3)
15:02:52,273 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:52,287 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,287 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,288 INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing
15:02:52,289 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,289 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 10 to broker 0 with data for partitions test3-0
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
15:02:52,290 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 0
15:02:52,394 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 11 for 1 topic(s) Set(test3)
15:02:52,395 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:52,403 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,404 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,405 ERROR DefaultEventHandler:101 - Failed to send requests for topics test3 with correlation ids in [0,11]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:98)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:12,770 INFO VerifiableProperties:72 - Verifying properties
14:05:12,824 INFO VerifiableProperties:72 - Property metadata.broker.list is overridden to 192.168.1.16:9092,192.168.1.16:9093,192.168.1.16:9094
14:05:12,830 INFO VerifiableProperties:72 - Property serializer.class is overridden to kafka.serializer.StringEncoder
14:05:12,832 WARN VerifiableProperties:87 - Property zookeeper.connect is not valid
14:05:13,103 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 0 for 1 topic(s) Set(test)
14:05:13,204 INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:13,251 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,343 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,344 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,351 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 2 to broker 1 with data for partitions test-0
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,354 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 3
14:05:43,463 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(2,192.168.1.16,9094) with correlation id 3 for 1 topic(s) Set(test)
14:05:43,464 INFO SyncProducer:72 - Connected to 192.168.1.16:9094 for producing
14:05:43,467 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,469 WARN ClientUtils$:93 - Fetching topic metadata with correlation id 3 for topics [Set(test)] from broker [BrokerEndPoint(2,192.168.1.16,9094)] failed
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:86)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:82)
at kafka.utils.Logging$class.swallowError(Logging.scala:110)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:86)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,470 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,474 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(1,192.168.1.16,9093) with correlation id 3 for 1 topic(s) Set(test)
14:05:43,474 INFO SyncProducer:72 - Connected to 192.168.1.16:9093 for producing
14:05:43,501 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9093
14:05:43,501 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,503 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,503 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,504 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 5 to broker 1 with data for partitions test-0
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,505 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 2
14:05:43,609 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(2,192.168.1.16,9094) with correlation id 6 for 1 topic(s) Set(test)
14:05:43,611 INFO SyncProducer:72 - Connected to 192.168.1.16:9094 for producing
14:05:43,617 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,617 WARN ClientUtils$:93 - Fetching topic metadata with correlation id 6 for topics [Set(test)] from broker [BrokerEndPoint(2,192.168.1.16,9094)] failed
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:86)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:82)
at kafka.utils.Logging$class.swallowError(Logging.scala:110)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:86)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,618 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,618 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 6 for 1 topic(s) Set(test)
14:05:43,619 INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:43,635 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,636 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,638 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,638 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,639 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 8 to broker 1 with data for partitions test-2
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,640 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 1
14:05:43,743 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 9 for 1 topic(s) Set(test)
14:05:43,744 INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:43,763 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,763 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,764 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,765 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,766 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 11 to broker 1 with data for partitions test-2
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,767 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 0
14:05:43,870 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(1,192.168.1.16,9093) with correlation id 12 for 1 topic(s) Set(test)
14:05:43,870 INFO SyncProducer:72 - Connected to 192.168.1.16:9093 for producing
14:05:43,877 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9093
14:05:43,877 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,878 ERROR DefaultEventHandler:101 - Failed to send requests for topics test with correlation ids in [0,12]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:98)
at kafka.producer.Producer.send(Producer.scala:78)
at kafka.javaapi.producer.Producer.send(Producer.scala:35)
at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
Others
完成的基礎配置
#advertised.host.name=192.168.1.16
#advertised.port=29092
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
listeners=SASL_PLAINTEXT://:29092
#listeners=SASL_PLAINTEXT://192.168.1.16:29092
#listeners=SASL_PLAINTEXT://ip(127.0.0.1):port(9092)
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
Reference
ACLS
相關推薦
Kafka SASL/PLAIN 環境構建(Docker版)
前言 近來,老有朋友詢問Kafka SASL配置 & Demo測試 這篇文章的相關內容。近來無事,準備將所以的元件打成映象。此處,便講的為 Kafka SASL/PLAIN許可權驗證映象模組的構建。 First Try 由於Docker化的Kafka
(六)構建Docker私有倉庫、Gitlab倉庫和持續集成環境
持續集成 gitlab 私有倉庫 docker registry 環境說明IP功能eth0:192.168.124.139eth1:172.16.100.10Docker私有倉庫、Gitlab、持續集成eth0:192.168.124.138eth1:172.16.100.20Docker服務
搭建Kafka運行環境-Mac版
tgz topic 我們 控制 能力 conf get 日誌文件 message 停止kafka服務: kafka_2.12-0.10.2.1> bin/kafka-server-stop.sh kafka_2.12-0.10.2.1> bin/zookeepe
基於kubernetes構建Docker集群環境實戰
基於kubernetes構建Dockerkubernetes是google公司基於docker所做的一個分布式集群,有以下主件組成 etcd: 高可用存儲共享配置和服務發現,作為與minion機器上的flannel配套使用,作用是使每臺 minion上運行的docker擁有不同的ip段,最終目的是使不同mi
構建Docker鏡像實戰之構建Tomcat9.0鏡像(RPM一鍵安裝Java環境)
top spa tst wall 生成 png 一鍵 emp 指定 構建Docker鏡像實戰之構建Tomcat9.0鏡像(RPM一鍵安裝Java環境) tomcat是一個免費開源的輕量級web服務器,在中小型企和並發訪問量不高的場合普遍使用,是開發和調試JSP程序的首選。下
使用compose構建Docker多應用環境(Nginx+PHP+MySQL+Redis)
首先安裝compose #下載並移動到/usr/local/bin目錄下 curl -L "https://github.com/docker/compose/releases/download/1.23.1/docker-compose-$(uname -s)-$(uname -m)" \
kafka使用筆記-基於SASL認證的kafka偽叢集環境搭建及測試
繼 搭建免認證kafka單機 之後由於業務需要,搭建了基於SASL認證的kafka偽叢集環境。本次同樣使用的是 kafka_2.10-0.10.1.0.tgz 版本的kafka,整合zookeeper,只需要對此進行配置即可,無需單獨安裝。 一、準備工作 1、環境:ubuntu1
maven構建docker映象三部曲之一:準備環境
更簡單的部署 之前的實戰中,如果要在docker環境中執行java的web工程,通常先執行一個支援線上部署的tomcat容器,然後通過mavenn的tomcat7-maven-plugin外掛把工程線上部署到tomcat中,有沒有更簡便的方法呢?有,利
搭建秒殺環境:Docker + Nodejs + Kafka + Redis + MySQL
Java高階架構資訊 2018-12-18 19:52:26 秒殺活動可以說在網際網路上隨處可見,從12306搶票,到聚划算搶購,我們生活的方方面面都可以看到秒殺的身影。秒殺的架構設計也是對於一個架構師架構設計能力的一次考驗。本文的目的並不在於提供一個可以直接落地的設計方案,而是意在提
kafka叢集使用 SASL/PLAIN 認證
kafka叢集使用 SASL/PLAIN 認證 SASL/PLAIN 是一種簡單的 username/password 認證機制, 通常與 TLS 加密一起使用, 用於實現安全認證. Kafka 提供了一個預設的 SASL/PLAIN 實現, 可以做擴充套件後在生產環境使用. usern
Flume+Kafka環境構建和實戰
1. 準備工作 apache上下載 apache-flume-1.7.0, apache-kafka_2.12-0.11, apache-zookeeper-3.4.9 下載後分別解壓至/home/hadoop/bigdata並重命名目錄為flume, kafka, zk
二、基於kubernetes構建Docker叢集環境實戰
kubernetes是google公司基於docker所做的一個分散式叢集,有以下主件組成 etcd: 高可用儲存共享配置和服務發現,作為與minion機器上的flannel配套使用,作用是使每臺 minion上執行的docker擁有不同的ip段,最終目的是使不同minion上正在
Kafka JAAS Plain SASL 安全認證配置
1. 配置zookeeper 1. 為zookeeper新增 jaas 檔案 zookeeper { org.apache.kafka.common.security.plain.Pl
阿里雲構建Kafka單機叢集環境
簡介 在一臺ECS阿里雲伺服器上構建Kafa單個叢集環境需要如下的幾個步驟: 伺服器環境 JDK的安裝 ZooKeeper的安裝 Kafka的安裝 1. 伺服器環境 CPU: 1核 記憶體: 2048 MB (I/O優化) 1Mbp
kafka原始碼閱讀環境搭建(gradle構建工具、idea)
1.安裝gradle工具,下載地址:https://gradle.org/next-steps/?version=4.7&format=all2.配置環境變數,GRADLE_HOME,path,注意:要在系統變數中配置3.cmd進入dos視窗,gradle -v檢視版
演算法第四版eclipse環境構建,BinarySearch執行
該文章是基於eclipse軟體,eclipse下載網上自查, 在網上也找了很多關於此類問題的文章,也對我解決問題起到了非常大的作用 如果有不能搭建成功的,歡迎聯絡qq1136075279,共同學習,共同進步 一、匯入外部庫 這本書有它自己的外部庫,需要匯入
Kafka安全認證SASL/PLAIN,並和springBoot整合
kafka_2.11-1.1.0.tgz、zookeeper-3.4.10.tar.gz版本 1. kafka配置 kafka解壓目錄下工作 # 1.新建配置檔案 vi ./config/kafka_server_jaas.conf # 檔案內
centos環境下docker搭建jira-7.11.1破解版
基於centos7環境下搭建一個jira7.11.1最新版。破解補丁需要的請在下面留言。 pull docker 映象: jira:7.11.1(目前的最新版本) mysql:5.7 docker pull cptactionhank/atlassian-jir
用前端姿勢玩docker【三】基於nvm的前端環境構建技巧
### 前言 * 安裝docker啥的就不說了,這裡重點強調一下,docker的環境問題。本人的環境: 虛擬機器centos => docker => NAT => container 因為需要不斷更換網路環境,如若使用橋接,需要不斷調整網絡卡的IP,使虛機與宿主機保持在同一網段,所以乾脆用了NAT,此處
kafka SASL認證介紹及自定義SASL PLAIN認證功能
[toc] 使用者認證功能,是一個成熟元件不可或缺的功能。在0.9版本以前kafka是沒有使用者認證模組的(或者說只有SSL),好在kafka0.9版本以後逐漸釋出了多種使用者認證功能,彌補了這一缺陷(這裡僅介紹SASL)。 本篇會先介紹當前kafka的四種認證方式,然後過一遍部署SASL/PLAIN認證