spring-kafka原始碼解析
前言:
關於Kafka,是一個比較流行的MQ工具,也是多數公司比較常用的。有關於Kafka的一些基本內容讀者可以參考官方文件,瞭解一下生產者消費者的使用。kafka的搭建筆者也不再詳述,網路上有很多文章介紹。
這篇文章主要是從原始碼的角度來分析一下Spring對kafka的使用封裝
筆者搭建的kafka版本為 kafka_2.11-0.11.0.1
1.原生的使用生產者、消費者
引入maven為:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency>
1)生產者程式碼如下
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("test", "Hello")); producer.close(); } }
總結:通過以上可知,生產者比較簡單,主要就是建立Producer,然後執行send方法即可
2)消費者程式碼如下
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
總結:消費者主要就是通過KafkaConsumer.poll()方法拉取ConsumerRecords獲取消費內容
2.Spring-kafka使用生產者、消費者
maven引入
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.8.RELEASE</version>
</dependency>
1)配置檔案application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
spring.kafka.consumer.group-id=test-consumer-group
#spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
2)建立PropsConfig類
@Configuration
@Data
public class PropsConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String broker;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
}
3)生產者程式碼
* 生產者KafkaTemplate建立,建立KafkaProducerConfig類,程式碼如下
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Autowired
PropsConfig propsConfig;
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
* 生產者KafkaTemplate使用
@Component
public class ProducerServiceImpl implements ProducerService {
private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);
@Autowired
private KafkaTemplate template;
//傳送訊息方法
private void sendJson(String topic, String content) {
ListenableFuture<SendResult<String, String>> future = template.send(topic, content);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
logger.info("msg OK. " + result.toString());
}
@Override
public void onFailure(Throwable ex) {
logger.error("msg send failed.", ex);
}
});
}
}
總結:生產者主要就是使用KafkaTemplate.send()方法來實現
2)消費者程式碼
* 建立消費者Config類,建立類KafkaConsumerConfig,程式碼如下
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Autowired
PropsConfig propsConfig;// 配置檔案物件
// 主要就是為了建立一下兩個Factory
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
return propsMap;
}
}
* 建立Listener監聽,主要是@KafkaListener
public class MyListener {
private static final String TPOIC = "test";
// 監聽topic為test的0,1,2三個partition
@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0","1","2" })})
public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id0 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
log.info("p0 Received message={}", message);
}
}
}
}
總結:可以看到這裡的消費者方式與傳統的消費者方式有很大差異,下面我們主要來分析消費者的程式碼
3.Spring-kafka生產者原始碼分析
通過上面的分析可以看到,生產者主要就是KafkaTemplate.send()方法,下面我們就來分析一下這個方法
// KafkaTemplate.send(String topic, V data)
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);// 這裡同原生的使用是一致的,將消費封裝為ProducerRecord
return doSend(producerRecord);// 主要在這裡
}
// KafkaTemplate.doSend(final ProducerRecord<K, V> producerRecord)
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
// 1.獲取生產者
// this.producerFactory.createProducer()可以看到是通過工廠類來建立的
final Producer<K, V> producer = getTheProducer();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
}
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
// 2.通過send方法傳送資料,並呼叫回撥方法
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
try {
if (exception == null) {
future.set(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null
&& KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
}
}
else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
producerRecord.partition(),
producerRecord.key(),
producerRecord.value(),
exception);
}
}
}
finally {
producer.close();
}
}
});
// 3.重新整理
if (this.autoFlush) {
flush();
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sent: " + producerRecord);
}
return future;
}
總結:傳送的方法比較簡單,就是簡單的通過Producer.send()來發送訊息,同原生的方式基本沒有區別
4.Spring-kafka消費者原始碼分析
消費者的實現比較隱蔽,我們主觀能看到的就是兩個註解@EnableKafka和@KafkaListener
按照Spring的一貫風格那就應該是這兩個註解實現的,下面我們來看下這兩個註解,首先來分析@EnableKafka
1)@EnableKafka註解分析
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)// 主要功能就是倒入一個類,我們來看下這個類
public @interface EnableKafka {
}
// KafkaBootstrapConfiguration
@Configuration
public class KafkaBootstrapConfiguration {
@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
return new KafkaListenerAnnotationBeanPostProcessor();
}
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry();
}
}
總結:可以看到KafkaBootstrapConfiguration的主要功能也就是建立兩個bean
* KafkaListenerAnnotationBeanPostProcessor,註解如下
從註解可知,其主要功能就是監聽@KafkaListener註解(我們重點分析這個類)
/**
* Bean post-processor that registers methods annotated with {@link KafkaListener}
* to be invoked by a Kafka message listener container created under the covers
* by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* according to the parameters of the annotation.
*
* <p>Annotated methods can use flexible arguments as defined by {@link KafkaListener}.
* KafkaListenerEndpointRegistry,註解如下
可知,其主要功能就是為KafkaListenerEndpoint介面管理MessageListenerContainer,我們暫時沒用到這個功能,故先不分析
有興趣的讀者可以參考這篇部落格:http://www.cnblogs.com/huangfox/p/9798446.html 裡面是其一個應用場景
/**
* Creates the necessary {@link MessageListenerContainer} instances for the
* registered {@linkplain KafkaListenerEndpoint endpoints}. Also manages the
* lifecycle of the listener containers, in particular within the lifecycle
* of the application context.
5.KafkaListenerAnnotationBeanPostProcessor的分析
類結構如下:
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
關於BeanPostProcessor,我們並不陌生,在之前的Spring原始碼系列分析中,實現這個介面的類,會實現其postProcessAfterInitialization()方法,可用來修改bean物件,如果讀者想對該介面有進一步瞭解,可參考:https://blog.csdn.net/elim168/article/details/76146351
1)KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization()方法分析
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
// 1.獲取bean對應的Class
Class<?> targetClass = AopUtils.getTargetClass(bean);
// 2.查詢類是否有@KafkaListener註解(本例中類沒有相關注解)
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<Method>();
// 3.查詢類中方法上是否有對應的@KafkaListener註解,本例中是註解再方法上的
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {
@Override
public Set<KafkaListener> inspect(Method method) {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
}
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
new ReflectionUtils.MethodFilter() {
@Override
public boolean matches(Method method) {
return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
}
});
multiMethods.addAll(methodsWithHandler);
}
// 4.annotatedMethods不為空,直接走else邏輯
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
if (this.logger.isTraceEnabled()) {
this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
}
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
// 主要邏輯就在這,處理方法上的@KafkaListener,接下來我們單獨分析這個方法
processKafkaListener(listener, method, bean, beanName);
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
2)KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(listener, method, bean, beanName);
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
// 主要功能在這裡
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}
// processListener()
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
Object adminTarget, String beanName) {
// 1.封裝Endpoint
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPattern(resolvePattern(kafkaListener));
String group = kafkaListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
// 2.我們可以主動指定@KafkaListener的containerFactory,在本例中,我們沒有主動指定
KafkaListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
// 將endpoint註冊到registrar,我們繼續看下這個方法
this.registrar.registerEndpoint(endpoint, factory);
}
//KafkaListenerEndpointRegistrar.registerEndpoint()
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// 1.將endpoint封裝為KafkaListenerEndpointDescriptor
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
// 將descriptor新增到endpointDescriptors
// List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
this.endpointDescriptors.add(descriptor);
}
}
}
一連串的分析,我們最終得到一個含有KafkaListener基本資訊的Endpoint,最後Endpoint被封裝到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被新增到KafkaListenerEndpointRegistrar.endpointDescriptors中,也就是一個list中,結束了。
那麼放到這個list的後續處理呢?
我們可以看下這個KafkaListenerEndpointRegistrar的類結構
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
實現了InitializingBean介面,這個介面中有一個afterPropertiesSet()方法,會在類載入完成之後自動執行,那麼我們來看下這個方法
3)KafkaListenerEndpointRegistrar.afterPropertiesSet()初始化方法
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
// 遍歷descriptor中的每一個KafkaListenerEndpointDescriptor,並執行對應的方法
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
// 關鍵在這裡
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
// KafkaListenerEndpointRegistry.registerListenerContainer()
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
registerListenerContainer(endpoint, factory, false);
}
@SuppressWarnings("unchecked")
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
// 1.建立Endpoint對應的MessageListenerContainer,將建立好的MessageListenerContainer放入listenerContainers
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
// 2.如果我們的KafkaListener註解中有對應的group資訊,則將container新增到對應的group中
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
// 3.startImmediately預設為false,則不執行start方法
if (startImmediately) {
startIfNecessary(container);
}
}
}
我們將建立Container的方法單獨拎出來介紹
4)KafkaListenerEndpointRegistry.createListenerContainer(endpoint, factory);
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
// 主要在這裡,通過工廠來建立Container
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
...
return listenerContainer;
}
// AbstractKafkaListenerContainerFactory.createListenerContainer()
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
// 重點在這裡
C instance = createContainerInstance(endpoint);
// 一系列設定引數的動作
...
// 初始化Container的"topics", "topicPartitions", "topicPattern","messageListener", "ackCount", "ackTime"等引數
endpoint.setupListenerContainer(instance, this.messageConverter);
initializeContainer(instance);
return instance;
}
// 預設ConcurrentKafkaListenerContainerFactory.createContainerInstance()
// 由該方法可知,最終建立的容器是ConcurrentMessageListenerContainer,根據使用者設定的引數
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
Collection<TopicPartitionInitialOffset> topicPartitions = endpoint.getTopicPartitions();
if (!topicPartitions.isEmpty()) {
ContainerProperties properties = new ContainerProperties(
topicPartitions.toArray(new TopicPartitionInitialOffset[topicPartitions.size()]));
return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
}
else {
Collection<String> topics = endpoint.getTopics();
if (!topics.isEmpty()) {
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[topics.size()]));
return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
}
else {
ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
}
}
}
總結:經過以上一些列的分析,我們@KafkaListener註解變為ConcurrentMessageListenerContainer類,這個Container中包含了我們所需要的topic相關資訊
6.ConcurrentMessageListenerContainer的原始碼分析
類結構如下:
其實現了Lifecycle介面,Lifecycle是一個神奇的介面,它定義了物件的生命週期方法,任何Spring管理的物件都可以實現這個藉口,當ApplicationContext自身啟動和停止時,它會自動調動上下文中所有生命週期的實現。更多關於Lifecycle的內容可參考:https://www.cnblogs.com/wade-luffy/p/6074088.html
所以可以知道:ConcurrentMessageListenerContainer在容器初始化完成時候會自動呼叫start()方法
1)ConcurrentMessageListenerContainer.start()
// AbstractMessageListenerContainer.start()
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
Assert.isTrue(
this.containerProperties.getMessageListener() instanceof KafkaDataListener,
"A " + GenericMessageListener.class.getName() + " implementation must be provided");
// 直接呼叫子類的doStart()方法
doStart();
}
}
}
// ConcurrentMessageListenerContainer.doStart()
@Override
protected void doStart() {
if (!isRunning()) {
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
// 1.將concurrency設定為topicPartitions大小,本例中我們的consumer監聽3個分片
if (topicPartitions != null
&& this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
setRunning(true);
// 2.根據分片數建立KafkaMessageListenerContainer,每一個分片建立一個Container
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container;
if (topicPartitions == null) {
container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
}
else {
container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
partitionSubset(containerProperties, i));
}
if (getBeanName() != null) {
container.setBeanName(getBeanName() + "-" + i);
}
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
// 3.啟動Container,並新增其到containers中
// 重點就在這個start方法,啟動了KafkaMessageListenerContainer,下面單獨來分析
container.start();
this.containers.add(container);
}
}
}
2)KafkaMessageListenerContainer.start()方法分析
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
Assert.isTrue(
this.containerProperties.getMessageListener() instanceof KafkaDataListener,
"A " + GenericMessageListener.class.getName() + " implementation must be provided");
// 直接呼叫子類KafkaMessageListenerContainer.doStart()方法
doStart();
}
}
}
// KafkaMessageListenerContainer.doStart()
@Override
protected void doStart() {
// 1.如果已啟動,則直接返回
if (isRunning()) {
return;
}
ContainerProperties containerProperties = getContainerProperties();
...
// 2.獲取對應的監聽類,本例中就是MyListener,被包裝到BatchMessagingMessageListenerAdapter中
Object messageListener = containerProperties.getMessageListener();
Assert.state(messageListener != null, "A MessageListener is required");
if (messageListener instanceof GenericAcknowledgingMessageListener) {
// 2.1 將監聽類MyListener賦值到acknowledgingMessageListener
this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
}
else if (messageListener instanceof GenericMessageListener) {
this.listener = (GenericMessageListener<?>) messageListener;
}
...
// 3.根據this.listener, this.acknowledgingMessageListener建立對應的listenerConsumer
this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
setRunning(true);
// 4.最後一步,將listenerConsumer作為一個任務提交
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
}
總結:看到這裡,可以看出,最終將我們@KafkaListener中的topicPartitions
@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0","1","2" })})
轉換為ListenerConsumer,每一個partition生成一個ListenerConsumer
7.ListenerConsumer的工作
ListenerConsumer是一個私有類,在KafkaMessageListenerContainer類裡面,類結構如下(可知,其實現了一個Runnable介面)
1)主要成員變數及構造方法
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
private final ContainerProperties containerProperties = getContainerProperties();
private final OffsetCommitCallback commitCallback = this.containerProperties.getCommitCallback() != null
? this.containerProperties.getCommitCallback()
: new LoggingCommitCallback();
// 都是一些比較熟悉的屬性,關鍵是這個consumer,主要的消費功能就是它來實現的
private final Consumer<K, V> consumer;
private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();
private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
...
// 下面來看一下ListenerConsumer的構造方法
@SuppressWarnings("unchecked")
ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
this.theListener = listener == null ? ackListener : listener;
// 1.最終要的方法,構造Consumer
final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId, KafkaMessageListenerContainer.this.clientIdSuffix);
this.consumer = consumer;
ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
// 2.subscribe對應的topic
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
if (this.containerProperties.getTopicPattern() != null) {
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
}
else {
consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
}
}
// 3.或者assign給定的partition
else {
List<TopicPartitionInitialOffset> topicPartitions =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new HashMap<>(topicPartitions.size());
for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
this.definedPartitions.put(topicPartition.topicPartition(),
new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
}
consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
...
}
總結:通過構造方法,ListenerConsumer完成了Consumer的建立以及topic和partition的監聽
2)run()方法,主要業務實現方法
@Override
public void run() {
this.consumerThread = Thread.currentThread();
if (this.theListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
}
if (this.transactionManager != null) {
ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
}
this.count = 0;
this.last = System.currentTimeMillis();
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
}
long lastReceive = System.currentTimeMillis();
long lastAlertAt = lastReceive;
// 不停輪詢實現topic的消費
while (isRunning()) {
try {
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
processSeeks();
// 1.使用poll方法拉取資料
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
this.lastPoll = System.currentTimeMillis();
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
}
// 2.獲取到資料後執行監聽器的回撥方法
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
lastReceive = System.currentTimeMillis();
}
// 回撥在此
invokeListener(records);
}
else {
if (this.containerProperties.getIdleEventInterval() != null) {
long now = System.currentTimeMillis();
if (now > lastReceive + this.containerProperties.getIdleEventInterval()
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
publishIdleContainerEvent(now - lastReceive);
lastAlertAt = now;
if (this.theListener instanceof ConsumerSeekAware) {
seekPartitions(getAssignedPartitions(), true);
}
}
}
}
}
...
}
...
}
//invokeListener(records);
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
}
else {
invokeRecordListener(records);
}
}
...
// 一層層追蹤下來最終到了
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
@SuppressWarnings("rawtypes") Producer producer) throws Error {
try {
if (this.acknowledgingMessageListener != null) {
// 最終實現在這裡,即呼叫BatchMessagingMessageListenerAdapter.onMessage方法
// BatchMessagingMessageListenerAdapter是一個包裝類,包裝了真正需要呼叫的類和對應方法
// 在本例中就是MyListener.listenPartition0()方法,至此,消費就結束了
this.acknowledgingMessageListener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
}
else {
this.listener.onMessage(record);
}
ackCurrent(record, producer);
}
...
return null;
}
總結:
1)@EnableKafka註解引入KafkaListenerAnnotationBeanPostProcessor類
2)KafkaListenerAnnotationBeanPostProcessor類監聽所有帶有@KafkaListener註解的類或方法
3)將每一個帶有@KafkaListener註解的類或方法封裝為一個MethodKafkaListenerEndpoint
4)將KafkaListenerEndpoint和對應的KafkaListenerContainerFactory封裝到一個KafkaListenerEndpointDescriptor中,並將其新增到KafkaListenerEndpointRegistrar.endpointDescriptors中
5)遍歷每一個KafkaListenerEndpointDescriptor,為其建立一個MessageListenerContainer,在本例中具體實現類為ConcurrentMessageListenerContainer
6)ConcurrentMessageListenerContainer在ApplicationContext建立完成之後,自動呼叫其start方法,start方法會根據我們在@KafkaListener中設定的topic和分片數來建立對應數量的KafkaMessageListenerContainer,並呼叫其start方法
7)KafkaMessageListenerContainer.start()方法會建立一個ListenerConsumer,ListenerConsumer是一個Runnable介面實現類
8)ListenerConsumer在構造的時候會建立一個Consumer,並assign對應的topic和partitions,然後執行一個while迴圈,在迴圈中不停的執行consumer.poll()方法拉取資料,並回調@KafkaListener對應的方法
至此,Spring-kafka的消費者程式碼分析結束。