
Kafka SASL/PLAIN Environment Setup (Docker Edition)

Preface

Friends keep asking me about the earlier article "Kafka SASL Configuration & Demo Testing". With some free time on my hands, I decided to package all of the components into Docker images. This post covers building the image for Kafka SASL/PLAIN authentication and authorization.

First Try

Dockerized Kafka has some known quirks, so the goal of the first attempt is simply to get the whole authentication flow working inside the container. I won't repeat the details here; follow the article above: configure ZooKeeper, configure Kafka, create the topic, and set the ACLs.

### Use the image
docker pull yanxml/kafka-acls:0.0.2
docker run -it yanxml/kafka-acls:0.0.2 /bin/bash
# Start ZooKeeper
source /etc/profile
/opt/apps/zookeeper/bin/start.sh
# Start one broker (work in progress; a single node for now)
/opt/apps/kafka/kafka1/app/bin/kafka-server-start.sh /opt/apps/kafka/kafka1/app/config/server.properties

### ACL operations (see the two articles above for details)
### Produce messages
[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:9093 --topic sean-security --producer.config ../config/consumer.properties
### Consume
[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --new-consumer
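For reference, a broker in a typical Kafka SASL/PLAIN setup needs a JAAS file plus a few server.properties entries. I haven't dumped the image's actual files, so treat the sketch below as the usual shape; the credentials are illustrative (alice matches the ACL examples in this post):

```
# --- kafka_server_jaas.conf (loaded via -Djava.security.auth.login.config=...) ---
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

# --- server.properties additions ---
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
```

The `user_<name>="<password>"` entries define the accounts clients may authenticate as; `super.users` exempts the broker's own principal from ACL checks.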

Second Try (to be continued)

Expose the mapped listener ports, update the image, and get the test cases working end to end.
The key mapping settings:

listeners=SASL_PLAINTEXT://:29092
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
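The distinction matters in Docker: `listeners` is the socket the broker binds inside the container, while `advertised.listeners` is the address the broker hands back to clients in metadata responses, so it must be reachable from outside the container:

```
# Bind on all interfaces inside the container.
listeners=SASL_PLAINTEXT://:29092
# Returned to clients in metadata; use the host's LAN IP, never localhost,
# or external clients will end up dialing themselves.
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
```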

I have packaged all of the above into a Docker image; pull yanxml/kafka-acls to get it.

### Use the image
docker run -d -p 2181:2181 -p 2182:2182 -p 2183:2183  -p 29092:29092 -p 29093:29093 -p 29094:29094  --env advertised_host_name=192.168.1.16 yanxml/kafka-acls:0.0.8  /opt/apps/bin/start.sh

<Note: set advertised_host_name to your machine's LAN IP; do not use localhost or 127.0.0.1>
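A quick guard for this pitfall: reject loopback values before launching the container, since clients outside Docker can never reach a broker through an advertised loopback address. The helper name below is mine, not part of the image:

```shell
#!/bin/sh
# Refuse loopback / empty values for advertised_host_name; the broker
# advertises this address to clients, so it must be reachable from
# outside the container.
check_advertised_host() {
  case "$1" in
    ""|localhost|127.*|::1) echo invalid ;;
    *) echo ok ;;
  esac
}

# Example: abort the launch on a bad value.
# HOST=192.168.1.16
# [ "$(check_advertised_host "$HOST")" = ok ] || exit 1
```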

### consumer & producer
docker exec -it <container-id> /bin/bash
source /etc/profile
cd /opt/apps/kafka/kafka1/app/bin

### consumer
./kafka-console-consumer.sh  --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --from-beginning

### producer
./kafka-console-producer.sh --broker-list localhost:29092 --topic sean-security --producer.config ../config/producer.properties
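Both console clients work because ../config/producer.properties and ../config/consumer.properties carry matching SASL settings. I haven't dumped the image's actual files; a sketch of what such a client file typically contains, with the alice credentials chosen to match the ACL commands below:

```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
```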

### create topic & ACLs

./kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --describe --topic test

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --replication-factor 1 --partitions 1 --topic sean-security

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --list --topic sean-security

Run the demos

Besides entering the container and using the kafka-console-producer.sh script as above, you can also drive the broker from outside through the API. The image ships with a pre-created topic named sean-security, ready to use. The demos live in the same git repo: https://github.com/SeanYanxml/bigdata

Expected output:

# producer
16:32:16,663  INFO ProducerConfig:223 - ProducerConfig values: 
	acks = all
	batch.size = 16384
	bootstrap.servers = [192.168.1.16:29092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

16:32:17,037  INFO AbstractLogin:53 - Successfully logged in.
16:32:17,094  INFO AppInfoParser:109 - Kafka version : 1.0.0
16:32:17,095  INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
 ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:17 CST 2018 Data: Hello
 ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:18 CST 2018 Data: Hello
 ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:19 CST 2018 Data: Hello
 ... (identical lines continue once per second through 16:33:42)

# consumer

16:32:19,979  INFO ConsumerConfig:223 - ConsumerConfig values: 
	auto.commit.interval.ms = 1000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.1.16:29092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = test
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

16:32:20,196  INFO AbstractLogin:53 - Successfully logged in.
16:32:20,285  INFO AppInfoParser:109 - Kafka version : 1.0.0
16:32:20,285  INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
16:32:50,582  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:33:20,594  INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions []
16:33:20,594  INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
16:33:20,731  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 1
16:33:20,733  INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [sean-security-0]
16:33:50,769  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Marking the coordinator 192.168.1.16:29092 (id: 2147483647 rack: null) dead
16:33:50,811  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:33:50,812  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Marking the coordinator 192.168.1.16:29092 (id: 2147483647 rack: null) dead
16:33:50,926  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:34:20,971 ERROR ConsumerCoordinator:301 - [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition sean-security-0 at offset 46: The coordinator is not aware of this member.
16:34:20,972  WARN ConsumerCoordinator:246 - [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=46, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
16:34:20,973  WARN ConsumerCoordinator:246 - [Consumer clientId=consumer-1, groupId=test] Synchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=46, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
16:34:20,975  INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions [sean-security-0]
16:34:20,976  INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
16:34:20,988  INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 3
16:34:20,989  INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [sean-security-0]
offset = 76, key = null, value = Hello
offset = 77, key = null, value = Hello
offset = 78, key = null, value = Hello
offset = 79, key = null, value = Hello

PS: Connections fail for a while at the start; I can't tell whether that is a resource stall or something else. Either way, I wouldn't recommend Docker for this kind of data volume in production: without further container and network tuning it can easily bog down. For local debugging, though, it works fine.

Q&A

  • Connections fail some of the time; it feels like my underpowered local machine is causing stalls.

[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:29092 --topic sean-security --producer.config ../config/producer.properties
>ll
>pp
>oo
>[2018-11-22 08:18:37,328] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[2018-11-22 08:18:37,351] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[2018-11-22 08:18:37,355] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time



[[email protected] bin]# ./kafka-console-consumer.sh  --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --from-beginning
ddasd
dada
dasd
adda
da
adadasd
dasda
dasda
adasd
dasda
dasd
dasd
[2018-11-22 08:23:11,471] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-80036] Offset commit failed on partition sean-security-0 at offset 0: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-11-22 08:23:11,476] WARN [Consumer clientId=consumer-1, groupId=console-consumer-80036] Asynchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-11-22 08:23:11,477] WARN [Consumer clientId=consumer-1, groupId=console-consumer-80036] Synchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=12, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
ddasd

  • Kafka TOPIC_AUTHORIZATION_FAILED
  • `listeners` and `advertised.listeners` misconfigured
# Symptom: the broker cannot be reached
[2018-11-21 04:58:55,550] WARN [Controller-0-to-broker-0-send-thread]: Controller 0's connection to broker 192.168.1.16:29092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 192.168.1.16:29092 (id: 0 rack: null) failed.
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
	at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:269)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:223)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)



  • Connection to node 0 could not be established. Broker may not be available.

Same as above.

  • Directory conflict: two brokers sharing one data directory
[2018-11-21 10:25:06,236] FATAL [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/apps/kafka/kafka1/data. A Kafka instance in another process or thread is using this directory.
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:203)
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:199)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at kafka.log.LogManager.lockLogDirs(LogManager.scala:199)
	at kafka.log.LogManager.<init>(LogManager.scala:85)
	at kafka.log.LogManager$.apply(LogManager.scala:799)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
	at kafka.Kafka$.main(Kafka.scala:92)
	at kafka.Kafka.main(Kafka.scala)
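The fix is to point each broker at its own data directory. A sketch using paths consistent with this image's layout (the kafka2 path is my assumption):

```
# kafka1/app/config/server.properties
log.dirs=/opt/apps/kafka/kafka1/data

# kafka2/app/config/server.properties -- must differ from kafka1's
log.dirs=/opt/apps/kafka/kafka2/data
```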
  • `listeners` misconfigured
[2018-11-21 05:14:34,098] FATAL  (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are PLAINTEXT
	at scala.Predef$.require(Predef.scala:224)
	at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1194)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
	at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
	at kafka.Kafka$.main(Kafka.scala:82)
	at kafka.Kafka.main(Kafka.scala)
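This first failure means the inter-broker listener (SASL_PLAINTEXT here) was missing from `advertised.listeners`, which therefore only offered plain `PLAINTEXT`. A consistent sketch:

```
listeners=SASL_PLAINTEXT://:29092
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
security.inter.broker.protocol=SASL_PLAINTEXT
```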

[2018-11-22 05:56:40,693] FATAL  (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different name, listeners: SASL_PLAINTEXT://:9092,SASL_PLAINTEXT://:29092
	at scala.Predef$.require(Predef.scala:224)
	at kafka.utils.CoreUtils$.validate$1(CoreUtils.scala:270)
	at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:280)
	at kafka.server.KafkaConfig$$anonfun$getListeners$1.apply(KafkaConfig.scala:1120)
	at kafka.server.KafkaConfig$$anonfun$getListeners$1.apply(KafkaConfig.scala:1119)
	at scala.Option.map(Option.scala:146)
	at kafka.server.KafkaConfig.getListeners(KafkaConfig.scala:1119)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1089)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
	at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
	at kafka.Kafka$.main(Kafka.scala:82)
	at kafka.Kafka.main(Kafka.scala)
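The second failure is Kafka rejecting two listeners that share one name. To expose an internal and an external SASL port, give each listener a distinct name and map both names to SASL_PLAINTEXT (the names INTERNAL/EXTERNAL are my choice, not the image's):

```
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
listeners=INTERNAL://:9092,EXTERNAL://:29092
advertised.listeners=INTERNAL://localhost:9092,EXTERNAL://192.168.1.16:29092
inter.broker.listener.name=INTERNAL
```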
  • Old-API producer: metadata comes back with the container hostname (c127a1b1a3cd below), which the host cannot resolve, so sends fail with ClosedChannelException
15:02:21,358  INFO VerifiableProperties:72 - Verifying properties
15:02:21,403  INFO VerifiableProperties:72 - Property metadata.broker.list is overridden to 127.0.0.1:9092
15:02:21,406  INFO VerifiableProperties:72 - Property serializer.class is overridden to kafka.serializer.StringEncoder
15:02:21,406  WARN VerifiableProperties:87 - Property zookeeper.connect is not valid
15:02:21,629  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 0 for 1 topic(s) Set(test3)
15:02:21,730  INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:21,805  INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:21,812  WARN BrokerPartitionInfo:87 - Error while fetching metadata [{TopicMetadata for topic test3 -> 
No partition metadata for topic test3 due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [test3]: class org.apache.kafka.common.errors.LeaderNotAvailableException 
15:02:21,816  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 1 for 1 topic(s) Set(test3)
15:02:21,817  INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:21,853  INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:21,854  WARN BrokerPartitionInfo:87 - Error while fetching metadata [{TopicMetadata for topic test3 -> 
No partition metadata for topic test3 due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [test3]: class org.apache.kafka.common.errors.LeaderNotAvailableException 
15:02:21,855 ERROR DefaultEventHandler:101 - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3
15:02:21,856  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 3
15:02:21,958  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 2 for 1 topic(s) Set(test3)
15:02:21,959  INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:21,973  INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,042  INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing
15:02:52,043  INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,049  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 4 to broker 0 with data for partitions test3-0
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
	at kafka.producer.Producer.send(Producer.scala:78)
	at kafka.javaapi.producer.Producer.send(Producer.scala:35)
	at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
15:02:52,052  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 2
15:02:52,156  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 5 for 1 topic(s) Set(test3)
15:02:52,156  INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:52,166  INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,167  INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,169  INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing
15:02:52,169  INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,170  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 7 to broker 0 with data for partitions test3-0
java.nio.channels.ClosedChannelException
	... (same stack trace as the first attempt above)
15:02:52,171  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 1
15:02:52,273  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 8 for 1 topic(s) Set(test3)
15:02:52,273  INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:52,287  INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,287  INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,288  INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing
15:02:52,289  INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,289  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 10 to broker 0 with data for partitions test3-0
java.nio.channels.ClosedChannelException
	... (same stack trace as the first attempt above)
15:02:52,290  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 0
15:02:52,394  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 11 for 1 topic(s) Set(test3)
15:02:52,395  INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing
15:02:52,403  INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092
15:02:52,404  INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092
15:02:52,405 ERROR DefaultEventHandler:101 - Failed to send requests for topics test3 with correlation ids in [0,11]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:98)
	at kafka.producer.Producer.send(Producer.scala:78)
	at kafka.javaapi.producer.Producer.send(Producer.scala:35)
	at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)

14:05:12,770  INFO VerifiableProperties:72 - Verifying properties
14:05:12,824  INFO VerifiableProperties:72 - Property metadata.broker.list is overridden to 192.168.1.16:9092,192.168.1.16:9093,192.168.1.16:9094
14:05:12,830  INFO VerifiableProperties:72 - Property serializer.class is overridden to kafka.serializer.StringEncoder
14:05:12,832  WARN VerifiableProperties:87 - Property zookeeper.connect is not valid
14:05:13,103  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 0 for 1 topic(s) Set(test)
14:05:13,204  INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:13,251  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,343  INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,344  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,351  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 2 to broker 1 with data for partitions test-0
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
	at kafka.producer.Producer.send(Producer.scala:78)
	at kafka.javaapi.producer.Producer.send(Producer.scala:35)
	at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,354  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 3
14:05:43,463  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(2,192.168.1.16,9094) with correlation id 3 for 1 topic(s) Set(test)
14:05:43,464  INFO SyncProducer:72 - Connected to 192.168.1.16:9094 for producing
14:05:43,467  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,469  WARN ClientUtils$:93 - Fetching topic metadata with correlation id 3 for topics [Set(test)] from broker [BrokerEndPoint(2,192.168.1.16,9094)] failed
java.io.EOFException
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
	at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:86)
	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:82)
	at kafka.utils.Logging$class.swallowError(Logging.scala:110)
	at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:46)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:86)
	at kafka.producer.Producer.send(Producer.scala:78)
	at kafka.javaapi.producer.Producer.send(Producer.scala:35)
	at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,470  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,474  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(1,192.168.1.16,9093) with correlation id 3 for 1 topic(s) Set(test)
14:05:43,474  INFO SyncProducer:72 - Connected to 192.168.1.16:9093 for producing
14:05:43,501  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9093
14:05:43,501  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,503  INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,503  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,504  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 5 to broker 1 with data for partitions test-0
java.nio.channels.ClosedChannelException
	... (same stack trace as the first ClosedChannelException above, elided) ...
14:05:43,505  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 2
14:05:43,609  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(2,192.168.1.16,9094) with correlation id 6 for 1 topic(s) Set(test)
14:05:43,611  INFO SyncProducer:72 - Connected to 192.168.1.16:9094 for producing
14:05:43,617  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,617  WARN ClientUtils$:93 - Fetching topic metadata with correlation id 6 for topics [Set(test)] from broker [BrokerEndPoint(2,192.168.1.16,9094)] failed
java.io.EOFException
	... (same stack trace as the first EOFException above, elided) ...
14:05:43,618  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,618  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 6 for 1 topic(s) Set(test)
14:05:43,619  INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:43,635  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,636  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,638  INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,638  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,639  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 8 to broker 1 with data for partitions test-2
java.nio.channels.ClosedChannelException
	... (same stack trace as the first ClosedChannelException above, elided) ...
14:05:43,640  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 1
14:05:43,743  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 9 for 1 topic(s) Set(test)
14:05:43,744  INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:43,763  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,763  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,764  INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,765  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,766  WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 11 to broker 1 with data for partitions test-2
java.nio.channels.ClosedChannelException
	... (same stack trace as the first ClosedChannelException above, elided) ...
14:05:43,767  INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 0
14:05:43,870  INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(1,192.168.1.16,9093) with correlation id 12 for 1 topic(s) Set(test)
14:05:43,870  INFO SyncProducer:72 - Connected to 192.168.1.16:9093 for producing
14:05:43,877  INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9093
14:05:43,877  INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,878 ERROR DefaultEventHandler:101 - Failed to send requests for topics test with correlation ids in [0,12]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:98)
	at kafka.producer.Producer.send(Producer.scala:78)
	at kafka.javaapi.producer.Producer.send(Producer.scala:35)
	at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
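The log above is the classic symptom of a misconfigured `advertised.listeners`: the metadata fetch against `192.168.1.16` succeeds, but the broker then advertises its container hostname (`54efcdb1689f`), which the host machine cannot resolve, so every retry ends in `ClosedChannelException` until the producer gives up. A quick way to confirm this from the client machine (a sketch; substitute the hostname from your own log):

```shell
# If the hostname a broker advertises does not resolve on the client,
# fix advertised.listeners on the broker (or map the name in /etc/hosts).
advertised_host=54efcdb1689f   # container hostname taken from the log above
if getent hosts "$advertised_host" > /dev/null 2>&1; then
    echo "resolvable: the advertised hostname is reachable by name"
else
    echo "unresolvable: fix advertised.listeners on the broker"
fi
```

This is why the working configuration below binds `listeners` to all interfaces but sets `advertised.listeners` to the host IP that clients actually use.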

Others

The complete base broker configuration:
#advertised.host.name=192.168.1.16
#advertised.port=29092

advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
listeners=SASL_PLAINTEXT://:29092

#listeners=SASL_PLAINTEXT://192.168.1.16:29092
#listeners=SASL_PLAINTEXT://<ip>:<port>  (e.g. 127.0.0.1:9092)
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Default: false. Set to true to allow access to resources that have no matching ACL.
allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
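The SASL_PLAINTEXT listener above only works together with a JAAS login configuration on the broker and matching SASL settings on the clients. A minimal sketch of both sides (the user names and passwords are illustrative, chosen to line up with `super.users=User:admin;User:alice` above, not taken from the image):

```shell
# Broker-side JAAS file; point the broker JVM at it with
# -Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf
cat > /tmp/kafka_server_jaas.conf <<'EOF'
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
EOF

# Client side: the producer.config / consumer.config files passed to the
# console tools need the same mechanism plus a PLAIN login.
cat > /tmp/client-sasl.properties <<'EOF'
security.protocol=SASL_PLAINT EXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
EOF
sed -i 's/SASL_PLAINT EXT/SASL_PLAINTEXT/' /tmp/client-sasl.properties
```

With these in place, the `--producer.config` / `--consumer.config` flags used earlier can point at the client properties file.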

Reference

ACLS
