Spring Integration Kafka Adapter
The Spring Integration Kafka Adapter provides client components for Apache Kafka. Apache Kafka is a distributed publish-subscribe messaging system that is designed for high throughput (terabytes of data) and low latency (milliseconds).
For more information on Kafka and its design goals, see the Apache Kafka documentation.
In particular, Spring Integration Kafka provides:
- A consumer client library based on the Simple Consumer API with the following features:
  - Dependency injection-friendly `ConnectionFactory` for infrastructure management;
  - `KafkaTemplate` for broker read operations, following the general Spring template model, with the ability to read from specific partitions and offsets;
  - Message-driven `KafkaMessageListenerContainer` with support for:
    - listening to specific partitions and starting offsets;
    - customizing offset management via the `OffsetManager` abstraction, with the ability to choose between various offset storage and update strategies;
    - manual acknowledgment of offsets for asynchronous operation;
- Inbound and outbound channel adapters for Spring Integration
Quick Start
See the Spring Integration Kafka Sample for a simple Spring Boot application that sends and receives messages.
Checking out and building
Currently, the Spring Integration Kafka adapter is built against Kafka 0.8.2.1, which is backed by Scala 2.10.4.
In order to build the project:
./gradlew build
In order to install this into your local maven cache:
./gradlew install
The Spring Integration Kafka project currently supports the following components. Please keep in mind that the project is at a very early stage of development and does not yet fully make use of all the features that Kafka provides.
- Outbound Channel Adapter
- Message Driven Channel Adapter based on the simple consumer API
- Inbound Channel Adapter based on the High level consumer API
Outbound Channel Adapter:
The outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka. The channel is defined in the application context and then wired into the application that sends messages to Kafka. Sender applications can then publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter: the payload of the Spring Integration message is used to populate the payload of the Kafka message, and the `kafka_messageKey` header of the Spring Integration message is used to populate the key of the Kafka message.
The target topic and partition for publishing the message can be customized through the `kafka_topic` and `kafka_partitionId` headers, respectively.
Here is an example of sending a message with an arbitrary payload and the String `"key"` as the message key to the `test` topic:
```java
final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
channel.send(
        MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "key") // Note: the header was `messageKey` in earlier versions
                .setHeader(KafkaHeaders.TOPIC, "test")      // Note: the header was `topic` in earlier versions
                .build());
```
In addition, the `<int-kafka:outbound-channel-adapter>` provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message. To that end, it supports the mutually exclusive pairs of attributes `topic`/`topic-expression`, `message-key`/`message-key-expression`, and `partition-id`/`partition-id-expression`, allowing the topic, message key, and partition id, respectively, to be specified either as static values on the adapter or evaluated dynamically at runtime against the request message.
Important: the `KafkaHeaders` interface contains constants used for interacting with headers. The `messageKey` and `topic` default headers now require a `kafka_` prefix. When migrating from an earlier version that used the old headers, you need to specify `message-key-expression="headers.messageKey"` and `topic-expression="headers.topic"` on the `<int-kafka:outbound-channel-adapter>`, or simply change the headers upstream to the new headers from `KafkaHeaders` using a `<header-enricher>` or `MessageBuilder`. Or, of course, configure them on the adapter if you are using constant values.
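For example, the migration described above can be done with a standard Spring Integration `<header-enricher>` that copies the legacy headers into the new `kafka_`-prefixed ones upstream of the adapter (the channel names here are illustrative, not from the original configuration):

```xml
<int:header-enricher input-channel="preKafka" output-channel="inputToKafka">
    <!-- copy the legacy headers into the new kafka_-prefixed headers -->
    <int:header name="kafka_messageKey" expression="headers.messageKey"/>
    <int:header name="kafka_topic" expression="headers.topic"/>
</int:header-enricher>
```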
Here is how the Kafka outbound channel adapter is configured:
```xml
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="headers.messageKey">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
                receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
```
The key aspect in this configuration is the `kafka-producer-context-ref`. The producer context contains all the producer configurations for the topics that this adapter is expected to handle. The adapter subscribes to a channel, and any message sent to that channel is handled by this adapter. You can also configure a poller, depending on the type of channel used. For example, the above configuration uses a queue-based channel, so a poller is configured with a task executor. If no messages are available in the queue, the receive times out immediately because of the `receive-timeout="0"` configuration, and the poller polls again after a delay of one second.
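For reference, the queue-backed channel mentioned above is declared with standard Spring Integration XML (the capacity shown is illustrative):

```xml
<int:channel id="inputToKafka">
    <int:queue capacity="50"/>
</int:channel>
```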
The producer context is at the heart of the Kafka outbound adapter. Here is an example of how it is configured:
```xml
<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="java.lang.String"
                                          topic="test1"
                                          value-serializer="kafkaSerializer"
                                          key-serializer="kafkaSerializer"
                                          compression-type="none"/>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="test2"
                                          compression-type="none"/>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="regextopic.*"
                                          compression-type="gzip"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
```
There are a few things going on here, so let's go through them one by one. First of all, the producer context is simply a holder of, as the name indicates, a context for the Kafka producer. It contains one or more producer configurations, each of which is ultimately translated into a native Kafka producer. Each producer configuration is currently per-topic. Going by the above example, producers are generated from this configuration for the topics named test1 and test2, and for any topic matching regextopic.*. Each producer configuration can take the following attributes:
- `broker-list` - Comma-separated list of brokers that this producer connects to.
- `topic` - Topic name or Java regex pattern of topic names.
- `compression-type` - Compression method to be used. Supported compression types are `none`, `gzip` and `snappy`. Default is `none`.
- `value-serializer` - Serializer to be used for encoding messages.
- `key-serializer` - Serializer to be used for encoding the partition key.
- `key-class-type` - Type of the key class. Ignored if no key encoder is provided.
- `value-class-type` - Type of the value class. Ignored if no value encoder is provided.
- `partitioner` - Custom implementation of the Kafka `Partitioner` interface.
- `batch-bytes` - Number of bytes to batch at the producer. Has no effect if async is false.
The `value-serializer` and `key-serializer` attributes refer to other Spring beans. They are essentially implementations of Kafka's `Serializer` interface. Here is an example of configuring such a serializer bean:
<bean id="kafkaSerializer" class="com.acme.KryoSerializer"/>
Spring Integration Kafka allows the reuse of pre-0.8.2 `Encoder`s. To do so, the attributes `key-encoder` and `value-encoder` can be used instead of `key-serializer` and `value-serializer`, respectively. For either the key or the value, you can configure either a `Serializer` or an `Encoder`, but not both.
Spring Integration Kafka provides Apache Avro backed encoders out of the box, as Avro is a popular choice for serialization in the big data space. If no encoders are specified as beans, the default encoders provided by Kafka are used. On that note, if an encoder is configured only for the message and not for the key, the same encoder is used for both. These are standard Kafka behaviors; the Spring Integration Kafka adapter simply honors them. The Kafka default encoder expects the data to come as byte arrays and is a no-op encoder, i.e. it takes the byte array as is. When the default encoders are used, there are two ways a message can be sent: either the sender puts byte arrays on the channel as the message key and payload, or the key and value are sent as Java `Serializable` objects, in which case the adapter automatically converts them to byte arrays before sending them to the Kafka broker. If the encoders are the defaults and the objects sent are not serializable, an error results. By providing explicit encoders, it is entirely up to the developer to configure how the objects are serialized; in that case, the objects may or may not implement the `Serializable` interface.
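To illustrate the `Serializable`-to-byte-array path described above, here is a plain-JDK sketch of the kind of conversion the adapter performs internally (the class and method names here are ours, for illustration only, and are not part of the adapter's API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializableToBytes {

    // Convert any Serializable object into the byte[] form that
    // Kafka's no-op default encoder expects.
    static byte[] toBytes(Object payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(payload);
        }
        return bos.toByteArray();
    }

    // The reverse operation, as a consumer-side decoder might perform it.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = toBytes("hello kafka");
        System.out.println(fromBytes(bytes)); // prints "hello kafka"
    }
}
```

A non-`Serializable` payload would fail at the `writeObject` step with a `NotSerializableException`, which is the error condition mentioned above.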
A bit more on the Avro support: there are two flavors of Avro encoders provided, one based on Avro's `ReflectDatum` and the other based on `SpecificDatum`. Encoding using reflection is fairly simple, as you only have to configure your POJO or other class type along with the XML. Here is an example:
```xml
<bean id="kafkaEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="java.lang.String"/>
</bean>
```
Reflection-based encoding may not be appropriate for large-scale systems, and Avro's `SpecificDatum`-based encoders can be a better fit. In this case, you generate a specific Avro object (a glorified POJO) from a schema definition; the generated object stores the schema as well. You do need to generate the Avro object as a separate step, though: both Maven and Gradle plugins are available to do the code generation automatically, and you provide avdl or avsc files to specify your schema. Once these steps are taken care of, you can simply configure a specific-datum-based Avro encoder and pass along the fully qualified class name of the generated Avro object whose instances you want to encode. The samples project has examples of using both of these encoders.
Encoding a String for the key and value is a very common use case, and Kafka provides a `StringEncoder` out of the box. Its constructor takes a Kafka-specific `VerifiableProperties` object that wraps a regular `java.util.Properties` object. The `StringEncoder` is great when writing a direct Java client that talks to Kafka; however, when using the Spring Integration Kafka adapter, it introduces unnecessary steps to create these properties objects. Therefore, we provide a wrapper for this same `StringEncoder` as part of the Spring Integration Kafka support, which makes using it from Spring a bit easier: you can inject any properties into it in the usual Spring way. Kafka's `StringEncoder` looks at a specific property for the encoding scheme used; in the wrapper bean, this property can simply be injected as a value without constructing any other objects. The Spring Integration provided `StringEncoder` is available in the package `org.springframework.integration.kafka.serializer.common`. The Avro support for serialization is likewise available in the `avro` package under `serializer`.
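Assuming the wrapper exposes the encoding scheme as a simple bean property (the property name `encoding` is our assumption here, not confirmed by this document), configuring it might look like this:

```xml
<bean id="stringEncoder"
      class="org.springframework.integration.kafka.serializer.common.StringEncoder">
    <!-- property name assumed; it corresponds to Kafka's serializer encoding setting -->
    <property name="encoding" value="UTF-8"/>
</bean>
```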
Tuning Producer Properties
The Kafka Producer API provides several [Producer Configs](http://kafka.apache.org/documentation.html#producerconfigs) to fine-tune producers. To specify those properties, the `producer-context` element supports an optional `producer-properties` attribute that can reference a Spring `Properties` bean. These properties are applied to all producer configurations within the producer context. For example:
```xml
<bean id="producerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
            <prop key="message.send.max.retries">5</prop>
            <prop key="send.buffer.bytes">5242880</prop>
        </props>
    </property>
</bean>

<int-kafka:producer-context id="kafkaProducerContext"
                            producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
        <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
        ...
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
```
Inbound Channel Adapter:
The inbound channel adapter is used to consume messages from Kafka; these messages are placed into a channel as Spring Integration specific Messages. Kafka primarily provides two types of consumer APIs: the High Level Consumer and the Simple Consumer. The High Level Consumer is pretty complex inside; nonetheless, for the client, using the high-level API is straightforward. Although easy to use, the High Level Consumer does not provide any offset management, so if you want to rewind and re-fetch messages, it is not possible to do so with the High Level Consumer API: in the High Level Consumer, offsets are managed internally by ZooKeeper. If your use case does not require any offset management or re-reading of messages by the same consumer, the High Level Consumer is a perfect fit. The Spring Integration Kafka inbound channel adapter currently supports only the High Level Consumer. Here are the details of configuring one:
```xml
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                   kafka-consumer-context-ref="consumerContext"
                                   auto-startup="false"
                                   channel="inputFromKafka">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5"/>
</int-kafka:inbound-channel-adapter>
```
Since this inbound channel adapter uses a polling mechanism under the hood, it must be configured with a poller. A notable difference between the poller configured with this inbound adapter and other pollers used in Spring Integration is that the `receive-timeout` specified on this poller has no effect. The reason lies in the way Kafka implements iterators on the consumer stream: it uses a `BlockingQueue` internally and would therefore wait indefinitely. Instead of interrupting the underlying thread, we leverage Kafka's direct support for consumer timeouts, which is configured on the consumer context. Everything else is pretty much the same as in a regular inbound adapter: any message it receives is sent to the configured channel.
The inbound Kafka adapter must specify a `kafka-consumer-context-ref`, and here is how the consumer context is configured:
```xml
<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="4000"
                            zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default"
                                          value-decoder="valueDecoder"
                                          key-decoder="valueDecoder"
                                          max-messages="5000">
            <int-kafka:topic id="test1" streams="4"/>
            <int-kafka:topic id="test2" streams="4"/>
        </int-kafka:consumer-configuration>
        <int-kafka:consumer-configuration group-id="default3"
                                          value-decoder="kafkaSpecificDecoder"
                                          key-decoder="kafkaReflectionDecoder"
                                          max-messages="10">
            <int-kafka:topic-filter pattern="regextopic.*" streams="4" exclude="false"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>
```
`consumer-configuration` supports consuming from a specific topic using a `topic` child element, or from multiple topics matching a topic regex using a `topic-filter` child element. `topic-filter` supports both whitelist and blacklist filtering, based on its `exclude` attribute.
The consumer context requires a reference to a `zookeeper-connect`, which dictates all the ZooKeeper-specific configuration details. Here is how a `zookeeper-connect` is configured:
```xml
<int-kafka:zookeeper-connect id="zookeeperConnect"
                             zk-connect="localhost:2181"
                             zk-connection-timeout="6000"
                             zk-session-timeout="6000"
                             zk-sync-time="2000"/>
```
The `zk-connect` attribute is where you specify the ZooKeeper connection string. All the other attributes are translated into their ZooKeeper counterparts by the consumer.
In the above consumer context, you can also specify a `consumer-timeout` value, which is used to time out the consumer when there are no messages to consume. This timeout applies to all the streams (threads) in the consumer. The default value in Kafka is -1, which makes the consumer wait indefinitely; however, Spring Integration overrides it to 5 seconds by default, to make sure that no threads block indefinitely during the lifecycle of the application, thereby giving them a chance to free any resources or locks that they hold. It is recommended to override this value to meet the requirements of your specific use case. By providing a reasonable `consumer-timeout` on the context and a `fixed-delay` value on the poller, this inbound adapter is capable of simulating message-driven behaviour.
The consumer context takes `consumer-configurations`, which are at the core of the inbound adapter: a group of one or more `consumer-configuration` elements, each representing a consumer group dictated by its `group-id`. Each `consumer-configuration` can be configured with one or more Kafka topics.
In the example provided above, the first consumer configuration consumes messages from two topics, each with 4 streams. These streams fundamentally correspond to the number of partitions that a topic is configured with on the producer side. For instance, if you configure your topic with 4 partitions, then the maximum number of useful streams in the consumer is also 4; any more than this would be a no-op. If you have fewer streams than available partitions, messages from multiple partitions are sent to the available streams. Therefore, it is good practice to limit the number of streams for a topic in the consumer configuration to the number of partitions configured for the topic. A partition may also go away at runtime; in that case, the stream receiving data from that partition simply times out, and whenever the partition comes back, the stream starts reading data from it again.
A consumer configuration can also be configured with optional decoders for the key and value. The default decoders provided by Kafka are essentially no-ops and consume plain byte arrays. If you provide a custom encoder for the key/value in the producer, it is recommended to provide corresponding decoders for the consumer. As discussed for the outbound adapter, the Spring Integration Kafka adapter provides Apache Avro based data serialization components out of the box. You can use any serialization component for this purpose as long as you implement the required encoder/decoder interfaces from Kafka. As with the Avro encoder support, the provided decoders also implement reflection- and specific-datum-based deserialization.
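By analogy with the encoder bean shown earlier, an Avro-backed decoder bean might look like the following sketch; the decoder class name mirrors the encoder's package and naming and is assumed here rather than confirmed by this document:

```xml
<!-- class name assumed by analogy with AvroReflectDatumBackedKafkaEncoder -->
<bean id="avroReflectDecoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg value="java.lang.String"/>
</bean>
```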
Message Driven Channel Adapter:
The `KafkaMessageDrivenChannelAdapter` (`<int-kafka:message-driven-channel-adapter>`) uses the `KafkaSimpleConsumer` (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) internally. Although it is called 'simple', the API and its usage are not so simple. To simplify the configuration and provide a higher-level API based on Spring Integration concepts, the `KafkaMessageListenerContainer` has been introduced. It supports 'leader election' via the `org.springframework.integration.kafka.core.ConnectionFactory` abstraction and 'offset management' via the `org.springframework.integration.kafka.listener.OffsetManager` abstraction. The `DefaultConnectionFactory` requires an `org.springframework.integration.kafka.core.Configuration` for Kafka; `ZookeeperConfiguration` and `BrokerAddressListConfiguration` are provided as configuration options.
The `KafkaMessageDrivenChannelAdapter` implements `MessageProducer`: it reads a `KafkaMessage` with its `Metadata` and sends it as a Spring Integration message to the provided `MessageChannel`. Either a `KafkaMessageListenerContainer`, or a `ConnectionFactory` and `topics` pair, is required for the `MessageDrivenChannelAdapter` configuration. A typical Java-based configuration is:
```java
@Bean
public Configuration zkConfiguration() {
    return new ZookeeperConfiguration(new ZookeeperConnect());
}

@Bean
public ConnectionFactory kafkaConnectionFactory() {
    return new DefaultConnectionFactory(zkConfiguration());
}

@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
    KafkaMessageDrivenChannelAdapter adapter = new KafkaMessageDrivenChannelAdapter(
            new KafkaMessageListenerContainer(kafkaConnectionFactory(), "topic1", "topic2"));
    adapter.setOutputChannel(inputChannel());
    return adapter;
}
```
As a variant, the `KafkaMessageListenerContainer` can accept an `org.springframework.integration.kafka.core.Partition` array argument to specify the topics and their partitions. The XML configuration variant is typical too:
```xml
<int-kafka:message-driven-channel-adapter id="adapter"
                                          channel="output"
                                          connection-factory="connectionFactory"
                                          key-decoder="decoder"
                                          payload-decoder="decoder"
                                          offset-manager="offsetManager"
                                          max-fetch="100"
                                          topics="${kafka.test.topic}"/>
```
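A minimal `offsetManager` bean definition might look like the following sketch; the constructor arguments are assumptions based on the description below (a `ConnectionFactory` plus a `MetadataStore`, here the in-memory `SimpleMetadataStore`), not a confirmed signature:

```xml
<!-- constructor arguments assumed for illustration only -->
<bean id="offsetManager"
      class="org.springframework.integration.kafka.listener.MetadataStoreOffsetManager">
    <constructor-arg ref="connectionFactory"/>
    <constructor-arg>
        <bean class="org.springframework.integration.metadata.SimpleMetadataStore"/>
    </constructor-arg>
</bean>
```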
Here, `offsetManager` is a bean that implements `org.springframework.integration.kafka.listener.OffsetManager`. The default implementation is `MetadataStoreOffsetManager`, which uses a `MetadataStore` to store and fetch offsets under a key based on the provided `Partition` and the preconfigured `consumerId` option. The `KafkaMessageListenerContainer` takes care of offset management during its internal processing. Another implementation is `KafkaTopicOffsetManager`, which frees the application from depending on an external system (such as Redis for the `MetadataStoreOffsetManager`).
The `KafkaMessageListenerContainer` can be configured with `concurrency` to run several internal `QueueingMessageListenerInvoker` concurrent fetch tasks.
Refer to the `KafkaMessageDrivenChannelAdapter` and `KafkaMessageListenerContainer` for more information.