Source Code Analysis of Spring Boot's Async Annotations
1. Example
Let's start with the following demo.
Maven dependencies in pom.xml:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.14.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Startup class SpringBootAsyncApplication.java
@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
}
DemoController.java
@RestController
@RequestMapping(value = "/demos")
@Slf4j
public class DemoController {

    @Autowired
    private IDemoService demoService;

    @GetMapping("")
    public String test(@RequestParam String name) {
        long start = System.currentTimeMillis();
        log.info("start send. ThreadName: {}", Thread.currentThread().getName());
        demoService.send(name);
        long end = System.currentTimeMillis();
        log.info("send end, time: {}", (end - start));
        return "success";
    }
}
DemoServiceImpl.java, the implementation of the IDemoService interface
@Service
@Slf4j
public class DemoServiceImpl implements IDemoService {

    @Async
    @Override
    public void send(String name) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("async name={}, ThreadName: {}", name, Thread.currentThread().getName());
    }
}
After starting the project, send GET http://127.0.0.1:8002/demos?name=test. The log shows:
2019-04-12 17:17:32.462 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : start send. ThreadName: http-nio-8002-exec-1
2019-04-12 17:17:32.466 INFO 12 --- [nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2019-04-12 17:17:32.467 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : send end, time: 5
2019-04-12 17:17:37.468 INFO 12 --- [cTaskExecutor-1] c.l.d.s.service.impl.DemoServiceImpl : async name=test, ThreadName: SimpleAsyncTaskExecutor-1
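The console log shows two threads: the request-handling thread (http-nio-8002-exec-1) and the asynchronous thread (SimpleAsyncTaskExecutor-1). The request thread finishes after 5 ms and returns the response without waiting for the asynchronous thread, which writes its log line about five seconds later. That is the asynchronous effect.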
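2. Conclusions
2.1 How to enable asynchronous execution
- Enable async support by adding @EnableAsync to the startup class or to a configuration class;
- Add @Async to a method or to a class.
2.2 The @Async annotation
- A method annotated with @Async runs in a separate thread (SimpleAsyncTaskExecutor-1 in the example), so the caller does not wait for it to complete.
- In the default proxy mode, @Async should be applied to public methods only (the proxy reliably intercepts only public methods).
- @Async does not take effect on self-invocation, that is, when the async method is called from another method of the same class (such a call goes straight to the target object and bypasses the proxy).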
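The self-invocation rule in the last bullet can be reproduced without Spring. The following is a minimal sketch using a plain JDK dynamic proxy; Greeter, GreeterImpl and SelfInvocationDemo are hypothetical names invented for this illustration, and the handler only records method names instead of dispatching to another thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {

    // Returns the names of the methods that the proxy's handler actually intercepted.
    static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        Greeter target = new GreeterImpl();
        InvocationHandler handler = (p, method, args) -> {
            seen.add(method.getName());          // this is where advice would run
            return method.invoke(target, args);  // then delegate to the raw target
        };
        Greeter proxy = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
        proxy.outer(); // intercepted; the inner() call made inside outer() is not
        proxy.inner(); // intercepted, because it goes through the proxy
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}

// Hypothetical service interface, used only for this illustration.
interface Greeter {
    void outer();
    void inner();
}

class GreeterImpl implements Greeter {
    @Override
    public void outer() {
        inner(); // self-invocation: "this" is the raw target, not the proxy
    }

    @Override
    public void inner() {
        // nothing to do
    }
}
```

The handler sees outer and inner once each. The inner() call made from within outer() never reaches it, which is the same reason an @Async method called from its own class runs synchronously in proxy mode.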
3. Questions
Why is the following message logged?
[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
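It says that no task executor was found for async processing: there is neither a bean of type TaskExecutor nor a bean named 'taskExecutor'. What does that mean? And why does the name of the asynchronous thread start with SimpleAsyncTaskExecutor? With these questions in mind, let's dig into the source code.
4. Source Code Analysis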
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.Async.java
/**
* Annotation that marks a method as a candidate for <i>asynchronous</i> execution.
* Can also be used at the type level, in which case all of the type's methods are
* considered as asynchronous.
*
* <p>In terms of target method signatures, any parameter types are supported.
* However, the return type is constrained to either {@code void} or
* {@link java.util.concurrent.Future}. In the latter case, you may declare the
* more specific {@link org.springframework.util.concurrent.ListenableFuture} or
* {@link java.util.concurrent.CompletableFuture} types which allow for richer
* interaction with the asynchronous task and for immediate composition with
* further processing steps.
*
* <p>A {@code Future} handle returned from the proxy will be an actual asynchronous
* {@code Future} that can be used to track the result of the asynchronous method
* execution. However, since the target method needs to implement the same signature,
* it will have to return a temporary {@code Future} handle that just passes a value
* through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},
* or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
*
* @author Juergen Hoeller
* @author Chris Beams
* @since 3.0
* @see AnnotationAsyncExecutionInterceptor
* @see AsyncAnnotationAdvisor
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
/**
* (Author's note: the qualifier value that selects the executor for the async operation.)
* A qualifier value for the specified asynchronous operation(s).
* <p>May be used to determine the target executor to be used when executing this
* method, matching the qualifier value (or the bean name) of a specific
* {@link java.util.concurrent.Executor Executor} or
* {@link org.springframework.core.task.TaskExecutor TaskExecutor}
* bean definition.
* <p>When specified on a class level {@code @Async} annotation, indicates that the
* given executor should be used for all methods within the class. Method level use
* of {@code Async#value} always overrides any value set at the class level.
* @since 3.1.2
*/
String value() default "";
}
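The Javadoc above states that a method-level value overrides the class-level one. A minimal sketch of that lookup order with plain reflection follows; OnExecutor, Jobs and QualifierLookup are hypothetical stand-ins invented here, and Spring's own lookup additionally handles meta-annotations, which this sketch ignores.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class QualifierLookup {

    // Method-level annotation first, then the declaring class; null if neither is present.
    static String qualifierFor(Method method) {
        OnExecutor ann = method.getAnnotation(OnExecutor.class);
        if (ann == null) {
            ann = method.getDeclaringClass().getAnnotation(OnExecutor.class);
        }
        return (ann != null ? ann.value() : null);
    }

    // Convenience wrapper for the demo class below.
    static String qualifierOf(String methodName) {
        try {
            return qualifierFor(Jobs.class.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualifierOf("report"));
        System.out.println(qualifierOf("cleanup"));
    }
}

// Hypothetical annotation standing in for @Async and its value attribute.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface OnExecutor {
    String value() default "";
}

@OnExecutor("classPool")
class Jobs {
    @OnExecutor("reportPool")
    public void report() {
    }

    public void cleanup() {
    }
}
```

Here report resolves to reportPool because of its own annotation, while cleanup falls back to the class-level classPool.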
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.EnableAsync.java
package org.springframework.scheduling.annotation;
import java.lang.annotation.Annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.Ordered;
/**
* Enables Spring's asynchronous method execution capability, similar to functionality
* found in Spring's {@code <task:*>} XML namespace.
*
* <p>To be used together with @{@link Configuration Configuration} classes as follows,
* enabling annotation-driven async processing for an entire Spring application context:
*
* <pre class="code">
* @Configuration
* @EnableAsync
* public class AppConfig {
*
* }</pre>
*
* {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
* either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
* annotation, or any custom annotation specified via the {@link #annotation} attribute.
* The aspect is added transparently for any registered bean, for instance via this
* configuration:
*
* <pre class="code">
* @Configuration
* public class AnotherAppConfig {
*
* @Bean
* public MyAsyncBean asyncBean() {
* return new MyAsyncBean();
* }
* }</pre>
*
* <p>By default, Spring will be searching for an associated thread pool definition:
* either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
* or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
* neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
* will be used to process async method invocations. Besides, annotated methods having a
* {@code void} return type cannot transmit any exception back to the caller. By default,
* such uncaught exceptions are only logged.
*
* <p>To customize all this, implement {@link AsyncConfigurer} and provide:
* <ul>
* <li>your own {@link java.util.concurrent.Executor Executor} through the
* {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
* <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
* AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
* getAsyncUncaughtExceptionHandler()}
* method.</li>
* </ul>
*
* <pre class="code">
* @Configuration
* @EnableAsync
* public class AppConfig implements AsyncConfigurer {
*
* @Override
* public Executor getAsyncExecutor() {
* ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
* executor.setCorePoolSize(7);
* executor.setMaxPoolSize(42);
* executor.setQueueCapacity(11);
* executor.setThreadNamePrefix("MyExecutor-");
* executor.initialize();
* return executor;
* }
*
* @Override
* public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
* return MyAsyncUncaughtExceptionHandler();
* }
* }</pre>
*
* <p>If only one item needs to be customized, {@code null} can be returned to
* keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
* when possible.
*
* <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
* Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
* if you want a fully managed bean. In such circumstances it is no longer necessary to
* manually call the {@code executor.initialize()} method as this will be invoked
* automatically when the bean is initialized.
*
* <p>For reference, the example above can be compared to the following Spring XML
* configuration:
*
* <pre class="code">
* {@code
* <beans>
*
* <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
*
* <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
*
* <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
*
* <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
*
* </beans>
* }</pre>
*
* The above XML-based and JavaConfig-based examples are equivalent except for the
* setting of the <em>thread name prefix</em> of the {@code Executor}; this is because
* the {@code <task:executor>} element does not expose such an attribute. This
* demonstrates how the JavaConfig-based approach allows for maximum configurability
* through direct access to actual componentry.
*
* <p>The {@link #mode} attribute controls how advice is applied: If the mode is
* {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
* of the proxying. Please note that proxy mode allows for interception of calls through
* the proxy only; local calls within the same class cannot get intercepted that way.
*
* <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
* value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
* this case the {@code spring-aspects} module JAR must be present on the classpath, with
* compile-time weaving or load-time weaving applying the aspect to the affected classes.
* There is no proxy involved in such a scenario; local calls will be intercepted as well.
*
* @author Chris Beams
* @author Juergen Hoeller
* @author Stephane Nicoll
* @author Sam Brannen
* @since 3.1
* @see Async
* @see AsyncConfigurer
* @see AsyncConfigurationSelector
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* Indicate the 'async' annotation type to be detected at either class
* or method level.
* <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
* {@code @javax.ejb.Asynchronous} annotation will be detected.
* <p>This attribute exists so that developers can provide their own
* custom annotation type to indicate that a method (or all methods of
* a given class) should be invoked asynchronously.
*/
Class<? extends Annotation> annotation() default Annotation.class;
/**
* Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
* to standard Java interface-based proxies.
* <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
* <p>The default is {@code false}.
* <p>Note that setting this attribute to {@code true} will affect <em>all</em>
* Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
* For example, other beans marked with Spring's {@code @Transactional} annotation
* will be upgraded to subclass proxying at the same time. This approach has no
* negative impact in practice unless one is explicitly expecting one type of proxy
* vs. another — for example, in tests.
*/
boolean proxyTargetClass() default false;
/**
* Indicate how async advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Async} annotation on such a method within a local call will be ignored
* since Spring's interceptor does not even kick in for such a runtime scenario.
* For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
* after all other post-processors, so that it can add an advisor to
* existing proxies rather than double-proxy.
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
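@EnableAsync enables Spring's asynchronous method execution capability, similar to the <task:*> XML namespace. It is used together with @Configuration classes and enables annotation-driven async processing for the entire Spring application context.
By default, Spring searches for an associated thread pool definition: either a unique bean of type TaskExecutor in the context or, failing that, a bean of type Executor named "taskExecutor". If neither can be resolved, a SimpleAsyncTaskExecutor is used to process async method invocations. In addition, annotated methods with a void return type cannot transmit exceptions back to the caller; by default, such uncaught exceptions are only logged.
To customize all of this, implement the AsyncConfigurer interface and override its getAsyncExecutor() and getAsyncUncaughtExceptionHandler() methods.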
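import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        // Not a managed bean here, so it has to be initialized by hand.
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // MyAsyncUncaughtExceptionHandler is a user-defined class.
        return new MyAsyncUncaughtExceptionHandler();
    }
}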
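If only one of the two needs customizing, return null from the other method to keep its default. Where possible, consider extending AsyncConfigurerSupport instead.
Note: in the example above, the ThreadPoolTaskExecutor is not a fully managed Spring bean. To make it one, add the @Bean annotation to getAsyncExecutor(). In that case there is no need to call executor.initialize() manually, because it is invoked automatically when the bean is initialized.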
@Bean
@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(7);
    executor.setMaxPoolSize(42);
    executor.setQueueCapacity(11);
    executor.setThreadNamePrefix("MyExecutor-");
    return executor;
}
For reference, compare the example above with the following Spring XML configuration:
<beans>
<task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
<task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
<bean id="asyncBean" class="com.foo.MyAsyncBean"/>
<bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
</beans>
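The XML-based and Java-based examples above are equivalent except for the executor's thread name prefix, because the <task:executor> element does not expose such an attribute. This shows how Java-based configuration allows maximum configurability through direct access to the actual components.
The mode attribute controls how advice is applied. If the mode is AdviceMode#PROXY (the default), the other attributes control the behavior of the proxying. Note that proxy mode only intercepts calls that go through the proxy; local calls within the same class cannot be intercepted this way.
If mode is set to AdviceMode#ASPECTJ, the value of the proxyTargetClass attribute is ignored. In that case the spring-aspects module JAR must also be on the classpath, with compile-time or load-time weaving applying the aspect to the affected classes. No proxy is involved in that scenario, so local calls are intercepted as well.
All of the above comes from the Javadoc of EnableAsync. How is it actually implemented? Read on.
Notice that the only annotation on EnableAsync that does real work is @Import(AsyncConfigurationSelector.class). Here is its source: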
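The three numbers used in both configurations (core size 7, maximum size 42, queue capacity 11) interact in a way that is easy to misread. The sketch below explores them with the JDK's ThreadPoolExecutor, which ThreadPoolTaskExecutor wraps; the class name PoolSizingDemo, the blocking tasks and the 60-second keep-alive are assumptions made for this illustration, not part of the original example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizingDemo {

    // Plain-JDK pool with the same numbers as the example: core 7, max 42, queue 11.
    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "MyExecutor-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(7, 42, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(11), factory);
    }

    // Submits the given number of blocking tasks and reports how many threads were started.
    static int threadsStartedFor(int tasks) {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the measurement is done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsStartedFor(7));
        System.out.println(threadsStartedFor(18));
        System.out.println(threadsStartedFor(19));
    }
}
```

With these settings a thread beyond the seventh is started only once all 11 queue slots are occupied: 18 blocked tasks still run on 7 threads, and the 19th starts an eighth.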
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AsyncConfigurationSelector.java
/**
* Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
* on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
*
* @author Chris Beams
* @since 3.1
* @see EnableAsync
* @see ProxyAsyncConfiguration
*/
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
/**
* {@inheritDoc}
* @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
* {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
*/
@Override
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] { ProxyAsyncConfiguration.class.getName() };
case ASPECTJ:
return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
default:
return null;
}
}
}
It selects which implementation of AbstractAsyncConfiguration to use, based on the value of EnableAsync#mode on the importing @Configuration class.
- With the default mode, PROXY, ProxyAsyncConfiguration is used.
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.ProxyAsyncConfiguration.java
/**
* {@code @Configuration} class that registers the Spring infrastructure beans necessary
* to enable proxy-based asynchronous method execution.
*
* @author Chris Beams
* @author Stephane Nicoll
* @since 3.1
* @see EnableAsync
* @see AsyncConfigurationSelector
*/
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    /**
     * Defines a bean of type AsyncAnnotationBeanPostProcessor.
     */
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Create the bean post-processor for the async annotation
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // Read the user-defined annotation type from @EnableAsync
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // If it differs from the default, register it as the async annotation type
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        // Set the task executor
        if (this.executor != null) {
            bpp.setExecutor(this.executor);
        }
        // Set the exception handler
        if (this.exceptionHandler != null) {
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        // Whether to use CGLIB subclass proxies; off by default
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // Set the order; by default it runs last (lowest precedence)
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AbstractAsyncConfiguration.java
/**
* Abstract base {@code Configuration} class providing common structure for enabling
* Spring's asynchronous method execution capability.
*
* @author Chris Beams
* @author Stephane Nicoll
* @since 3.1
* @see EnableAsync
*/
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
    // Attributes of the @EnableAsync annotation
    protected AnnotationAttributes enableAsync;
    // Task executor
    protected Executor executor;
    // Exception handler
    protected AsyncUncaughtExceptionHandler exceptionHandler;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer.getAsyncExecutor();
this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
}
}
The interface org.springframework.scheduling.annotation.AsyncConfigurer has a single implementation in this jar, org.springframework.scheduling.annotation.AsyncConfigurerSupport:
/**
* A convenience {@link AsyncConfigurer} that implements all methods
* so that the defaults are used. Provides a backward compatible alternative
* of implementing {@link AsyncConfigurer} directly.
*
* @author Stephane Nicoll
* @since 4.1
*/
public class AsyncConfigurerSupport implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
return null;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
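This is what the Javadoc above describes: implement the AsyncConfigurer interface to customize the default thread pool and the exception handling.
Back to ProxyAsyncConfiguration: it is a @Configuration class that registers an AsyncAnnotationBeanPostProcessor through a @Bean method.
AsyncAnnotationBeanPostProcessor overrides the setBeanFactory method of the BeanFactoryAware interface. When the bean is initialized, this method constructs the AsyncAnnotationAdvisor, the advisor for the async annotation.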
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
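setBeanFactory is called after the normal bean properties have been populated and before any initialization callback (such as InitializingBean#afterPropertiesSet() or a custom init method).
The post-processing itself is implemented in the parent class AbstractAdvisingBeanPostProcessor, which implements the BeanPostProcessor interface and overrides postProcessAfterInitialization.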
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// The bean is already a proxy: add the Advisor to it (ProxyFactory -> AdvisedSupport -> Advised)
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
// Build a ProxyFactory, add the interfaces to proxy, set the advisor, and return the proxy created by the AopProxy
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
// No async proxy needed.
return bean;
}
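The decision made in postProcessAfterInitialization (return the bean untouched, or return a proxy around it) can be sketched with the JDK alone. Everything named below (Marked, Mailer, PlainMailer, MarkedMailer, WrappingPostProcessor) is hypothetical and invented for this illustration; the real class builds the proxy through ProxyFactory and attaches an advisor rather than a hard-coded handler.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class WrappingPostProcessor {

    // Eligible if any public method of the bean's class carries the marker annotation.
    static boolean isEligible(Class<?> type) {
        for (Method m : type.getMethods()) {
            if (m.isAnnotationPresent(Marked.class)) {
                return true;
            }
        }
        return false;
    }

    // Rough analogue of postProcessAfterInitialization: the bean itself or a proxy around it.
    static Object postProcess(Object bean) {
        if (!isEligible(bean.getClass())) {
            return bean; // no proxy needed
        }
        return Proxy.newProxyInstance(
                bean.getClass().getClassLoader(),
                bean.getClass().getInterfaces(),
                (p, method, args) -> {
                    Object result = method.invoke(bean, args);
                    // Visible trace that the call went through the "advice".
                    return (result instanceof String ? "[advised] " + result : result);
                });
    }

    public static void main(String[] args) {
        Mailer mailer = (Mailer) postProcess(new MarkedMailer());
        System.out.println(mailer.send("test"));
    }
}

// Hypothetical marker annotation standing in for @Async.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Marked {
}

interface Mailer {
    String send(String name);
}

class PlainMailer implements Mailer {
    @Override
    public String send(String name) {
        return "sent " + name;
    }
}

class MarkedMailer implements Mailer {
    @Marked
    @Override
    public String send(String name) {
        return "sent " + name;
    }
}
```

A bean without the marker comes back as the same instance, while a marked bean is replaced by a proxy that implements the same interfaces. This is why other beans must depend on the interface type when JDK proxies are in use.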
The JDK dynamic proxy class JdkDynamicAopProxy implements the AopProxy interface as well as InvocationHandler, so a call on the proxy ends up in its invoke method.
/**
* Implementation of {@code InvocationHandler.invoke}.
* <p>Callers will see exactly the exception thrown by the target,
* unless a hook method throws an exception.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodInvocation invocation;
Object oldProxy = null;
boolean setProxyContext = false;
TargetSource targetSource = this.advised.targetSource;
Class<?> targetClass = null;
Object target = null;
try {
if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
// The target does not implement the equals(Object) method itself.
return equals(args[0]);
}
else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
// The target does not implement the hashCode() method itself.
return hashCode();
}
else if (method.getDeclaringClass() == DecoratingProxy.class) {
// There is only getDecoratedClass() declared -> dispatch to proxy config.
return AopProxyUtils.ultimateTargetClass(this.advised);
}
else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
method.getDeclaringClass().isAssignableFrom(Advised.class)) {
// Service invocations on ProxyConfig with the proxy config...
return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
}
Object retVal;
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
// May be null. Get as late as possible to minimize the time we "own" the target,
// in case it comes from a pool.
target = targetSource.getTarget();
if (target != null) {
targetClass = target.getClass();
}
// Get the interception chain for this method.
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
// We can skip creating a MethodInvocation: just invoke the target directly
// Note that the final invoker must be an InvokerInterceptor so we know it does
// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
// We need to create a method invocation...
invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
// Proceed to the joinpoint through the interceptor chain.
retVal = invocation.proceed();
}
// Massage return value if necessary.
Class<?> returnType = method.getReturnType();
if (retVal != null && retVal == target &&
returnType != Object.class && returnType.isInstance(proxy) &&
!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
// Special case: it returned "this" and the return type of the method
// is type-compatible. Note that we can't help if the target sets
// a reference to itself in another returned object.
retVal = proxy;
}
else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
// Must have come from TargetSource.
targetSource.releaseTarget(target);
}
if (setProxyContext) {
// Restore old proxy.
AopContext.setCurrentProxy(oldProxy);
}
}
}
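The interceptor for @Async is AsyncExecutionInterceptor (the log above shows its subclass AnnotationAsyncExecutionInterceptor), which implements the MethodInterceptor interface. MethodInterceptor is an Advice in AOP Alliance terms, that is, the handler that runs at the join point. Because chain is not empty, the second branch is taken: a ReflectiveMethodInvocation is constructed and its proceed method is called.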
@Override
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
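The index-based recursion in proceed() is easier to follow in isolation. Below is a simplified sketch; Interceptor, Invocation and ChainDemo are hypothetical stand-ins for MethodInterceptor and ReflectiveMethodInvocation, and the dynamic method matcher branch is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class ChainDemo {

    // Runs two interceptors around a target and returns the order in which things happened.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Interceptor first = inv -> {
            trace.add("first:before");
            Object result = inv.proceed();
            trace.add("first:after");
            return result;
        };
        Interceptor second = inv -> {
            trace.add("second:before");
            Object result = inv.proceed();
            trace.add("second:after");
            return result;
        };
        Invocation invocation = new Invocation(Arrays.asList(first, second), () -> {
            trace.add("target");
            return "done";
        });
        trace.add("result=" + invocation.proceed());
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical, simplified stand-in for MethodInterceptor.
interface Interceptor {
    Object invoke(Invocation invocation);
}

// Hypothetical, simplified stand-in for ReflectiveMethodInvocation.
class Invocation {
    private final List<Interceptor> chain;
    private final Supplier<Object> joinpoint;
    private int currentIndex = -1; // start at -1 and increment early

    Invocation(List<Interceptor> chain, Supplier<Object> joinpoint) {
        this.chain = chain;
        this.joinpoint = joinpoint;
    }

    Object proceed() {
        if (currentIndex == chain.size() - 1) {
            return joinpoint.get(); // every interceptor has run: call the target
        }
        return chain.get(++currentIndex).invoke(this);
    }
}
```

Each interceptor decides when, and whether, to call proceed() again. An async interceptor uses exactly this freedom: it defers the proceed() call to another thread instead of making it inline.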
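Both branches end by calling invoke(this) on a MethodInterceptor: dm.interceptor.invoke(this) when a dynamic method matcher is involved, or the cast interceptor otherwise. For an @Async method, the call that runs is AsyncExecutionInterceptor.invoke: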
/**
* Intercept the given method invocation, submit the actual calling of the method to
* the correct task executor and return immediately to the caller.
* @param invocation the method to intercept and make asynchronous
* @return {@link Future} if the original method returns {@code Future}; {@code null}
* otherwise.
*/
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
}
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
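Stripped of Spring types, the invoke method above wraps the rest of the invocation in a Callable, submits it, and returns immediately. The sketch below mirrors only the plain Future and void branches; AsyncSubmitDemo and its methods are hypothetical names, and the error handling and the CompletableFuture and ListenableFuture cases handled by doSubmit are omitted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncSubmitDemo {

    // Hands the call to the executor and returns at once: a Future when the target
    // method returns one, otherwise null (the void case).
    static Object invokeAsync(ExecutorService executor, Callable<Object> target, boolean returnsFuture) {
        Callable<Object> task = () -> {
            Object result = target.call();
            // Unwrap the pass-through Future produced by the target method.
            return (result instanceof Future ? ((Future<?>) result).get() : null);
        };
        Future<Object> submitted = executor.submit(task);
        return (returnsFuture ? submitted : null);
    }

    // Reports whether the target ran on the caller's thread or on another one.
    static String threadRelation() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String caller = Thread.currentThread().getName();
            Callable<Object> target = () -> CompletableFuture.completedFuture(
                    Thread.currentThread().getName().equals(caller) ? "same thread" : "other thread");
            Future<?> future = (Future<?>) invokeAsync(executor, target, true);
            return (String) future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // A void-style call gives the caller nothing to wait on.
    static boolean voidCallReturnsNull() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return invokeAsync(executor, () -> null, false) == null;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadRelation());
        System.out.println(voidCallReturnsNull());
    }
}
```

The target's completedFuture is only a carrier for the value. The Future the caller receives is the one created by submit, which matches the Javadoc's remark about a temporary Future handle that just passes a value through.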
spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java
/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor is available)
*/
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {
synchronized (this.executors) {
if (this.defaultExecutor == null) {
this.defaultExecutor = getDefaultExecutor(this.beanFactory);
}
targetExecutor = this.defaultExecutor;
}
}
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
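@Async has a value attribute that names the executor to use; getExecutorQualifier reads this qualifier. If no qualifier is set and defaultExecutor is still null, getDefaultExecutor is called to look up the default executor: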
/**
* Retrieve or build a default executor for this advice instance.
* An executor returned from here will be cached for further use.
* <p>The default implementation searches for a unique {@link TaskExecutor} bean
* in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
* If neither of the two is resolvable, this implementation will return {@code null}.
* @param beanFactory the BeanFactory to use for a default executor lookup
* @return the default executor, or {@code null} if none available
* @since 4.2.6
* @see #findQualifiedExecutor(BeanFactory, String)
* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// Search for TaskExecutor bean... not plain Executor since that would
// match with ScheduledExecutorService as well, which is unusable for
// our purposes here. TaskExecutor is more clearly designed for it.
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
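If the project does not define a default task executor, the getBean lookups throw NoSuchBeanDefinitionException, which is caught here and logged at INFO level. That is why the example prints the following message; it is an informational log entry, not an error.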
[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java
/**
* This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
* bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
* If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
* this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
* for local use if no default could be found.
* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
@Override
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
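AsyncExecutionInterceptor overrides getDefaultExecutor: it first calls AsyncExecutionAspectSupport's getDefaultExecutor and, if that returns null, creates a new SimpleAsyncTaskExecutor. This is why the thread name in the example starts with SimpleAsyncTaskExecutor.
5. Summary
The overall flow can be summarized along two lines:
1. Starting from the annotation: @EnableAsync -> AsyncConfigurationSelector -> ProxyAsyncConfiguration, which registers a bean of type AsyncAnnotationBeanPostProcessor.
2. Following the lifecycle of the AsyncAnnotationBeanPostProcessor bean: the advisor is initialized (setBeanFactory()) -> the proxy is created by the AopProxy (postProcessAfterInitialization()) -> the advice runs on invocation (InvocationHandler.invoke).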
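Taken together, the two getDefaultExecutor methods form a three-step fallback. The sketch below models it with a plain map instead of a BeanFactory; DefaultExecutorLookup, TaskExec and ThreadPerTask are hypothetical stand-ins (for the lookup, Spring's TaskExecutor and SimpleAsyncTaskExecutor respectively), and details such as primary beans are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

class DefaultExecutorLookup {

    // Hypothetical marker type standing in for Spring's TaskExecutor.
    interface TaskExec extends Executor {
    }

    // Last-resort executor: one new thread per task, with no pooling.
    static final class ThreadPerTask implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command, "SimpleAsyncTaskExecutor-sketch").start();
        }
    }

    // beans maps bean name to bean instance, a stand-in for the BeanFactory.
    static Executor resolve(Map<String, Object> beans) {
        List<Executor> byType = new ArrayList<>();
        for (Object bean : beans.values()) {
            if (bean instanceof TaskExec) {
                byType.add((Executor) bean);
            }
        }
        if (byType.size() == 1) {
            return byType.get(0); // step 1: a unique bean of the task-executor type
        }
        Object named = beans.get("taskExecutor");
        if (named instanceof Executor) {
            return (Executor) named; // step 2: any Executor with the well-known name
        }
        return new ThreadPerTask(); // step 3: nothing found, create a local default
    }
}
```

In the demo project neither of the first two steps finds a bean, so the third step applies. Because that default starts a new thread for every call instead of reusing threads, defining a pooled executor is usually worth considering outside of demos.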