
Kafka Learning (7): Producer and Consumer Code

First, open the Kafka API documentation, which contains detailed examples:

http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer

For a producer to send a message, three steps are involved: first obtain a connection, then send the message, and finally close the connection.

Below we build an example that sends a message from Java code.

Create a Spring Boot project and add the Kafka client dependency:

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.2.0</version>
		</dependency>

Note that the version of the client jar should match the version of the Kafka cluster you deployed; mismatches tend to cause problems that are hard to diagnose.

Here is a first, minimal attempt at sending a message:

package com.example.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Test {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Connect to the Kafka cluster
		props.setProperty("bootstrap.servers", "master:9092,slave01:9092,slave02:9092");
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		// Send one record
		producer.send(new ProducerRecord<String, String>("mytopic2", "hello"));
		// Close the producer
		producer.close();
	}
}

Running it throws an exception:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
	at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.example.producer.Test.main(Test.java:13)

The reason is that data passed between systems generally needs to be serialized, and the producer requires explicit key and value serializers. Add the following configuration:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
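
As an aside, the same settings can be written with the constants defined on ProducerConfig, which avoids typos in the raw property keys. A minimal sketch (the class name TestWithConstants is just for illustration):

package com.example.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestWithConstants {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Identical configuration, expressed via ProducerConfig constants
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave01:9092,slave02:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		producer.send(new ProducerRecord<String, String>("mytopic2", "hello"));
		producer.close();
	}
}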

Run it again with the serializers configured; it still fails:

00:42:42.927 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [master:9092, slave01:9092, slave02:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

00:42:43.048 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
00:42:43.056 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
00:42:45.617 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server master:9092 from bootstrap.servers as DNS resolution failed for master
00:42:48.170 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server slave01:9092 from bootstrap.servers as DNS resolution failed for slave01
00:42:50.721 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server slave02:9092 from bootstrap.servers as DNS resolution failed for slave02
00:42:50.723 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
00:42:50.726 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.example.producer.Test.main(Test.java:16)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:90)
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:49)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
	... 2 more

The log tells us exactly what went wrong: the hostnames master, slave01, and slave02 could not be resolved, so none of the bootstrap URLs was usable and the producer could not even be constructed. I switched bootstrap.servers to resolvable addresses (127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094) and, since the mytopic2 topic had not been created yet, changed the topic to the existing mytopic. After that, the send succeeds, as shown below.
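
For completeness: if you did want to keep using mytopic2, you could create it first. With the 2.2 tooling (where kafka-topics accepts --bootstrap-server), a command along these lines should work; the partition and replication counts here just mirror mytopic's layout from the cluster metadata below:

kafka-topics.bat --create --bootstrap-server localhost:9092 --topic mytopic2 --partitions 2 --replication-factor 2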

16:03:11.739 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

16:03:11.956 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
16:03:11.972 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
16:03:11.989 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors
16:03:12.028 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
16:03:12.542 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
16:03:12.543 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
16:03:12.543 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication:
16:03:12.544 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-reauthentication:
16:03:12.545 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication-no-reauth:
16:03:12.545 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-authentication:
16:03:12.546 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-reauthentication:
16:03:12.546 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name reauthentication-latency:
16:03:12.547 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
16:03:12.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
16:03:12.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
16:03:12.550 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
16:03:12.551 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
16:03:12.562 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size
16:03:12.563 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate
16:03:12.564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time
16:03:12.564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time
16:03:12.565 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request
16:03:12.566 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries
16:03:12.568 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size
16:03:12.570 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-split-rate
16:03:12.573 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Starting Kafka producer I/O thread.
16:03:12.575 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9094 (id: -3 rack: null) for sending metadata request
16:03:12.577 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9094 (id: -3 rack: null) using address /127.0.0.1
16:03:12.578 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.2.0
16:03:12.578 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 05fcfde8f69b0349
16:03:12.586 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
16:03:12.606 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-sent
16:03:12.607 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-received
16:03:12.609 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.latency
16:03:12.610 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -3
16:03:12.828 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -3. Fetching API versions.
16:03:12.828 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -3.
16:03:12.833 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9093 (id: -2 rack: null) for sending metadata request
16:03:12.834 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9093 (id: -2 rack: null) using address /127.0.0.1
16:03:12.838 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
16:03:12.840 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
16:03:12.842 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -2. Fetching API versions.
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -2.
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9092 (id: -1 rack: null) for sending metadata request
16:03:12.844 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9092 (id: -1 rack: null) using address /127.0.0.1
16:03:12.844 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
16:03:12.846 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
16:03:12.847 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
16:03:12.847 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -3: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
16:03:12.856 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=mytopic) to node 127.0.0.1:9094 (id: -3 rack: null)
16:03:12.858 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -2: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.866 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -1: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updating last seen epoch from null to 0 for partition mytopic-0
16:03:12.897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updating last seen epoch from null to 0 for partition mytopic-1
16:03:12.901 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: pqU8TrnFQvumP_0Nu9V0ag
16:03:12.901 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = pqU8TrnFQvumP_0Nu9V0ag, nodes = [127.0.0.1:9094 (id: 3 rack: null), 127.0.0.1:9092 (id: 1 rack: null), 127.0.0.1:9093 (id: 2 rack: null)], partitions = [Partition(topic = mytopic, partition = 0, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = mytopic, partition = 1, leader = 1, replicas = [1,3], isr = [1,3], offlineReplicas = [])], controller = 127.0.0.1:9092 (id: 1 rack: null))}
16:03:12.925 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9094 (id: 3 rack: null) using address /127.0.0.1
16:03:12.926 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-sent
16:03:12.926 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-received
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.latency
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 3
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 3. Fetching API versions.
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 3.
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
16:03:12.930 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 3: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.935 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.records-per-batch
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.bytes
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.compression-rate
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.record-retries
16:03:12.937 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.record-errors
16:03:13.005 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
16:03:13.005 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
16:03:13.006 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
16:03:13.006 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-reauthentication:
16:03:13.007 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication-no-reauth:
16:03:13.008 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-reauthentication:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name reauthentication-latency:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
16:03:13.010 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
16:03:13.010 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
16:03:13.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
16:03:13.013 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.bytes-sent
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.bytes-received
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.latency
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.bytes-sent
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.bytes-received
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.latency
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-sent
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-received
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.latency
16:03:13.018 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
16:03:13.018 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed

Then start a console consumer with the following command; it shows the newly produced messages:

kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic mytopic

Next, modify the code to send 100 messages in a for loop. You will see that the order in which the consumer receives them does not match the loop order.

	public static void main(String[] args) {
		Properties props = new Properties();
		// Connect to the Kafka cluster
		props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		// Send 100 records
		for (int i = 0; i < 100; i++) {
			producer.send(new ProducerRecord<String, String>("mytopic", "hello" + i));
		}
		// Close the producer
		producer.close();
	}

The reason is that the messages are spread across the topic's partitions, and the consumer pulls from all of those partitions, interleaving their records. Ordering is guaranteed only within a single partition; since the consumer does not read from just one partition, the overall order appears scrambled. With a single-partition topic, consumption order would match production order. The sketch below makes the partitioning visible.
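
To observe this, you can pass a Callback to send() and print which partition each record lands on; sending with a key also shows that records sharing a key always hash to the same partition and therefore stay in order. A sketch under the same setup (PartitionDemo and the key-N keys are just for illustration):

package com.example.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionDemo {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		for (int i = 0; i < 100; i++) {
			// With a key, the default partitioner hashes the key, so records
			// sharing a key always go to the same partition (and stay ordered).
			producer.send(new ProducerRecord<String, String>("mytopic", "key-" + (i % 2), "hello" + i),
					(metadata, exception) -> {
						if (exception == null) {
							System.out.println("offset " + metadata.offset()
									+ " went to partition " + metadata.partition());
						} else {
							exception.printStackTrace();
						}
					});
		}
		producer.close();
	}
}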

Here is the consumer code:

package com.example.producer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Broker connection addresses
		props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
		// Key deserializer
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// Value deserializer
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		// Every consumer must belong to a consumer group, so group.id is required
		props.put("group.id", "test");
		// Construct the consumer instance
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

		// Subscribing to one or more topics, option 1:
		// List<String> topics = new ArrayList<String>();
		// topics.add("mytopic");

		// Subscribing to one or more topics, option 2:
		List<String> topics = Arrays.asList("mytopic");
		consumer.subscribe(topics);
		try {
			while (true) {
				// poll blocks for up to 100 ms waiting for records;
				// a single call may return anywhere from 0 to N records
				ConsumerRecords<String, String> records = consumer.poll(100);
				// Iterate over the batch
				for (ConsumerRecord<String, String> record : records) {
					// Print the payload
					System.out.println("Consumer received: " + record.value());
				}
			}
		} finally {
			// Close the consumer
			consumer.close();
		}

	}
}
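
One small note: in the 2.x client, poll(long) is deprecated in favor of poll(Duration), which puts a hard upper bound on how long the call may block. The call above would become:

		// requires: import java.time.Duration;
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));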

Run the consumer code and you can see the messages being consumed.

In a real application, the data the consumer pulls from Kafka is typically written to a database. If the DB write fails, Kafka may nevertheless consider the message consumed, because with the default auto-commit the offset is committed in the background regardless of whether processing succeeded. The message is then never redelivered, and the data is lost.
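
A common mitigation is to disable auto-commit and commit offsets only after the DB write has succeeded, so a message whose processing failed is redelivered instead of lost (this gives at-least-once delivery, so the write should be made idempotent). A sketch along those lines, where saveToDb is a hypothetical stand-in for the real persistence code:

package com.example.producer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("group.id", "test");
		// Disable auto-commit so offsets advance only when we say so
		props.put("enable.auto.commit", "false");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("mytopic"));
		try {
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
				for (ConsumerRecord<String, String> record : records) {
					saveToDb(record.value()); // hypothetical DB write; may throw
				}
				// Commit only after every record in this batch is persisted;
				// if saveToDb threw, we never reach here and the batch is re-read
				consumer.commitSync();
			}
		} finally {
			consumer.close();
		}
	}

	private static void saveToDb(String value) {
		// placeholder for real persistence logic
		System.out.println("saved: " + value);
	}
}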