聊聊如何實現一個帶冪等模板的Kafka消費者
阿新 • • 發佈:2022-05-24
前言
不知道大家有沒有這樣的體驗,你跟你團隊的成員,宣導一些開發時注意事項,比如在使用訊息佇列時,在消費端處理訊息時,需根據業務場景,考慮一下冪等。後面走查程式碼的時,會發現一些資淺的開發,在需要冪等判斷的場景的情況下,仍然沒做冪等判斷。既然宣導無效,就乾脆實現一個帶冪等模板的消費者,然後開發基於這個模板進行消費端業務處理。本文就以spring-kafka舉例,聊聊如何實現一個帶冪等模板的kafka消費者
實現步驟
1、kafka自動提交改為手動提交
spring: kafka: consumer: # 是否自動提交偏移量,預設值是true,為了避免出現重複資料和資料丟失,可以把它設定為false,然後手動提交偏移量 enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}
2、定義消費端模板抽象基類
@Slf4j public abstract class BaseComusmeListener { @KafkaHandler public final void receive(@Payload String data, @Header(value = KafkaHeaders.RECEIVED_TOPIC,required = false) String receivedTopic, @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY,required = false) String receivedMessageKey, @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP,required = false) long receivedTimestamp, Acknowledgment ack){ KafkaComsumePayLoad kafkaComsumePayLoad = buildKafkaComsumePayLoad(data,receivedTimestamp,receivedTopic,receivedMessageKey); boolean isRepeateConsume = isRepeateConsume(kafkaComsumePayLoad); if(isRepeateConsume){ log.warn("messageKey:【{}】,topic:【{}】存在重複訊息資料-->【{}】",receivedMessageKey,receivedTopic,data); //手工確認 ack.acknowledge(); return; } if(doBiz(kafkaComsumePayLoad)){ //手工確認 ack.acknowledge(); } } /** * 是否重複消費 * @param kafkaComsumePayLoad * @return */ public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad); /** * 業務處理 * @param kafkaComsumerPayLoad */ public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad); private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp, String receivedTopic, String receivedMessageKey){ return KafkaComsumePayLoad.builder() .data(data) .receivedTimestamp(receivedTimestamp) .receivedTopic(receivedTopic) .receivedMessageKey(receivedMessageKey) .build(); } }
3、自定義監聽註解【可選】
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @KafkaListener @Documented @Component public @interface LybGeekKafkaListener { @AliasFor(annotation = KafkaListener.class, attribute = "id") String id() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory") String containerFactory() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "topics") String[] topics() default {}; @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern") String topicPattern() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions") TopicPartition[] topicPartitions() default {}; @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup") String containerGroup() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler") String errorHandler() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "groupId") String groupId() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup") boolean idIsGroup() default true; @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix") String clientIdPrefix() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "beanRef") String beanRef() default "__listener"; @AliasFor(annotation = KafkaListener.class, attribute = "concurrency") String concurrency() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup") String autoStartup() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "properties") String[] properties() default {}; @AliasFor(annotation = Component.class, attribute = "value") String value() default ""; }
3、重寫KafkaListener註解後置處理器【可選】
注: 因示例專案的springboot版本比較低,直接使用@LybGeekKafkaListener不起作用
public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";
/**
* The bean name of the default {@link KafkaListenerContainerFactory}.
*/
public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
private final Log logger = LogFactory.getLog(getClass());
private final ListenerScope listenerScope = new ListenerScope();
private KafkaListenerEndpointRegistry endpointRegistry;
private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
private DefaultListableBeanFactory beanFactory;
private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
new KafkaHandlerMethodFactoryAdapter();
private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
private final AtomicInteger counter = new AtomicInteger();
private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
private BeanExpressionContext expressionContext;
private Charset charset = StandardCharsets.UTF_8;
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
}
/**
* Set the {@link KafkaListenerEndpointRegistry} that will hold the created
* endpoint and manage the lifecycle of the related listener container.
* @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
*/
public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
this.endpointRegistry = endpointRegistry;
}
/**
* Set the name of the {@link KafkaListenerContainerFactory} to use by default.
* <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
* @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
*/
public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
this.defaultContainerFactoryBeanName = containerFactoryBeanName;
}
/**
* Set the {@link MessageHandlerMethodFactory} to use to configure the message
* listener responsible to serve an endpoint detected by this processor.
* <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it
* can be configured further to support additional method arguments
* or to customize conversion and validation support. See
* {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
* @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
*/
public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
}
/**
* Making a {@link BeanFactory} available is optional; if not set,
* {@link KafkaListenerConfigurer} beans won't get autodetected and an
* {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
* @param beanFactory the {@link BeanFactory} to be used.
*/
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = (DefaultListableBeanFactory) beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
this.listenerScope);
}
}
/**
* Set a charset to use when converting byte[] to String in method arguments.
* Default UTF-8.
* @param charset the charset.
* @since 2.2
*/
public void setCharset(Charset charset) {
Assert.notNull(charset, "'charset' cannot be null");
this.charset = charset;
}
@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, KafkaListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
for (KafkaListenerConfigurer configurer : instances.values()) {
configurer.configureKafkaListeners(this.registrar);
}
}
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
KafkaListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
else {
addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
beanFactory.removeBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {
Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
if (this.logger.isTraceEnabled()) {
this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());
}
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (LybGeekKafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
/*
* AnnotationUtils.getRepeatableAnnotations does not look at interfaces
*/
private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {
Set<LybGeekKafkaListener> listeners = new HashSet<>();
LybGeekKafkaListener ann = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);
if (ann != null) {
listeners.add(ann);
}
return listeners;
}
/*
* AnnotationUtils.getRepeatableAnnotations does not look at interfaces
*/
private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {
Set<LybGeekKafkaListener> listeners = new HashSet<>();
LybGeekKafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);
if (ann != null) {
listeners.add(ann);
}
return listeners;
}
private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners, List<Method> multiMethods,
Object bean, String beanName) {
List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
if (annotation != null && annotation.isDefault()) {
final Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method.toString());
defaultMethod = checked;
}
checkedMethods.add(checked);
}
for (LybGeekKafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
}
}
protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
try {
// Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->
// is it also present on the proxy itself?
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<?> iface : proxiedInterfaces) {
try {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
catch (NoSuchMethodException noMethod) {
}
}
}
catch (SecurityException ex) {
ReflectionUtils.handleReflectionException(ex);
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException(String.format(
"@LybGeekKafkaListener method '%s' found on bean target class '%s', " +
"but not found in any interface(s) for bean JDK proxy. Either " +
"pull the method up to an interface or switch to subclass (CGLIB) " +
"proxies by setting proxy-target-class/proxyTargetClass " +
"attribute to 'true'", method.getName(),
method.getDeclaringClass().getSimpleName()), ex);
}
}
return method;
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
String beanRef = kafkaListener.beanRef();
if (StringUtils.hasText(beanRef)) {
this.listenerScope.addListener(beanRef, bean);
}
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPattern(resolvePattern(kafkaListener));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
String group = kafkaListener.containerGroup();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String concurrency = kafkaListener.concurrency();
if (StringUtils.hasText(concurrency)) {
endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
}
String autoStartup = kafkaListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
resolveKafkaProperties(endpoint, kafkaListener.properties());
KafkaListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
this.registrar.registerEndpoint(endpoint, factory);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
}
}
private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
if (propertyStrings.length > 0) {
Properties properties = new Properties();
for (String property : propertyStrings) {
String value = resolveExpressionAsString(property, "property");
if (value != null) {
try {
properties.load(new StringReader(value));
}
catch (IOException e) {
this.logger.error("Failed to load property " + property + ", continuing...", e);
}
}
}
endpoint.setConsumerProperties(properties);
}
}
private String getEndpointId(LybGeekKafkaListener kafkaListener) {
if (StringUtils.hasText(kafkaListener.id())) {
return resolveExpressionAsString(kafkaListener.id(), "id");
}
else {
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
}
}
private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {
String groupId = null;
if (StringUtils.hasText(kafkaListener.groupId())) {
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
}
if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
groupId = id;
}
return groupId;
}
private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
List<TopicPartitionInitialOffset> result = new ArrayList<>();
if (topicPartitions.length > 0) {
for (TopicPartition topicPartition : topicPartitions) {
result.addAll(resolveTopicPartitionsList(topicPartition));
}
}
return result.toArray(new TopicPartitionInitialOffset[0]);
}
private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {
String[] topics = kafkaListener.topics();
List<String> result = new ArrayList<>();
if (topics.length > 0) {
for (String topic1 : topics) {
Object topic = resolveExpression(topic1);
resolveAsString(topic, result);
}
}
return result.toArray(new String[0]);
}
private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {
Pattern pattern = null;
String text = kafkaListener.topicPattern();
if (StringUtils.hasText(text)) {
Object resolved = resolveExpression(text);
if (resolved instanceof Pattern) {
pattern = (Pattern) resolved;
}
else if (resolved instanceof String) {
pattern = Pattern.compile((String) resolved);
}
else if (resolved != null) {
throw new IllegalStateException(
"topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
}
}
return pattern;
}
private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
Object topic = resolveExpression(topicPartition.topic());
Assert.state(topic instanceof String,
"topic in @TopicPartition must resolve to a String, not " + topic.getClass());
Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
String[] partitions = topicPartition.partitions();
PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
"At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List<TopicPartitionInitialOffset> result = new ArrayList<>();
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
}
for (PartitionOffset partitionOffset : partitionOffsets) {
TopicPartitionInitialOffset topicPartitionOffset =
new TopicPartitionInitialOffset((String) topic,
resolvePartition(topic, partitionOffset),
resolveInitialOffset(topic, partitionOffset),
isRelative(topic, partitionOffset));
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
}
}
return result;
}
private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {
Object partitionValue = resolveExpression(partitionOffset.partition());
Integer partition;
if (partitionValue instanceof String) {
Assert.state(StringUtils.hasText((String) partitionValue),
"partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
partition = Integer.valueOf((String) partitionValue);
}
else if (partitionValue instanceof Integer) {
partition = (Integer) partitionValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
topic, partitionOffset.partition(), partitionValue.getClass()));
}
return partition;
}
private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
Long initialOffset;
if (initialOffsetValue instanceof String) {
Assert.state(StringUtils.hasText((String) initialOffsetValue),
"'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
initialOffset = Long.valueOf((String) initialOffsetValue);
}
else if (initialOffsetValue instanceof Long) {
initialOffset = (Long) initialOffsetValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
}
return initialOffset;
}
private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
Boolean relativeToCurrent;
if (relativeToCurrentValue instanceof String) {
relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
}
else if (relativeToCurrentValue instanceof Boolean) {
relativeToCurrent = (Boolean) relativeToCurrentValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
}
return relativeToCurrent;
}
@SuppressWarnings("unchecked")
private void resolveAsString(Object resolvedValue, List<String> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolveAsString(object, result);
}
}
else if (resolvedValue instanceof String) {
result.add((String) resolvedValue);
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolveAsString(object, result);
}
}
else {
throw new IllegalArgumentException(String.format(
"@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));
}
}
@SuppressWarnings("unchecked")
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
List<TopicPartitionInitialOffset> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
}
}
else if (resolvedValue instanceof String) {
Assert.state(StringUtils.hasText((String) resolvedValue),
"partition in @TopicPartition for topic '" + topic + "' cannot be empty");
result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
}
else if (resolvedValue instanceof Integer[]) {
for (Integer partition : (Integer[]) resolvedValue) {
result.add(new TopicPartitionInitialOffset(topic, partition));
}
}
else if (resolvedValue instanceof Integer) {
result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
}
}
else {
throw new IllegalArgumentException(String.format(
"@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
}
}
private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
}
else if (resolved != null) {
throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
return null;
}
private Integer resolveExpressionAsInteger(String value, String attribute) {
Object resolved = resolveExpression(value);
Integer result = null;
if (resolved instanceof String) {
result = Integer.parseInt((String) resolved);
}
else if (resolved instanceof Number) {
result = ((Number) resolved).intValue();
}
else if (resolved != null) {
throw new IllegalStateException(
"The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
return result;
}
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
Object resolved = resolveExpression(value);
Boolean result = null;
if (resolved instanceof Boolean) {
result = (Boolean) resolved;
}
else if (resolved instanceof String) {
result = Boolean.parseBoolean((String) resolved);
}
else if (resolved != null) {
throw new IllegalStateException(
"The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
return result;
}
private Object resolveExpression(String value) {
return this.resolver.evaluate(resolve(value), this.expressionContext);
}
/**
* Resolve the specified value if possible.
* @param value the value to resolve
* @return the resolved value
* @see ConfigurableBeanFactory#resolveEmbeddedValue
*/
private String resolve(String value) {
if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
}
return value;
}
private void addFormatters(FormatterRegistry registry) {
for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
registry.addConverter(converter);
}
for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
registry.addConverter(converter);
}
for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
registry.addFormatter(formatter);
}
}
private <T> Collection<T> getBeansOfType(Class<T> type) {
if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
return ((ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
.getBeansOfType(type)
.values();
}
else {
return Collections.emptySet();
}
}
/**
* An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
* instance to use. Useful if the factory to use is determined once the endpoints
* have been registered but not created yet.
* @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
*/
private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
private final DefaultFormattingConversionService defaultFormattingConversionService =
new DefaultFormattingConversionService();
private MessageHandlerMethodFactory messageHandlerMethodFactory;
public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
}
@Override
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
}
private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
if (this.messageHandlerMethodFactory == null) {
this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
}
return this.messageHandlerMethodFactory;
}
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
if (validator != null) {
defaultFactory.setValidator(validator);
}
defaultFactory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
ConfigurableBeanFactory cbf =
LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
(ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory :
null;
this.defaultFormattingConversionService.addConverter(
new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));
defaultFactory.setConversionService(this.defaultFormattingConversionService);
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
// Annotation-based argument resolution
argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
argumentResolvers.add(new HeadersMethodArgumentResolver());
// Type-based argument resolution
final GenericMessageConverter messageConverter =
new GenericMessageConverter(this.defaultFormattingConversionService);
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
Object resolved = super.resolveArgument(parameter, message);
/*
* Replace KafkaNull list elements with null.
*/
if (resolved instanceof List) {
List<?> list = ((List<?>) resolved);
for (int i = 0; i < list.size(); i++) {
if (list.get(i) instanceof KafkaNull) {
list.set(i, null);
}
}
}
return resolved;
}
@Override
protected boolean isEmptyPayload(Object payload) {
return payload == null || payload instanceof KafkaNull;
}
});
defaultFactory.setArgumentResolvers(argumentResolvers);
defaultFactory.afterPropertiesSet();
return defaultFactory;
}
}
private static class BytesToStringConverter implements Converter<byte[], String> {
private final Charset charset;
BytesToStringConverter(Charset charset) {
this.charset = charset;
}
@Override
public String convert(byte[] source) {
return new String(source, this.charset);
}
}
private static class ListenerScope implements Scope {
private final Map<String, Object> listeners = new HashMap<>();
ListenerScope() {
super();
}
public void addListener(String key, Object bean) {
this.listeners.put(key, bean);
}
public void removeListener(String key) {
this.listeners.remove(key);
}
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
return this.listeners.get(name);
}
@Override
public Object remove(String name) {
return null;
}
@Override
public void registerDestructionCallback(String name, Runnable callback) {
}
@Override
public Object resolveContextualObject(String key) {
return this.listeners.get(key);
}
@Override
public String getConversationId() {
return null;
}
}
}
業務側如何使用
示例
@LybGeekKafkaListener(id = "createUser",topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {
@Autowired
private UserService userService;
@Override
public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);
return userService.isExistUserByUsername(user.getUsername());
}
@Override
public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);
return userService.save(user);
}
}
總結
有時候我們在宣導一些事情時,往往會發現即使我們已經說了N遍了,事情仍然會出現紕漏。這時候我們可以考慮把我們想宣導的東西工具化,通過工具來規範。比如有些業務,可能一些開發沒考慮全面,我們就可以基於業務,把一些核心的場景抽象成方法,然後開發人員基於這些抽象方法,做具體實現。
demo連結
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template