Let's Talk About Implementing a Kafka Consumer with an Idempotency Template

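Preface

You may have had this experience: you remind your teammates of things to watch out for during development, for example that when consuming from a message queue, the consumer should handle idempotency where the business scenario calls for it. Then, during code review, you find that some junior developers still skipped the idempotency check in places that needed one. Since reminders alone do not work, we might as well implement a consumer with a built-in idempotency template and have developers write their consumer-side business logic on top of it. This article uses spring-kafka as an example to show how to implement a Kafka consumer with an idempotency template.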

Implementation steps

1. Switch Kafka from auto-commit to manual commit

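spring:
    kafka:
        consumer:
            # Whether to auto-commit offsets (the Kafka client default is true). To avoid duplicate or lost data, set it to false and commit offsets manually
            enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}
        listener:
            # Assumption, not shown in the original snippet: the Acknowledgment parameter used in step 2 needs a manual ack mode on the listener container
            ack-mode: manual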
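
Manual commit and idempotency go together: when the offset is committed only after processing, a crash between processing and commit means the same message is delivered again after restart. The following is a minimal sketch of that effect, assuming a hypothetical in-memory log instead of a real Kafka client. ManualCommitSimulation, consume and runWithOneCrash are illustrative names, not part of spring-kafka or the demo project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in for a single partition; this is not the Kafka client API.
class ManualCommitSimulation {

    /**
     * Processes messages starting at committedOffset and commits after each one.
     * When the current offset equals crashBeforeCommitAt, the "consumer" stops after
     * processing but before committing, like a crash between business logic and ack.
     * Returns the committed offset a restarted consumer would resume from.
     */
    static int consume(List<String> log, int committedOffset, int crashBeforeCommitAt, List<String> processed) {
        for (int offset = committedOffset; offset < log.size(); offset++) {
            processed.add(log.get(offset));      // business processing happens first
            if (offset == crashBeforeCommitAt) {
                return committedOffset;          // crashed: this offset was never committed
            }
            committedOffset = offset + 1;        // manual commit after successful processing
        }
        return committedOffset;
    }

    static List<String> runWithOneCrash() {
        List<String> log = Arrays.asList("m0", "m1", "m2");
        List<String> processed = new ArrayList<>();
        int committed = consume(log, 0, 1, processed);   // crash right after processing m1
        consume(log, committed, -1, processed);          // restart from the last committed offset
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runWithOneCrash());           // m1 shows up twice
    }
}
```

In this sketch m1 is processed twice, which is exactly the situation the idempotency check in the template is meant to absorb.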

2. Define the abstract base class for the consumer template

@Slf4j
public abstract class BaseComusmeListener {

    @KafkaHandler
    public final void receive(@Payload String data, @Header(value = KafkaHeaders.RECEIVED_TOPIC,required = false) String receivedTopic,
                              @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY,required = false) String receivedMessageKey, @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP,required = false) long receivedTimestamp, Acknowledgment ack){
        KafkaComsumePayLoad kafkaComsumePayLoad = buildKafkaComsumePayLoad(data,receivedTimestamp,receivedTopic,receivedMessageKey);

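        boolean isRepeateConsume = isRepeateConsume(kafkaComsumePayLoad);
        if(isRepeateConsume){
            log.warn("messageKey:[{}], topic:[{}] duplicate message data --> [{}]",receivedMessageKey,receivedTopic,data);
            // duplicate message: acknowledge manually and skip the business logic
            ack.acknowledge();
            return;
        }

        if(doBiz(kafkaComsumePayLoad)){
            // acknowledge manually only when the business logic reports success
            ack.acknowledge();
        }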


    }

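    /**
     * Whether this message has already been consumed
     * @param kafkaComsumePayLoad the received message and its metadata
     * @return true if the message is a duplicate
     */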
    public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad);

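    /**
     * Business processing
     * @param kafkaComsumerPayLoad the received message and its metadata
     * @return true if processing succeeded and the message can be acknowledged
     */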
    public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad);


    private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp, String receivedTopic, String receivedMessageKey){
        return KafkaComsumePayLoad.builder()
                .data(data)
                .receivedTimestamp(receivedTimestamp)
                .receivedTopic(receivedTopic)
                .receivedMessageKey(receivedMessageKey)
                .build();
    }
}
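
To make the control flow of the template easier to see, here is a minimal sketch of the same template-method shape with Spring removed so it runs on its own. CountingAck is a hypothetical stand-in for spring-kafka's Acknowledgment, and IdempotentTemplate and InMemoryConsumer are illustrative names that do not exist in the demo project.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for spring-kafka's Acknowledgment; it only counts acks.
class CountingAck {
    int count;

    void acknowledge() {
        count++;
    }
}

// Same shape as the base class above, without any Spring dependency.
abstract class IdempotentTemplate {

    // final: subclasses cannot bypass the duplicate check
    final void receive(String key, String data, CountingAck ack) {
        if (isRepeatConsume(key)) {
            ack.acknowledge();   // duplicate: acknowledge and skip the business logic
            return;
        }
        if (doBiz(key, data)) {
            ack.acknowledge();   // acknowledge only after the business logic succeeded
        }
    }

    abstract boolean isRepeatConsume(String key);

    abstract boolean doBiz(String key, String data);
}

// Illustrative subclass that remembers processed keys in memory.
class InMemoryConsumer extends IdempotentTemplate {
    final Set<String> seenKeys = new HashSet<>();
    int bizCalls;

    @Override
    boolean isRepeatConsume(String key) {
        return seenKeys.contains(key);
    }

    @Override
    boolean doBiz(String key, String data) {
        bizCalls++;
        if (data.isEmpty()) {
            return false;        // simulated failure: the key is not recorded and nothing is acked
        }
        seenKeys.add(key);
        return true;
    }
}
```

Because receive is final, a subclass only decides what counts as a duplicate and what the business logic does; the order of check, process and acknowledge stays fixed in one place.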

3. Custom listener annotation (optional)

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
@Documented
@Component
public @interface LybGeekKafkaListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
    String containerFactory() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics() default {};


    @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
    String topicPattern() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
    TopicPartition[] topicPartitions() default {};


    @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
    String containerGroup() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
    String errorHandler() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
    String groupId() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
    boolean idIsGroup() default true;


    @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
    String clientIdPrefix() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
    String beanRef() default "__listener";


    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
    String autoStartup() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "properties")
    String[] properties() default {};


    @AliasFor(annotation = Component.class, attribute = "value")
    String value() default "";

}

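4. Override the KafkaListener annotation post-processor (optional)

Note: the Spring Boot version of the sample project is fairly old, so using @LybGeekKafkaListener directly has no effect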

public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>
		implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

	private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";

	/**
	 * The bean name of the default {@link KafkaListenerContainerFactory}.
	 */
	public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

	private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

	private final Log logger = LogFactory.getLog(getClass());

	private final ListenerScope listenerScope = new ListenerScope();

	private KafkaListenerEndpointRegistry endpointRegistry;

	private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

	private DefaultListableBeanFactory beanFactory;

	private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
			new KafkaHandlerMethodFactoryAdapter();

	private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

	private final AtomicInteger counter = new AtomicInteger();

	private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

	private BeanExpressionContext expressionContext;

	private Charset charset = StandardCharsets.UTF_8;

	@Override
	public int getOrder() {
		return LOWEST_PRECEDENCE;
	}

	/**
	 * Set the {@link KafkaListenerEndpointRegistry} that will hold the created
	 * endpoint and manage the lifecycle of the related listener container.
	 * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
	 */
	public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
		this.endpointRegistry = endpointRegistry;
	}

	/**
	 * Set the name of the {@link KafkaListenerContainerFactory} to use by default.
	 * <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
	 * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
	 */
	public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
		this.defaultContainerFactoryBeanName = containerFactoryBeanName;
	}

	/**
	 * Set the {@link MessageHandlerMethodFactory} to use to configure the message
	 * listener responsible to serve an endpoint detected by this processor.
	 * <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it
	 * can be configured further to support additional method arguments
	 * or to customize conversion and validation support. See
	 * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
	 * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
	 */
	public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
		this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
	}

	/**
	 * Making a {@link BeanFactory} available is optional; if not set,
	 * {@link KafkaListenerConfigurer} beans won't get autodetected and an
	 * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
	 * @param beanFactory the {@link BeanFactory} to be used.
	 */
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		this.beanFactory = (DefaultListableBeanFactory) beanFactory;
		if (beanFactory instanceof ConfigurableListableBeanFactory) {
			this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
			this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
					this.listenerScope);
		}
	}

	/**
	 * Set a charset to use when converting byte[] to String in method arguments.
	 * Default UTF-8.
	 * @param charset the charset.
	 * @since 2.2
	 */
	public void setCharset(Charset charset) {
		Assert.notNull(charset, "'charset' cannot be null");
		this.charset = charset;
	}

	@Override
	public void afterSingletonsInstantiated() {
		this.registrar.setBeanFactory(this.beanFactory);

		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, KafkaListenerConfigurer> instances =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
			for (KafkaListenerConfigurer configurer : instances.values()) {
				configurer.configureKafkaListeners(this.registrar);
			}
		}

		if (this.registrar.getEndpointRegistry() == null) {
			if (this.endpointRegistry == null) {
				Assert.state(this.beanFactory != null,
						"BeanFactory must be set to find endpoint registry by bean name");
				this.endpointRegistry = this.beanFactory.getBean(
						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
						KafkaListenerEndpointRegistry.class);
			}
			this.registrar.setEndpointRegistry(this.endpointRegistry);
		}

		if (this.defaultContainerFactoryBeanName != null) {
			this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
		}

		// Set the custom handler method factory once resolved by the configurer
		MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
		if (handlerMethodFactory != null) {
			this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
		}
		else {
			addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
		}

		// Actually register all listeners
		this.registrar.afterPropertiesSet();

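		// Remove the bean definition registered under the default @KafkaListener annotation processor name
		beanFactory.removeBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);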

	}


	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {
						Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());
				}
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (LybGeekKafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				if (this.logger.isDebugEnabled()) {
					this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
				}
			}
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

	/*
	 * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
	 */
	private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {
		Set<LybGeekKafkaListener> listeners = new HashSet<>();
		LybGeekKafkaListener ann = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);
		if (ann != null) {
			listeners.add(ann);
		}

		return listeners;
	}

	/*
	 * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
	 */
	private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {
		Set<LybGeekKafkaListener> listeners = new HashSet<>();
		LybGeekKafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);
		if (ann != null) {
			listeners.add(ann);
		}

		return listeners;
	}

	private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners, List<Method> multiMethods,
			Object bean, String beanName) {

		List<Method> checkedMethods = new ArrayList<>();
		Method defaultMethod = null;
		for (Method method : multiMethods) {
			Method checked = checkProxy(method, bean);
			KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
			if (annotation != null && annotation.isDefault()) {
				final Method toAssert = defaultMethod;
				Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
						+ toAssert.toString() + " and " + method.toString());
				defaultMethod = checked;
			}
			checkedMethods.add(checked);
		}
		for (LybGeekKafkaListener classLevelListener : classLevelListeners) {
			MultiMethodKafkaListenerEndpoint<K, V> endpoint =
					new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
			processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
		}
	}

	protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

	private Method checkProxy(Method methodArg, Object bean) {
		Method method = methodArg;
		if (AopUtils.isJdkDynamicProxy(bean)) {
			try {
				// Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->
				// is it also present on the proxy itself?
				method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
				Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
				for (Class<?> iface : proxiedInterfaces) {
					try {
						method = iface.getMethod(method.getName(), method.getParameterTypes());
						break;
					}
					catch (NoSuchMethodException noMethod) {
					}
				}
			}
			catch (SecurityException ex) {
				ReflectionUtils.handleReflectionException(ex);
			}
			catch (NoSuchMethodException ex) {
				throw new IllegalStateException(String.format(
						"@LybGeekKafkaListener method '%s' found on bean target class '%s', " +
								"but not found in any interface(s) for bean JDK proxy. Either " +
								"pull the method up to an interface or switch to subclass (CGLIB) " +
								"proxies by setting proxy-target-class/proxyTargetClass " +
								"attribute to 'true'", method.getName(),
						method.getDeclaringClass().getSimpleName()), ex);
			}
		}
		return method;
	}

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		resolveKafkaProperties(endpoint, kafkaListener.properties());

		KafkaListenerContainerFactory<?> factory = null;
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
			}
		}

		endpoint.setBeanFactory(this.beanFactory);
		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
		if (StringUtils.hasText(errorHandlerBeanName)) {
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
		}
		this.registrar.registerEndpoint(endpoint, factory);
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	}

	private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
		if (propertyStrings.length > 0) {
			Properties properties = new Properties();
			for (String property : propertyStrings) {
				String value = resolveExpressionAsString(property, "property");
				if (value != null) {
					try {
						properties.load(new StringReader(value));
					}
					catch (IOException e) {
						this.logger.error("Failed to load property " + property + ", continuing...", e);
					}
				}
			}
			endpoint.setConsumerProperties(properties);
		}
	}

	private String getEndpointId(LybGeekKafkaListener kafkaListener) {
		if (StringUtils.hasText(kafkaListener.id())) {
			return resolveExpressionAsString(kafkaListener.id(), "id");
		}
		else {
			return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
		}
	}

	private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {
		String groupId = null;
		if (StringUtils.hasText(kafkaListener.groupId())) {
			groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
		}
		if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
			groupId = id;
		}
		return groupId;
	}

	private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {
		TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
		List<TopicPartitionInitialOffset> result = new ArrayList<>();
		if (topicPartitions.length > 0) {
			for (TopicPartition topicPartition : topicPartitions) {
				result.addAll(resolveTopicPartitionsList(topicPartition));
			}
		}
		return result.toArray(new TopicPartitionInitialOffset[0]);
	}

	private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {
		String[] topics = kafkaListener.topics();
		List<String> result = new ArrayList<>();
		if (topics.length > 0) {
			for (String topic1 : topics) {
				Object topic = resolveExpression(topic1);
				resolveAsString(topic, result);
			}
		}
		return result.toArray(new String[0]);
	}

	private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {
		Pattern pattern = null;
		String text = kafkaListener.topicPattern();
		if (StringUtils.hasText(text)) {
			Object resolved = resolveExpression(text);
			if (resolved instanceof Pattern) {
				pattern = (Pattern) resolved;
			}
			else if (resolved instanceof String) {
				pattern = Pattern.compile((String) resolved);
			}
			else if (resolved != null) {
				throw new IllegalStateException(
						"topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
			}
		}
		return pattern;
	}

	private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
		Object topic = resolveExpression(topicPartition.topic());
		Assert.state(topic instanceof String,
				"topic in @TopicPartition must resolve to a String, not " + topic.getClass());
		Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
		String[] partitions = topicPartition.partitions();
		PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
		Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
				"At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
		List<TopicPartitionInitialOffset> result = new ArrayList<>();
		for (String partition : partitions) {
			resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
		}

		for (PartitionOffset partitionOffset : partitionOffsets) {
			TopicPartitionInitialOffset topicPartitionOffset =
					new TopicPartitionInitialOffset((String) topic,
							resolvePartition(topic, partitionOffset),
							resolveInitialOffset(topic, partitionOffset),
							isRelative(topic, partitionOffset));
			if (!result.contains(topicPartitionOffset)) {
				result.add(topicPartitionOffset);
			}
			else {
				throw new IllegalArgumentException(
						String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
								topicPartitionOffset));
			}
		}
		return result;
	}

	private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {
		Object partitionValue = resolveExpression(partitionOffset.partition());
		Integer partition;
		if (partitionValue instanceof String) {
			Assert.state(StringUtils.hasText((String) partitionValue),
					"partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
			partition = Integer.valueOf((String) partitionValue);
		}
		else if (partitionValue instanceof Integer) {
			partition = (Integer) partitionValue;
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
					topic, partitionOffset.partition(), partitionValue.getClass()));
		}
		return partition;
	}

	private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
		Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
		Long initialOffset;
		if (initialOffsetValue instanceof String) {
			Assert.state(StringUtils.hasText((String) initialOffsetValue),
					"'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
			initialOffset = Long.valueOf((String) initialOffsetValue);
		}
		else if (initialOffsetValue instanceof Long) {
			initialOffset = (Long) initialOffsetValue;
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
					topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
		}
		return initialOffset;
	}

	private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
		Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
		Boolean relativeToCurrent;
		if (relativeToCurrentValue instanceof String) {
			relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
		}
		else if (relativeToCurrentValue instanceof Boolean) {
			relativeToCurrent = (Boolean) relativeToCurrentValue;
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
					topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
		}
		return relativeToCurrent;
	}

	@SuppressWarnings("unchecked")
	private void resolveAsString(Object resolvedValue, List<String> result) {
		if (resolvedValue instanceof String[]) {
			for (Object object : (String[]) resolvedValue) {
				resolveAsString(object, result);
			}
		}
		else if (resolvedValue instanceof String) {
			result.add((String) resolvedValue);
		}
		else if (resolvedValue instanceof Iterable) {
			for (Object object : (Iterable<Object>) resolvedValue) {
				resolveAsString(object, result);
			}
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));
		}
	}

	@SuppressWarnings("unchecked")
	private void resolvePartitionAsInteger(String topic, Object resolvedValue,
			List<TopicPartitionInitialOffset> result) {
		if (resolvedValue instanceof String[]) {
			for (Object object : (String[]) resolvedValue) {
				resolvePartitionAsInteger(topic, object, result);
			}
		}
		else if (resolvedValue instanceof String) {
			Assert.state(StringUtils.hasText((String) resolvedValue),
					"partition in @TopicPartition for topic '" + topic + "' cannot be empty");
			result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
		}
		else if (resolvedValue instanceof Integer[]) {
			for (Integer partition : (Integer[]) resolvedValue) {
				result.add(new TopicPartitionInitialOffset(topic, partition));
			}
		}
		else if (resolvedValue instanceof Integer) {
			result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
		}
		else if (resolvedValue instanceof Iterable) {
			for (Object object : (Iterable<Object>) resolvedValue) {
				resolvePartitionAsInteger(topic, object, result);
			}
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
		}
	}

	private String resolveExpressionAsString(String value, String attribute) {
		Object resolved = resolveExpression(value);
		if (resolved instanceof String) {
			return (String) resolved;
		}
		else if (resolved != null) {
			throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
					+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
		}
		return null;
	}

	private Integer resolveExpressionAsInteger(String value, String attribute) {
		Object resolved = resolveExpression(value);
		Integer result = null;
		if (resolved instanceof String) {
			result = Integer.parseInt((String) resolved);
		}
		else if (resolved instanceof Number) {
			result = ((Number) resolved).intValue();
		}
		else if (resolved != null) {
			throw new IllegalStateException(
					"The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
							+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
		}
		return result;
	}

	private Boolean resolveExpressionAsBoolean(String value, String attribute) {
		Object resolved = resolveExpression(value);
		Boolean result = null;
		if (resolved instanceof Boolean) {
			result = (Boolean) resolved;
		}
		else if (resolved instanceof String) {
			result = Boolean.parseBoolean((String) resolved);
		}
		else if (resolved != null) {
			throw new IllegalStateException(
					"The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
							+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
		}
		return result;
	}

	private Object resolveExpression(String value) {
		return this.resolver.evaluate(resolve(value), this.expressionContext);
	}

	/**
	 * Resolve the specified value if possible.
	 * @param value the value to resolve
	 * @return the resolved value
	 * @see ConfigurableBeanFactory#resolveEmbeddedValue
	 */
	private String resolve(String value) {
		if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
			return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
		}
		return value;
	}

	private void addFormatters(FormatterRegistry registry) {
		for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
			registry.addConverter(converter);
		}
		for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
			registry.addConverter(converter);
		}
		for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
			registry.addFormatter(formatter);
		}
	}

	private <T> Collection<T> getBeansOfType(Class<T> type) {
		if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
			return ((ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
					.getBeansOfType(type)
					.values();
		}
		else {
			return Collections.emptySet();
		}
	}

	/**
	 * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
	 * instance to use. Useful if the factory to use is determined once the endpoints
	 * have been registered but not created yet.
	 * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
	 */
	private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

		private final DefaultFormattingConversionService defaultFormattingConversionService =
				new DefaultFormattingConversionService();

		private MessageHandlerMethodFactory messageHandlerMethodFactory;

		public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
			this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
		}

		@Override
		public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
			return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
		}

		private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
			if (this.messageHandlerMethodFactory == null) {
				this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
			}
			return this.messageHandlerMethodFactory;
		}

		private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
			DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
			Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
			if (validator != null) {
				defaultFactory.setValidator(validator);
			}
			defaultFactory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);

			ConfigurableBeanFactory cbf =
					LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
							(ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory :
							null;


			this.defaultFormattingConversionService.addConverter(
					new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));

			defaultFactory.setConversionService(this.defaultFormattingConversionService);

			List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();

			// Annotation-based argument resolution
			argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
			argumentResolvers.add(new HeadersMethodArgumentResolver());

			// Type-based argument resolution
			final GenericMessageConverter messageConverter =
					new GenericMessageConverter(this.defaultFormattingConversionService);
			argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
			argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {


				@Override
				public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
					Object resolved = super.resolveArgument(parameter, message);
					/*
					 * Replace KafkaNull list elements with null.
					 */
					if (resolved instanceof List) {
						List<?> list = ((List<?>) resolved);
						for (int i = 0; i < list.size(); i++) {
							if (list.get(i) instanceof KafkaNull) {
								list.set(i, null);
							}
						}
					}
					return resolved;
				}

				@Override
				protected boolean isEmptyPayload(Object payload) {
					return payload == null || payload instanceof KafkaNull;
				}

			});
			defaultFactory.setArgumentResolvers(argumentResolvers);

			defaultFactory.afterPropertiesSet();
			return defaultFactory;
		}

	}

	private static class BytesToStringConverter implements Converter<byte[], String> {


		private final Charset charset;

		BytesToStringConverter(Charset charset) {
			this.charset = charset;
		}

		@Override
		public String convert(byte[] source) {
			return new String(source, this.charset);
		}

	}

	private static class ListenerScope implements Scope {

		private final Map<String, Object> listeners = new HashMap<>();

		ListenerScope() {
			super();
		}

		public void addListener(String key, Object bean) {
			this.listeners.put(key, bean);
		}

		public void removeListener(String key) {
			this.listeners.remove(key);
		}

		@Override
		public Object get(String name, ObjectFactory<?> objectFactory) {
			return this.listeners.get(name);
		}

		@Override
		public Object remove(String name) {
			return null;
		}

		@Override
		public void registerDestructionCallback(String name, Runnable callback) {
		}

		@Override
		public Object resolveContextualObject(String key) {
			return this.listeners.get(key);
		}

		@Override
		public String getConversationId() {
			return null;
		}

	}

}
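
A composed annotation such as @LybGeekKafkaListener only works if whoever reads it also merges its attributes into the annotation it wraps. The sketch below shows the plain-Java behavior behind that: ordinary reflection finds the meta-annotation, but with its own default values. BaseListener, ComposedListener and SampleConsumer are hypothetical stand-ins for @KafkaListener, @LybGeekKafkaListener and a consumer class. Whether this is the exact reason the annotation has no effect in the sample project is an assumption, not something the article confirms.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for @KafkaListener.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@interface BaseListener {
    String[] topics() default {};
}

// Hypothetical stand-in for @LybGeekKafkaListener: meta-annotated with BaseListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@BaseListener
@interface ComposedListener {
    String[] topics() default {};
}

@ComposedListener(topics = "user-topic")
class SampleConsumer {
}

class MetaAnnotationDemo {

    // Plain reflection: the meta-annotation is found, but it carries its own defaults,
    // not the values declared on the composed annotation.
    static String[] topicsSeenThroughMetaAnnotation(Class<?> type) {
        ComposedListener composed = type.getAnnotation(ComposedListener.class);
        BaseListener base = composed.annotationType().getAnnotation(BaseListener.class);
        return base.topics();
    }

    // Reading the composed annotation itself returns the declared values.
    static String[] topicsSeenOnComposedAnnotation(Class<?> type) {
        return type.getAnnotation(ComposedListener.class).topics();
    }
}
```

The post-processor above sidesteps the problem by looking up @LybGeekKafkaListener itself and reading topics, groupId and the other attributes from it directly.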

How business code uses it

Example

@LybGeekKafkaListener(id = "createUser",topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {

    @Autowired
    private UserService userService;

    @Override
    public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
        User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);

        return userService.isExistUserByUsername(user.getUsername());
    }

    @Override
    public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
        User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);
        return userService.save(user);
    }
}
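
One thing to watch in an example like this: the existence check and the save are two separate steps, so two consumers handling the same message at the same moment could both pass the check. A common way to close that gap is to make the "claim" atomic, for instance with a unique constraint in the store. The sketch below illustrates the idea with an in-memory map. UsernameRegistry and AtomicClaimDemo are hypothetical names, and the map only stands in for a real store with a uniqueness guarantee.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a store with a uniqueness guarantee
// (for example a unique index on username); not part of the demo project.
class UsernameRegistry {
    private final ConcurrentMap<String, Boolean> usernames = new ConcurrentHashMap<>();

    // Atomically claims the username; returns true only for the first caller.
    boolean claim(String username) {
        return usernames.putIfAbsent(username, Boolean.TRUE) == null;
    }
}

class AtomicClaimDemo {

    // Starts several threads that all try to claim the same username and
    // returns how many of them succeeded.
    static int concurrentClaims(int threads) {
        UsernameRegistry registry = new UsernameRegistry();
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (registry.claim("alice")) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return winners.get();
    }
}
```

However many threads race, only one claim succeeds, so the business logic would run once even when duplicates arrive concurrently.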

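Summary

Sometimes when we advocate for something, we find that even after saying it N times, things still slip through. At that point, consider turning what we advocate into a tool and letting the tool enforce the convention. For example, when developers may not think through every aspect of some business logic, we can abstract its core scenarios into methods and have developers provide the concrete implementations of those abstract methods.

Demo link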

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template