Integrating Kafka with Spring: Implementing the AcknowledgingMessageListener Interface
Preface
A work requirement called for the system to consume messages from a message queue through a Kafka listener interface. Kafka was chosen first and foremost for its very high throughput.
Basic concepts
- 1 Producer: the publisher that produces messages
- 2 Consumer: the subscriber that consumes messages
- 3 Consumer Group: a group of consumers that can consume the partitions of a topic in parallel
- 4 Broker: a caching proxy; the one or more servers in a Kafka cluster are collectively called brokers
- 5 Topic: a category of the message feeds (feeds of messages) that Kafka handles
- 6 Partition: a physical grouping of a topic; a topic can be split into multiple partitions, each of which is an ordered queue. Every message in a partition is assigned a sequential id called the offset
- 7 Message: the basic unit of communication; each producer can publish messages to a topic
- 8 Sparse index: Kafka locates a message by binary-searching a sparse index
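The sparse-index lookup in item 8 can be illustrated with a small self-contained sketch (a simplification for illustration only, not Kafka's actual index code): binary-search the sorted index entries for the greatest indexed offset less than or equal to the target, then scan forward from that entry's file position.

```java
public class SparseIndexSketch {
    // indexedOffsets: the sorted message offsets that have an index entry.
    // Returns the position of the greatest indexed offset <= target,
    // or -1 if the target precedes every entry.
    static int floorEntry(long[] indexedOffsets, long target) {
        int lo = 0, hi = indexedOffsets.length - 1, ans = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexedOffsets[mid] <= target) {
                ans = mid;        // candidate: everything left of it is also <= target
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return ans; // the log segment is then scanned forward from this entry
    }

    public static void main(String[] args) {
        long[] idx = {0, 100, 200, 300};
        System.out.println(floorEntry(idx, 250)); // entry 2 (offset 200)
        System.out.println(floorEntry(idx, 100)); // entry 1 (offset 100)
    }
}
```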
Integrating with Spring
- Add the Maven dependency
Since the project is managed with Maven, the Kafka-Spring jars are brought in by adding the dependency below. Kafka 0.10.2 is used here:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
1 Version compatibility
With the Maven dependency in place, compatibility still needs checking: spring-kafka depends on Spring Framework, so we must confirm that the Spring version already in the project works with Kafka 0.10.2. The Spring for Apache Kafka documentation says:
Compatibility - Apache Kafka 0.10.2.0
- Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
- Annotation-based listeners require Spring Framework 4.1 or higher, however.
- Minimum Java version: 7.
That is, spring-kafka for Kafka 0.10.2 is tested against Spring Framework 4.3.7, but is expected to also work with earlier Spring versions. In practice we found that the producer-side API is affected by the Spring Framework version while the consumer side is not, so the project's existing Spring Framework version can be left unchanged.
2 Excluding duplicate jars
The spring-kafka dependency transitively pulls in the Spring Framework jars listed below. Since the project already manages its own Spring version, these need to be excluded to avoid duplicate jars on the classpath:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
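One way to actually drop the transitive Spring jars is to declare exclusions on the spring-kafka dependency itself. This is a sketch; the exact artifact list should be taken from your own dependency tree (e.g. via mvn dependency:tree):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.2.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```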
3 Interface comparison
A Kafka consumer can be implemented in two ways: with the client API or with a listener interface. For our business needs the listener approach was chosen. Spring provides four listener interfaces, shown below:
public interface MessageListener<K, V> { // 1
    void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { // 2
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface BatchMessageListener<K, V> { // 3
    void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { // 4
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
The corresponding documentation explains:
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.
MessageListener fits the case where the consumer commits the offset automatically after consuming, i.e. when enable.auto.commit is true.
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
With AcknowledgingMessageListener, the offset is not committed automatically after a message is consumed; the listener must ack manually. This interface fits the case where enable.auto.commit is false.
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
BatchMessageListener and BatchAcknowledgingMessageListener work much like the two interfaces above, except that they consume messages in batches; again the difference between them is whether offsets are committed automatically.
Since our processing is heavy, and with auto-commit a consumer can lose messages when processing fails or throws, offsets have to be committed manually. The AcknowledgingMessageListener interface is therefore the one implemented here.
Spring configuration file
Configuration approach:
1. Decide which beans need to be defined:
- 1 consumerProperties: the consumer's basic properties, including bootstrap.servers, group.id, etc.
- 2 consumerFactory: the consumer factory; once consumerProperties is configured, it is passed into consumerFactory as a constructor argument
- 3 containProperties: the container-properties bean; it references the custom listener bean and sets the ackMode (which commit style to use when committing manually)
- 4 messageListenerContainer: the listener container that runs the listener; the consumerFactory and containProperties beans are wired into it, it specifies the topic, and its init-method="doStart" makes the listener start automatically when Spring starts
- 5 kafkaMessageListener: the listener implementation we write ourselves, wired into containProperties
The complete consumer configuration file is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- 1. Consumer properties (a HashMap) -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
                <entry key="group.id" value="${kafka.group.id}"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="session.timeout.ms" value="15000"/>
                <!--<entry key="auto.offset.reset" value="earliest"/>-->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer.encoding" value="UTF8"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 2. Kafka consumer factory (DefaultKafkaConsumerFactory) -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
    <!-- 3. Listener bean (AcknowledgingMessageListener implementation) -->
    <bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener">
        <property name="threadPool" ref="kafkaWorkerThreadPool"/>
    </bean>
    <bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="200"/>
        <property name="queueCapacity" value="500"/>
        <property name="keepAliveSeconds" value="1800"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 4. Container properties -->
    <bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic}"/>
        <property name="ackMode" value="MANUAL_IMMEDIATE"/>
        <property name="messageListener" ref="kafkaMessageListener"/>
    </bean>
    <!-- 5. Kafka message listener container -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containProperties"/>
    </bean>
</beans>
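The ${kafka.bootstrap.servers}, ${kafka.group.id} and ${kafka.topic} placeholders assume a property source (for example a PropertyPlaceholderConfigurer). A hypothetical properties file, reusing the values from the sample code in this article, might look like:

```properties
# hypothetical kafka.properties resolved by a property placeholder configurer
kafka.bootstrap.servers=10.33.106.94:9092
kafka.group.id=test3
kafka.topic=testTopic
```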
Sample code
A simple test case follows.
Producer:
Sends one message to the brokers every second.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class SimpleKafkaProducer implements Runnable {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);

    @Override
    public void run() {
        Map<String, Object> sendProps = senderProps();
        Producer<Integer, String> producer = new KafkaProducer<>(sendProps);
        Integer currentNum = 0;
        try {
            LOGGER.info("start produce message");
            while (true) {
                // key is the counter, value is its string form (the value must be a String
                // to match the StringSerializer configured below)
                ProducerRecord<Integer, String> producerRecord =
                        new ProducerRecord<>("testTopic", currentNum, String.valueOf(currentNum));
                producer.send(producerRecord);
                LOGGER.info("send message key:" + currentNum + " and value:" + producerRecord.value());
                currentNum++;
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            LOGGER.error("send message fail", e);
        } finally {
            producer.close();
        }
    }

    public static void main(String[] args) {
        SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();
        new Thread(simpleKafkaProducer).start();
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
Consumer:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    // injected via the kafkaWorkerThreadPool bean in the XML configuration
    private ThreadPoolTaskExecutor threadPool;

    public void setThreadPool(ThreadPoolTaskExecutor threadPool) {
        this.threadPool = threadPool;
    }

    @Override
    public void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {
        // TODO: business logic goes here
        // finally, call acknowledge() to commit the offset
        acknowledgment.acknowledge();
    }
}
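In practice you normally want to acknowledge only after the business logic succeeds, so that a failed record's offset stays uncommitted and the message can be redelivered. A minimal self-contained sketch of that guard pattern (the handle method and the Runnable stand-in for Acknowledgment.acknowledge() are illustrative, not part of the spring-kafka API):

```java
public class AckGuardSketch {
    // Process one record's value; commit the offset (run the ack) only on success.
    static boolean handle(String value, Runnable acknowledge) {
        try {
            if (value == null) {
                throw new IllegalArgumentException("unprocessable record");
            }
            // ... business logic would run here ...
            acknowledge.run(); // manual commit happens only after success
            return true;
        } catch (Exception e) {
            // no ack: the offset stays uncommitted, so the record can be re-consumed
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("payload", () -> System.out.println("acked")));
        System.out.println(handle(null, () -> System.out.println("acked")));
    }
}
```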
Consumer usage example: following the Spring reference documentation, a simple test that drives the consumer listener:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

public class SimpleKafkaConsumer extends SpringUnitTest {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);

    @Resource(name = "kafkaMessageListener")
    private KafkaMessageListener kafkaMessageListener;

    @Test
    public void testListener() throws InterruptedException {
        ContainerProperties containerProps = new ContainerProperties("testTopic");
        containerProps.setMessageListener(kafkaMessageListener);
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("messageListenerContainer");
        container.start();
        Thread.sleep(60000); // keep the test thread alive so the container can poll
        container.stop();
    }

    private static KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }

    private static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
Before implementing the AcknowledgingMessageListener interface I searched the write-ups already available online, but none were satisfactory, so I worked through the Spring reference documentation and gradually arrived at a listener with manual offset commits. Of course, Kafka covers far more ground than this; there is plenty left to learn.