kafka學習(7)生產者和消費者程式碼
首先,我們開啟kafka的api頁面,裡面都有詳細的樣例
http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer
訊息生產這要傳送訊息,需要經過三次握手,第一次獲取連線,第二次傳送訊息,第三次關閉連線。
下面我們通過構建一個例子,讓生產者通過JAVA程式碼,傳送一條訊息。
建立spring boot專案,在jar包依賴中,加入kafka的依賴包。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
需要注意的是,依賴的jar包版本,要與我們部署的kafka版本一致,不然容易發生問題還不好解決。
下面我簡單傳送一個例子:
package com.example.producer; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class Test { public static void main(String[] args) { Properties props = new Properties(); // 與kafka連線 props.setProperty("bootstrap.servers", "master:9092,slave01:9092,slave02:9092"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 傳送 producer.send(new ProducerRecord<String, String>("mytopic2", "hello")); // 關閉 producer.close(); } }
執行之後,會報錯:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value. at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464) at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) at com.example.producer.Test.main(Test.java:13)
報錯原因是系統之間的傳遞資料,一般都是將資料序列化的。增加如下配置:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
然後我們執行以下,發現還是報錯:
00:42:42.927 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [master:9092, slave01:9092, slave02:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
00:42:43.048 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
00:42:43.056 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
00:42:45.617 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server master:9092 from bootstrap.servers as DNS resolution failed for master
00:42:48.170 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server slave01:9092 from bootstrap.servers as DNS resolution failed for slave01
00:42:50.721 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server slave02:9092 from bootstrap.servers as DNS resolution failed for slave02
00:42:50.723 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
00:42:50.726 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at com.example.producer.Test.main(Test.java:16)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:90)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:49)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
... 2 more
原因很簡單,推訊息失敗了,我這裡的報錯的原因,是因為mytopic2這個主題,還沒有建立。改為mytopic主題後,傳送成功,如下。
16:03:11.739 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
16:03:11.956 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
16:03:11.972 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
16:03:11.989 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors
16:03:12.028 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
16:03:12.542 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
16:03:12.543 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
16:03:12.543 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication:
16:03:12.544 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-reauthentication:
16:03:12.545 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication-no-reauth:
16:03:12.545 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-authentication:
16:03:12.546 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-reauthentication:
16:03:12.546 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name reauthentication-latency:
16:03:12.547 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
16:03:12.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
16:03:12.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
16:03:12.550 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
16:03:12.551 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
16:03:12.562 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size
16:03:12.563 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate
16:03:12.564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time
16:03:12.564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time
16:03:12.565 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request
16:03:12.566 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries
16:03:12.568 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size
16:03:12.570 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-split-rate
16:03:12.573 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Starting Kafka producer I/O thread.
16:03:12.575 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9094 (id: -3 rack: null) for sending metadata request
16:03:12.577 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9094 (id: -3 rack: null) using address /127.0.0.1
16:03:12.578 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.2.0
16:03:12.578 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 05fcfde8f69b0349
16:03:12.586 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
16:03:12.606 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-sent
16:03:12.607 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-received
16:03:12.609 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.latency
16:03:12.610 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -3
16:03:12.828 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -3. Fetching API versions.
16:03:12.828 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -3.
16:03:12.833 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9093 (id: -2 rack: null) for sending metadata request
16:03:12.834 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9093 (id: -2 rack: null) using address /127.0.0.1
16:03:12.838 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
16:03:12.840 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
16:03:12.842 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -2. Fetching API versions.
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -2.
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9092 (id: -1 rack: null) for sending metadata request
16:03:12.844 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9092 (id: -1 rack: null) using address /127.0.0.1
16:03:12.844 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
16:03:12.846 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
16:03:12.847 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
16:03:12.847 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -3: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
16:03:12.856 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=mytopic) to node 127.0.0.1:9094 (id: -3 rack: null)
16:03:12.858 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -2: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.866 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -1: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updating last seen epoch from null to 0 for partition mytopic-0
16:03:12.897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updating last seen epoch from null to 0 for partition mytopic-1
16:03:12.901 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: pqU8TrnFQvumP_0Nu9V0ag
16:03:12.901 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = pqU8TrnFQvumP_0Nu9V0ag, nodes = [127.0.0.1:9094 (id: 3 rack: null), 127.0.0.1:9092 (id: 1 rack: null), 127.0.0.1:9093 (id: 2 rack: null)], partitions = [Partition(topic = mytopic, partition = 0, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = mytopic, partition = 1, leader = 1, replicas = [1,3], isr = [1,3], offlineReplicas = [])], controller = 127.0.0.1:9092 (id: 1 rack: null))}
16:03:12.925 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9094 (id: 3 rack: null) using address /127.0.0.1
16:03:12.926 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-sent
16:03:12.926 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-received
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.latency
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 3
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 3. Fetching API versions.
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 3.
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
16:03:12.930 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 3: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.935 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.records-per-batch
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.bytes
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.compression-rate
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.record-retries
16:03:12.937 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.record-errors
16:03:13.005 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
16:03:13.005 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
16:03:13.006 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
16:03:13.006 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-reauthentication:
16:03:13.007 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication-no-reauth:
16:03:13.008 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-reauthentication:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name reauthentication-latency:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
16:03:13.010 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
16:03:13.010 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
16:03:13.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
16:03:13.013 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.bytes-sent
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.bytes-received
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.latency
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.bytes-sent
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.bytes-received
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.latency
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-sent
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-received
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.latency
16:03:13.018 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
16:03:13.018 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed
然後我們通過以下命令,啟動一個消費者,可以看到新新增進來的訊息
kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic mytopic
修改上述程式碼,採用for迴圈連續傳送100條訊息。可以看到消費者接收到的訊息順序和迴圈不是一致的。
public static void main(String[] args) {
Properties props = new Properties();
// 與kafka連線
props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 傳送
for(int i = 0 ; i < 100 ; i++)
producer.send(new ProducerRecord<String, String>("mytopic", "hello"+i));
// 關閉
producer.close();
}
原因是訊息是存放在不同的分割槽中的,而消費者也是隨機從不同的分割槽去消費訊息的。既訊息在一個分割槽中是有序的,但是消費者並沒有指定消費哪個分割槽的訊息,就來回消費所有分割槽訊息。就是亂的。但如果只有一個分割槽,訊息消費就是有序的。
下面是消費者程式碼:
package com.example.producer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
// 與kafka連線
// 指定broker連結地址
props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 每個消費者都必須屬於某一個消費組,所以必須指定group.id
props.put("group.id", "test");
// 構造生產者實力物件
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 拿資料
// 指定多主題方式1:
// List<String> topics = new ArrayList<String>();
// topics.add("mytopic");
// 指定多主題方式2:
List<String> topics = Arrays.asList("mytopic");
consumer.subscribe(topics);
try {
while (true) {
// timeout 阻塞時間,從kafka中取出100毫秒的資料,有可能一次取出0到N條
ConsumerRecords<String, String> records = consumer.poll(100);
// 遍歷
for (ConsumerRecord<String, String> record : records) {
// 拿出結果
System.out.println("消費者拿到的資料:" + record.value());
}
}
} finally {
// 關閉
consumer.close();
}
}
}
執行消費者程式碼,可以看到消費到訊息:
我們通過消費者程式碼,從kafka中拿到資料,是要往DB中寫的。但是往DB寫的過程中,也許會出現問題,但是此時kafka認為訊息已經被消費了,就會造成消費丟失,資料丟失的情況。