
spring-kafka Source Code Analysis

Preface:

    Kafka is a popular message queue and one that many companies rely on. For the basics, readers can consult the official documentation to learn how producers and consumers are used; how to set up Kafka itself is also not covered here, as there are plenty of articles about it online.

    This article analyzes, from a source-code perspective, how Spring wraps the Kafka client.

    The Kafka version used by the author is kafka_2.11-0.11.0.1.

 

1. Using the native producer and consumer

    Maven dependency:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.1.1</version>
</dependency>

    1) Producer code

import org.apache.kafka.clients.producer.KafkaProducer;  
import org.apache.kafka.clients.producer.Producer;  
import org.apache.kafka.clients.producer.ProducerRecord;  
  
import java.util.Properties;  
  
public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();  
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("test", "Hello"));
        }
        producer.close();
    }
}

    Summary: as the code shows, the producer side is simple: create a Producer and call its send() method.
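    The native client can also send keyed records and register a per-record callback to learn whether the broker accepted each record. The sketch below is not part of the original example (topic name "test" and the local broker are assumed); the same Callback mechanism is what spring-kafka's KafkaTemplate builds on in section 3.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // With a key, the default partitioner picks the partition by hashing the key
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "key-" + i, "Hello " + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace(); // the send failed
                    } else {
                        System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}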

 

    2) Consumer code

import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;  
  
import java.util.Arrays;  
import java.util.Properties;  
  
public class KafkaConsumerExample {  
    public static void main(String[] args) {  
        Properties props = new Properties();  
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");  
        props.put("enable.auto.commit", "true");  
        props.put("auto.commit.interval.ms", "1000");  
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);  
            for (ConsumerRecord<String, String> record : records)  
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());  
        }
    }
}

    Summary: the consumer simply pulls ConsumerRecords in a loop via KafkaConsumer.poll() and processes whatever is returned.
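    With enable.auto.commit=true the client commits offsets in the background. When processing must finish before an offset is committed, auto commit can be switched off and the offsets committed manually; a minimal sketch, assuming the same broker and topic as above:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // we commit offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // commit only after the batch has been processed
            }
        }
    }
}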

 

2. Using the producer and consumer with Spring-kafka

    Maven dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.8.RELEASE</version>
</dependency>

    1) Configuration file application.properties

spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
spring.kafka.consumer.group-id=test-consumer-group

#spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.auto-offset-reset=latest

spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    2) Create the PropsConfig class

@Configuration
@Data
public class PropsConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String broker;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

}

    3) Producer code

    * Creating the KafkaTemplate: create a KafkaProducerConfig class as follows

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Autowired
    PropsConfig propsConfig;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

    * Using the KafkaTemplate

@Component
public class ProducerServiceImpl implements ProducerService {
    private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);

    @Autowired
    private KafkaTemplate template;

    // method that sends a message
    private void sendJson(String topic, String content) {
        
        ListenableFuture<SendResult<String, String>> future = template.send(topic, content);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("msg OK. " + result.toString());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("msg send failed.", ex);
            }
        });
    }
}

    Summary: producing a message boils down to calling KafkaTemplate.send().
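    KafkaTemplate.send() is asynchronous. When a caller needs to know the outcome before continuing, it can simply block on the returned ListenableFuture; a minimal sketch, assuming the KafkaTemplate<String, String> bean configured above (class and method names are illustrative):

import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class SyncSender {

    private final KafkaTemplate<String, String> template;

    public SyncSender(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    // Blocks until the broker acknowledges the record (or the timeout expires)
    public void sendAndWait(String topic, String content) throws Exception {
        SendResult<String, String> result = template.send(topic, content).get(10, TimeUnit.SECONDS);
        System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
    }
}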

 

    4) Consumer code

    * Create the consumer configuration class KafkaConsumerConfig, as follows

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Autowired
    PropsConfig propsConfig; // configuration properties object

    // The main purpose is to create the following two factories
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }
}

    * Create the listener; the key part is the @KafkaListener annotation

@Component
public class MyListener {
    private static final Logger log = LoggerFactory.getLogger(MyListener.class);
    private static final String TOPIC = "test";

    // Listen on partitions 0, 1 and 2 of topic "test"
    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "0", "1", "2" })})
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " + records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }
}

    Summary: this consumption style is clearly very different from the plain-client approach; the rest of the article focuses on analyzing the consumer-side code.
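    For comparison, when explicit partitions are not needed the listener can simply subscribe by topic and let the consumer group assign partitions. A minimal sketch, assuming the batch-listener container factory configured above (class name and listener id are illustrative):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleTopicListener {
    private static final Logger log = LoggerFactory.getLogger(SimpleTopicListener.class);

    // Partition assignment is left to the consumer group instead of being fixed in the annotation
    @KafkaListener(id = "id1", topics = "test")
    public void listen(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            log.info("topic listener received partition={} offset={} value={}",
                    record.partition(), record.offset(), record.value());
        }
    }
}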

 

3. Spring-kafka producer source code analysis

    As shown above, producing comes down to KafkaTemplate.send(), so let's analyze that method.

// KafkaTemplate.send(String topic, V data)
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);// same as the native client: wrap the message in a ProducerRecord
    return doSend(producerRecord);// the real work happens here
}

// KafkaTemplate.doSend(final ProducerRecord<K, V> producerRecord)
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    // 1. Obtain the producer
    // getTheProducer() delegates to this.producerFactory.createProducer(), i.e. the producer comes from the factory
    final Producer<K, V> producer = getTheProducer();
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sending: " + producerRecord);
    }
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    
    // 2. Send the data via send() and register a callback
    producer.send(producerRecord, new Callback() {

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            try {
                if (exception == null) {
                    future.set(new SendResult<>(producerRecord, metadata));
                    if (KafkaTemplate.this.producerListener != null
                        && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                        KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                                                      producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                    }
                }
                else {
                    future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                    if (KafkaTemplate.this.producerListener != null) {
                        KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                                                    producerRecord.partition(),
                                                                    producerRecord.key(),
                                                                    producerRecord.value(),
                                                                    exception);
                    }
                }
            }
            finally {
                producer.close();
            }
        }

    });
    // 3. Flush if autoFlush is enabled
    if (this.autoFlush) {
        flush();
    }
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sent: " + producerRecord);
    }
    return future;
}

    Summary: the send path is simple; it just calls Producer.send() with a callback, essentially the same as the native usage.

 

4. Spring-kafka consumer source code analysis

    The consumer implementation is much less visible; all we see on the surface are two annotations, @EnableKafka and @KafkaListener.

    In typical Spring fashion, these two annotations must be doing the work, so let's look at them, starting with @EnableKafka.

 

    1) The @EnableKafka annotation

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)// its only job is to import this class, so let's look at it
public @interface EnableKafka {
}

// KafkaBootstrapConfiguration
@Configuration
public class KafkaBootstrapConfiguration {

	@SuppressWarnings("rawtypes")
	@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
		return new KafkaListenerAnnotationBeanPostProcessor();
	}

	@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
	public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
		return new KafkaListenerEndpointRegistry();
	}

}

    Summary: KafkaBootstrapConfiguration does nothing more than create two beans.

    * KafkaListenerAnnotationBeanPostProcessor, whose Javadoc is excerpted below

        As the Javadoc states, its main job is to process the @KafkaListener annotation (this is the class we will analyze in detail)

/**
 * Bean post-processor that registers methods annotated with {@link KafkaListener}
 * to be invoked by a Kafka message listener container created under the covers
 * by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
 * according to the parameters of the annotation.
 *
 * <p>Annotated methods can use flexible arguments as defined by {@link KafkaListener}.

    * KafkaListenerEndpointRegistry, whose Javadoc is excerpted below

        Its job is to create and manage a MessageListenerContainer for each registered KafkaListenerEndpoint. We don't use it directly in this example, so it is not analyzed further here; a small sketch of one typical use is shown after the Javadoc excerpt below.

        Interested readers can also refer to this post for an application scenario: http://www.cnblogs.com/huangfox/p/9798446.html

/**
 * Creates the necessary {@link MessageListenerContainer} instances for the
 * registered {@linkplain KafkaListenerEndpoint endpoints}. Also manages the
 * lifecycle of the listener containers, in particular within the lifecycle
 * of the application context.
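    As a taste of that scenario, the registry can be injected and used to stop or restart a listener by the id declared on @KafkaListener. A minimal sketch (class and method names are illustrative; "id0" is the id declared on MyListener in section 2):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerSwitch {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void stopListener() {
        // Look up the container created for @KafkaListener(id = "id0") and stop it
        MessageListenerContainer container = registry.getListenerContainer("id0");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }

    public void startListener() {
        MessageListenerContainer container = registry.getListenerContainer("id0");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}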

5. Analysis of KafkaListenerAnnotationBeanPostProcessor

    Its class declaration is as follows:

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
		implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    BeanPostProcessor should be familiar from the earlier Spring source-code series: a class implementing this interface gets its postProcessAfterInitialization() method called for every bean, and can use it to inspect or modify the bean. For more on the interface see: https://blog.csdn.net/elim168/article/details/76146351
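    To make the hook concrete, here is a minimal, hypothetical BeanPostProcessor; Spring calls both methods for every bean in the container, and postProcessAfterInitialization() is the hook KafkaListenerAnnotationBeanPostProcessor uses to scan beans for @KafkaListener:

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

@Component
public class LoggingBeanPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean; // nothing to do before the init callbacks
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Called after each bean has been fully initialized
        System.out.println("initialized bean: " + beanName);
        return bean;
    }
}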

 

    1) KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization()

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            // 1. Get the target class of the bean
			Class<?> targetClass = AopUtils.getTargetClass(bean);
            // 2. Check whether the class itself carries @KafkaListener (not the case in our example)
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<Method>();
            // 3. Check whether any methods of the class carry @KafkaListener; in our example the annotation is on a method
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

						@Override
						public Set<KafkaListener> inspect(Method method) {
							Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
							return (!listenerMethods.isEmpty() ? listenerMethods : null);
						}

					});
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						new ReflectionUtils.MethodFilter() {

							@Override
							public boolean matches(Method method) {
								return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
							}

						});
				multiMethods.addAll(methodsWithHandler);
			}
            // 4. annotatedMethods is not empty here, so we go straight to the else branch
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
				}
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
                        // The core logic: process the @KafkaListener found on this method; analyzed separately next
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				if (this.logger.isDebugEnabled()) {
					this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
				}
			}
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

    2)KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(listener, method, bean, beanName);

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    // the main work happens here
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

// processListener()
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
                               Object adminTarget, String beanName) {
    // 1. Populate the endpoint
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    String group = kafkaListener.group();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }

    // 2. A containerFactory can be specified explicitly on @KafkaListener; we did not do so in this example
    KafkaListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                                                  + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                                                  + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    // register the endpoint with the registrar; let's follow this method
    this.registrar.registerEndpoint(endpoint, factory);
}

//KafkaListenerEndpointRegistrar.registerEndpoint()
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // 1. Wrap the endpoint in a KafkaListenerEndpointDescriptor
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                                                            resolveContainerFactory(descriptor), true);
        }
        else {
            
            // add the descriptor to endpointDescriptors, which is declared as
            // List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
            this.endpointDescriptors.add(descriptor);
        }
    }
}

    After this chain of calls we end up with an Endpoint carrying the @KafkaListener metadata; the Endpoint is wrapped in a KafkaListenerEndpointDescriptor, and the descriptor is added to KafkaListenerEndpointRegistrar.endpointDescriptors, which is just a list. That is where registration stops for the moment.

    So what happens to the descriptors placed in that list?

    Look at the declaration of KafkaListenerEndpointRegistrar:

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    It implements the InitializingBean interface, whose afterPropertiesSet() method is invoked once the bean's properties have been populated, so let's look at that method; a minimal example of the contract is sketched below.
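    The InitializingBean contract is tiny; a minimal, hypothetical example (the registrar's afterPropertiesSet() is triggered at the same point of the startup sequence):

import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
public class StartupCheck implements InitializingBean {

    @Override
    public void afterPropertiesSet() {
        // Runs once this bean's dependencies and properties have been injected;
        // KafkaListenerEndpointRegistrar uses the same callback to register all collected endpoints
        System.out.println("bean fully initialized");
    }
}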

 

    3) The initialization method KafkaListenerEndpointRegistrar.afterPropertiesSet()

@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        // iterate over every KafkaListenerEndpointDescriptor and register a listener container for it
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            // the key call is here
            this.endpointRegistry.registerListenerContainer(
                descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}

// KafkaListenerEndpointRegistry.registerListenerContainer()
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    registerListenerContainer(endpoint, factory, false);
}

@SuppressWarnings("unchecked")
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
                                      boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");

    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
                     "Another endpoint is already registered with id '" + id + "'");
        
        // 1. Create the MessageListenerContainer for this endpoint and store it in listenerContainers
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        
        // 2. If the @KafkaListener annotation declared a group, add the container to that group
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        // 3. startImmediately is false by default at this point, so start() is not called yet
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}

    The container-creation method deserves a separate look.

    4) KafkaListenerEndpointRegistry.createListenerContainer(endpoint, factory)

protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
                                                           KafkaListenerContainerFactory<?> factory) {

    // the container is created by the factory here
    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
    ...

    return listenerContainer;
}

// AbstractKafkaListenerContainerFactory.createListenerContainer()
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    // the key call
    C instance = createContainerInstance(endpoint);
    // a series of parameter-setting calls
    ...

    // initialize the container's "topics", "topicPartitions", "topicPattern", "messageListener", "ackCount", "ackTime" and other properties
    endpoint.setupListenerContainer(instance, this.messageConverter);
    initializeContainer(instance);

    return instance;
}

// Default implementation: ConcurrentKafkaListenerContainerFactory.createContainerInstance()
// As this method shows, the container that is finally created is a ConcurrentMessageListenerContainer, built from the user-supplied settings
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
    Collection<TopicPartitionInitialOffset> topicPartitions = endpoint.getTopicPartitions();
    if (!topicPartitions.isEmpty()) {
        ContainerProperties properties = new ContainerProperties(
            topicPartitions.toArray(new TopicPartitionInitialOffset[topicPartitions.size()]));
        return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
    }
    else {
        Collection<String> topics = endpoint.getTopics();
        if (!topics.isEmpty()) {
            ContainerProperties properties = new ContainerProperties(topics.toArray(new String[topics.size()]));
            return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
        }
        else {
            ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
            return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
        }
    }
}

    Summary: after all of the above, our @KafkaListener annotation has been turned into a ConcurrentMessageListenerContainer, and this container carries the topic information we declared; a sketch of building such a container by hand follows.
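    What the factory does can also be done by hand, which makes the relationship clearer. A minimal sketch, assuming the ConsumerFactory from KafkaConsumerConfig and the spring-kafka 1.x package layout (ContainerProperties moved to a different package in 2.x):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

public class ManualContainerExample {

    public static ConcurrentMessageListenerContainer<String, String> build(
            ConsumerFactory<String, String> consumerFactory) {
        // The same information an endpoint carries: which topics to consume...
        ContainerProperties containerProps = new ContainerProperties("test");
        // ...and which listener to call back for each record
        containerProps.setMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println("received " + record.value());
            }
        });
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.setConcurrency(3); // one KafkaMessageListenerContainer per consumer thread
        container.start();           // what the registry normally does for us
        return container;
    }
}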

 

6. ConcurrentMessageListenerContainer source code analysis

    Looking at its class hierarchy (the class diagram from the original post is not reproduced here):

    It implements the Lifecycle interface. Lifecycle defines lifecycle methods for an object; any Spring-managed bean can implement it, and when the ApplicationContext itself starts and stops it propagates those calls to every Lifecycle implementation in the context (more precisely, the listener containers implement SmartLifecycle with autoStartup enabled, which is why they are started automatically during context refresh). More on Lifecycle: https://www.cnblogs.com/wade-luffy/p/6074088.html

 

    So we know that ConcurrentMessageListenerContainer.start() is called automatically once the ApplicationContext has finished initializing; a minimal SmartLifecycle sketch is shown below.
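    For reference, a minimal, hypothetical SmartLifecycle bean; because isAutoStartup() returns true, Spring calls start() while the context is refreshing, which is exactly how the listener containers get started:

import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
public class AutoStartBean implements SmartLifecycle {

    private volatile boolean running;

    @Override
    public void start() {
        // Invoked automatically when the ApplicationContext finishes refreshing
        this.running = true;
        System.out.println("started");
    }

    @Override
    public void stop() {
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public int getPhase() {
        return 0;
    }
}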

 

    1)ConcurrentMessageListenerContainer.start()

// AbstractMessageListenerContainer.start()
@Override
public final void start() {
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            Assert.isTrue(
                this.containerProperties.getMessageListener() instanceof KafkaDataListener,
                "A " + GenericMessageListener.class.getName() + " implementation must be provided");
            // delegates straight to the subclass doStart() method
            doStart();
        }
    }
}

// ConcurrentMessageListenerContainer.doStart()
@Override
protected void doStart() {
    if (!isRunning()) {
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        
        // 1. Cap concurrency at the number of topicPartitions; in this example our consumer listens to 3 partitions
        if (topicPartitions != null
            && this.concurrency > topicPartitions.length) {
            this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                             + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                             + topicPartitions.length);
            this.concurrency = topicPartitions.length;
        }
        setRunning(true);

        // 2. Create one KafkaMessageListenerContainer per unit of concurrency (one per partition here)
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container;
            if (topicPartitions == null) {
                container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
            }
            else {
                container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                                                                partitionSubset(containerProperties, i));
            }
            if (getBeanName() != null) {
                container.setBeanName(getBeanName() + "-" + i);
            }
            if (getApplicationEventPublisher() != null) {
                container.setApplicationEventPublisher(getApplicationEventPublisher());
            }
            container.setClientIdSuffix("-" + i);
            
            // 3. Start the container and add it to containers
            // the key is this start() call, which starts the KafkaMessageListenerContainer; analyzed separately below
            container.start();
            this.containers.add(container);
        }
    }
}

    2) KafkaMessageListenerContainer.start()

@Override
public final void start() {
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            Assert.isTrue(
                this.containerProperties.getMessageListener() instanceof KafkaDataListener,
                "A " + GenericMessageListener.class.getName() + " implementation must be provided");
            // delegates to the subclass method KafkaMessageListenerContainer.doStart()
            doStart();
        }
    }
}

// KafkaMessageListenerContainer.doStart()
@Override
protected void doStart() {
    // 1. Return immediately if already running
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();
    ...
    // 2. Get the listener object; in our example it is MyListener, wrapped in a BatchMessagingMessageListenerAdapter
    Object messageListener = containerProperties.getMessageListener();
    Assert.state(messageListener != null, "A MessageListener is required");
    if (messageListener instanceof GenericAcknowledgingMessageListener) {
        // 2.1 assign the wrapped listener to acknowledgingMessageListener
        this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
    }
    else if (messageListener instanceof GenericMessageListener) {
        this.listener = (GenericMessageListener<?>) messageListener;
    }
    ...
    // 3. Create the ListenerConsumer from this.listener / this.acknowledgingMessageListener
    this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
    setRunning(true);
    
    // 4. Finally, submit the listenerConsumer as a task to the consumer task executor
    this.listenerConsumerFuture = containerProperties
        .getConsumerTaskExecutor()
        .submitListenable(this.listenerConsumer);
}

    Summary: at this point we can see that the topicPartitions declared in our @KafkaListener

 @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "0","1","2" })})

    are ultimately served by ListenerConsumer instances: each KafkaMessageListenerContainer (one per partition in this example) creates its own ListenerConsumer.

 

7. How ListenerConsumer works

    ListenerConsumer is a private inner class of KafkaMessageListenerContainer; its declaration is shown below (note that it implements a Runnable-derived interface).

    1) Key fields and the constructor

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {

    private final ContainerProperties containerProperties = getContainerProperties();

    private final OffsetCommitCallback commitCallback = this.containerProperties.getCommitCallback() != null
        ? this.containerProperties.getCommitCallback()
        : new LoggingCommitCallback();

    // mostly familiar properties; the key one is this consumer, which does the actual consuming
    private final Consumer<K, V> consumer;

    private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();

    private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
    ...
      
// Now let's look at the ListenerConsumer constructor
@SuppressWarnings("unchecked")
ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
	Assert.state(!this.isAnyManualAck || !this.autoCommit,
			"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
	this.theListener = listener == null ? ackListener : listener;
        
    // 1. The most important step: create the Consumer
	final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
			this.consumerGroupId, KafkaMessageListenerContainer.this.clientIdSuffix);
	this.consumer = consumer;

	ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

    // 2. subscribe to the configured topics (or topic pattern)...
	if (KafkaMessageListenerContainer.this.topicPartitions == null) {
		if (this.containerProperties.getTopicPattern() != null) {
			consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
		}
		else {
			consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
		}
	}
        
    // 3. ...or assign the explicitly given partitions
	else {
		List<TopicPartitionInitialOffset> topicPartitions =
				Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
		this.definedPartitions = new HashMap<>(topicPartitions.size());
		for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
			this.definedPartitions.put(topicPartition.topicPartition(),
					new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
		}
		consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
	}
    ...
}

    Summary: in its constructor the ListenerConsumer creates the Consumer and subscribes to, or is assigned, the configured topics and partitions.

 

    2) The run() method, where the actual consuming happens

@Override
public void run() {
    this.consumerThread = Thread.currentThread();
    if (this.theListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
    }
    if (this.transactionManager != null) {
        ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
    }
    this.count = 0;
    this.last = System.currentTimeMillis();
    if (isRunning() && this.definedPartitions != null) {
        initPartitionsIfNeeded();
    }
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    
    // keep polling in a loop to consume the topic
    while (isRunning()) {
        try {
            if (!this.autoCommit && !this.isRecordAck) {
                processCommits();
            }
            processSeeks();
            
            // 1. Pull records with poll()
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            this.lastPoll = System.currentTimeMillis();

            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
            }
            // 2. If records were returned, invoke the listener callback
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                // the callback happens here
                invokeListener(records);
            }
            else {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                        && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                        if (this.theListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
        }
        ...
    }
    ...
}

//invokeListener(records);
private void invokeListener(final ConsumerRecords<K, V> records) {
    if (this.isBatchListener) {
        invokeBatchListener(records);
    }
    else {
        invokeRecordListener(records);
    }
}
...
// following the call chain down, we eventually reach
    private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
                                                    @SuppressWarnings("rawtypes") Producer producer) throws Error {
    try {
        if (this.acknowledgingMessageListener != null) {
            // The final step: BatchMessagingMessageListenerAdapter.onMessage() is invoked.
            // BatchMessagingMessageListenerAdapter is an adapter that wraps the real target object and method,
            // in our example MyListener.listenPartition0(); at this point one consume cycle is complete
            this.acknowledgingMessageListener.onMessage(record,
                                                        this.isAnyManualAck
                                                        ? new ConsumerAcknowledgment(record)
                                                        : null);
        }
        else {
            this.listener.onMessage(record);
        }
        ackCurrent(record, producer);
    }
    ...
        return null;
}

 

Summary:

    1) @EnableKafka imports the KafkaListenerAnnotationBeanPostProcessor class

    2) KafkaListenerAnnotationBeanPostProcessor inspects every bean for classes or methods annotated with @KafkaListener

    3) Each annotated class or method is wrapped in a MethodKafkaListenerEndpoint

    4) The KafkaListenerEndpoint and its KafkaListenerContainerFactory are wrapped in a KafkaListenerEndpointDescriptor, which is added to KafkaListenerEndpointRegistrar.endpointDescriptors

    5) Each KafkaListenerEndpointDescriptor is then given its own MessageListenerContainer; in this example the concrete type is ConcurrentMessageListenerContainer

    6) Once the ApplicationContext has been created, ConcurrentMessageListenerContainer.start() is invoked automatically; based on the topics and partitions set on @KafkaListener it creates the corresponding number of KafkaMessageListenerContainer instances and calls start() on each of them

    7) KafkaMessageListenerContainer.start() creates a ListenerConsumer, which is a Runnable implementation

    8) When constructed, the ListenerConsumer creates a Consumer and subscribes to (or is assigned) the corresponding topics and partitions; its run() loop then keeps calling consumer.poll() and dispatches the returned records to the method annotated with @KafkaListener

 

   This concludes the analysis of the Spring-kafka consumer code.