@Async
在開發過程中,我們有很多的業務場景都會遇到使用執行緒池的情況,例如定時任務使用的就是ScheduledThreadPoolExecutor
。而有些時候使用執行緒池的場景就是會將一些可以進行非同步操作的業務放線上程池中去完成,例如在生成訂單的時候給使用者傳送簡訊,生成訂單的結果不應該被髮送簡訊的成功與否所左右,也就是說生成訂單這個主操作是不依賴於傳送簡訊這個操作,所以我們就可以把傳送簡訊這個操作置為非同步操作。而要想完成非同步操作,一般使用的一個是訊息伺服器MQ,一個就是執行緒池。今天我們就來看看在Java中常用的Spring框架中如何去使用執行緒池來完成非同步操作,以及分析背後的原理。
在Spring4中,Spring中引入了一個新的註解@Async
,這個註解讓我們在使用Spring完成非同步操作變得非常方便。
在SpringBoot環境中,要使用@Async
註解,我們需要先在啟動類上加上@EnableAsync
註解。這個與在SpringBoot中使用@Scheduled
註解需要在啟動類中加上@EnableScheduling
是一樣的道理(當然你使用古老的XML配置也是可以的,但是在SpringBoot環境中,建議的是全註解開發),具體原理下面會分析。加上@EnableAsync
註解後,如果我們想在呼叫一個方法的時候開啟一個新的執行緒開始非同步操作,我們只需要在這個方法上加上@Async
@EnableAsync
我們先來分析@EnableAsync
註解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* Indicate the 'async' annotation type to be detected at either class
* or method level.
* <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
* {@code @javax.ejb.Asynchronous} annotation will be detected.
* <p>This attribute exists so that developers can provide their own
* custom annotation type to indicate that a method (or all methods of
* a given class) should be invoked asynchronously.
*/
//預設情況下,要開啟非同步操作,要在相應的方法或者類上加上@Async註解或者EJB3.1規範下的@Asynchronous註解。
//這個屬性使得開發人員可以自己設定開啟非同步操作的註解
Class<? extends Annotation> annotation() default Annotation.class;
/**
* Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
* to standard Java interface-based proxies.
* <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
* <p>The default is {@code false}.
* <p>Note that setting this attribute to {@code true} will affect <em>all</em>
* Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
* For example, other beans marked with Spring's {@code @Transactional} annotation
* will be upgraded to subclass proxying at the same time. This approach has no
* negative impact in practice unless one is explicitly expecting one type of proxy
* vs. another — for example, in tests.
*/
//預設false,不建議開啟,因為會把所有Spring管理的Bean都設為cglib代理
boolean proxyTargetClass() default false;
/**
* Indicate how async advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Async} annotation on such a method within a local call will be ignored
* since Spring's interceptor does not even kick in for such a runtime scenario.
* For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
*/
//預設的mode是AdviceMode.PROXY
/**
@Aspect一開始是AspectJ推出的Java註解形式,後來Spring AOP也支援使用這種形式表示切面,但實際上底層實現和AspectJ毫無關係,畢竟Spring AOP是動態代理,和靜態代理是不相容的。
AspectJ 在執行前直接將橫切關注點編織到實際程式碼中,是靜態代理。
而在Spring AOP中實現的是動態代理,預設使用JDK動態代理,由於JDK動態代理的天然限制,只能對介面做代理,所以當直接對類代理的時候使用的CGLIB代理,無論JDK動態代理還是CGLIB代理都是動態代理,而AspectJ是靜態代理。
但是Spring AOP相容了AspectJ的註解顯式,但是我們要明確,我們在Spring AOP中使用的@Aspect註解與AspectJ是沒有太大關係的。
由於Spring AOP是動態代理的,他存在一些缺陷,當同一類中的方法相互呼叫時,內部的方法如果有代理方法的話,也是不會呼叫代理方法的,這個時候代理方法是會失效的,原因就在於當在同一個類中進行方法調間用時,預設是```this.insideMethod()```,在Spring AOP動態代理中,依舊會呼叫原內部方法,而不是代理後的內部方法。AspectJ這種靜態代理的方式會解決上述問題,所以當你想要使用這種特性的話,可以使用ASPECTJ方式。
但是個人還是不建議使用這種方式,想要解決同一個類中方法間呼叫,內部方法不走代理方法的問題,其實還是有其他的解決方法的,沒有必要這個問題拋棄Spring AOP而使用靜態代理。
Spring AOP和AspectJ的比較 https://www.jianshu.com/p/872d3dbdc2ca
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
* after all other post-processors, so that it can add an advisor to
* existing proxies rather than double-proxy.
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
我們又看到了@Import
註解,還是一樣的套路。
AsyncConfigurationSelector
@Override
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
//EnableAsync的預設mode是Proxy,會注入ProxyAsyncConfiguration這個配置類
case PROXY:
return new String[] { ProxyAsyncConfiguration.class.getName() };
case ASPECTJ:
return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
default:
return null;
}
}
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
//在這個類中注入了AsyncAnnotationBeanPostProcessor,很顯然這是個BeanPostProcessor
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
//必須要有@EnableAsync
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//看是否有自定義的開啟非同步操作註解
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//設定執行緒池配置,屬性值在父類中獲得
if (this.executor != null) {
bpp.setExecutor(this.executor);
}
//設定異常處理器配置,屬性值在父類中獲得
if (this.exceptionHandler != null) {
bpp.setExceptionHandler(this.exceptionHandler);
}
//預設不使用CGLIB代理
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
//預設的優先順序是最低
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
protected AnnotationAttributes enableAsync;
protected Executor executor;
protected AsyncUncaughtExceptionHandler exceptionHandler;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
//AsyncConfigurer用來配置執行緒池配置以及異常處理器,而且在Spring環境中最多隻能有一個,在這裡我們知道了,如果想要自己去配置執行緒池,只需要實現AsyncConfigurer介面,並且不可以在Spring環境中有多個實現AsyncConfigurer的類。
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer.getAsyncExecutor();
this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
}
}
AsyncAnnotationBeanPostProcessor
這個BeanPostBeanPostProcessor
很顯然會對帶有能夠引發非同步操作的註解(比如@Async
)的Bean進行處理。我們來具體看一下。
@SuppressWarnings("serial")
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
/**
* The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".
* <p>Note that the initial lookup happens by type; this is just the fallback
* in case of multiple executor beans found in the context.
* @since 4.2
* @see AnnotationAsyncExecutionInterceptor#DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
protected final Log logger = LogFactory.getLog(getClass());
private Class<? extends Annotation> asyncAnnotationType;
private Executor executor;
private AsyncUncaughtExceptionHandler exceptionHandler;
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
/**
* Set the 'async' annotation type to be detected at either class or method
* level. By default, both the {@link Async} annotation and the EJB 3.1
* {@code javax.ejb.Asynchronous} annotation will be detected.
* <p>This setter property exists so that developers can provide their own
* (non-Spring-specific) annotation type to indicate that a method (or all
* methods of a given class) should be invoked asynchronously.
* @param asyncAnnotationType the desired annotation type
*/
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
this.asyncAnnotationType = asyncAnnotationType;
}
/**
* Set the {@link Executor} to use when invoking methods asynchronously.
* <p>If not specified, default executor resolution will apply: searching for a
* unique {@link TaskExecutor} bean in the context, or for an {@link Executor}
* bean named "taskExecutor" otherwise. If neither of the two is resolvable,
* a local default executor will be created within the interceptor.
* @see AsyncAnnotationAdvisor#AsyncAnnotationAdvisor(Executor, AsyncUncaughtExceptionHandler)
* @see AnnotationAsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)
* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
public void setExecutor(Executor executor) {
this.executor = executor;
}
/**
* Set the {@link AsyncUncaughtExceptionHandler} to use to handle uncaught
* exceptions thrown by asynchronous method executions.
* @since 4.1
*/
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
我們注意到他有重寫父類的setBeanFactory
,這個方法是不是有點熟悉呢,它是BeanFactoryAware
介面中的方法,AsyncAnnotationBeanPostProcessor
的父類實現了這個介面,在我們很久之前分析過的Bean的初始化中,是有提到過這個介面的,實現了Aware
型別介面的Bean,會在初始化Bean的時候呼叫相應的初始化方法,具體可以檢視AbstractAutowireCapableBeanFactory
#initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd)
方法。
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//建立一個Advisor
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
//如果有指定的註解,加入指定的註解
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
//將當前的BeanFacotry注入到Advisor中
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(2);
//asyncAnnotationTypes新增預設的註解
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
if (exceptionHandler != null) {
this.exceptionHandler = exceptionHandler;
}
//當沒有配置exceptionHandler時,使用預設的異常處理器
else {
this.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler();
}
//建立一個advice,術語叫做通知,增強處理,實際上是一個MethodInterceptor,注入執行緒池和異常處理器屬性 AnnotationAsyncExecutionInterceptor
this.advice = buildAdvice(executor, this.exceptionHandler);
//建立一個切點
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
處理Bean的postProcessAfterInitialization
方法在祖先類AbstractAdvisingBeanPostProcessor
中。
從原始碼中可以看到。AsyncAnnotationBeanPostProcessor
是對Bean進行後置處理的BeanPostProcessor
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
//是否符合處理條件
if (isEligible(bean, beanName)) {
//如果符合條件
//建立ProxyFactory
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
//建立代理類,預設使用JDK代理,建立代理類的過程之前在講SPRING AOP中已經詳細介紹過了
return proxyFactory.getProxy(getProxyClassLoader());
}
// No async proxy needed.
return bean;
}
JdkDynamicAopProxy
的invoke方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodInvocation invocation;
Object oldProxy = null;
boolean setProxyContext = false;
TargetSource targetSource = this.advised.targetSource;
Class<?> targetClass = null;
Object target = null;
try {
//省略程式碼
Object retVal;
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
// May be null. Get as late as possible to minimize the time we "own" the target,
// in case it comes from a pool.
target = targetSource.getTarget();
if (target != null) {
targetClass = target.getClass();
}
// Get the interception chain for this method.
//獲取當前的Advicechain 通知鏈
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
// We can skip creating a MethodInvocation: just invoke the target directly
// Note that the final invoker must be an InvokerInterceptor so we know it does
// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
// We need to create a method invocation...
//當前的AdviceChain就包含了當時setBeanFactory方法生成的通知
invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
// Proceed to the joinpoint through the interceptor chain.
retVal = invocation.proceed();
}
// Massage return value if necessary.
//省略程式碼
}
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
// Must have come from TargetSource.
targetSource.releaseTarget(target);
}
if (setProxyContext) {
// Restore old proxy.
AopContext.setCurrentProxy(oldProxy);
}
}
}
ReflectiveMethodInvocation
#proceed()
@Override
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
//執行通知的invoke方法。
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
AsyncExecutionInterceptor
#invoke()
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//決定到底使用哪個執行緒池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//構造任務
Callable<Object> task = new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
}
};
//將任務放到執行緒池中
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
雖然上文中說了,Spring環境中只能有一個AsyncConfigurer
,但是不意味著,在Spring環境中只能配置一個執行緒池,在Spring環境中是可以配置多個執行緒池,而且我們可以在使用@Async
註解進行非同步操作的時候,通過在value屬性上指定執行緒池BeanName,這樣就可以指定相應的執行緒池來作為任務的載體。
determineAsyncExecutor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
//獲取@Async註解上的value值
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
//如果@Async有指定執行緒池。則通過BeanName找到相應的執行緒池
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
//否則使用預設的執行緒池
else {
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {
synchronized (this.executors) {
if (this.defaultExecutor == null) {
this.defaultExecutor = getDefaultExecutor(this.beanFactory);
}
targetExecutor = this.defaultExecutor;
}
}
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
總結
當我們想要在SpringBoot中方便的使用@Async
註解開啟非同步操作的時候,只需要實現AsyncConfigurer
介面(這樣就配置了預設執行緒池配置當然該類需要在Spring環境中),實現對執行緒池的配置,並在啟動類上加上@EnableAsync
註解,即可使得@Async
註解生效。更簡單的,我們甚至可以不顯式的實現AsyncConfigurer
,我們可以在Spring環境中配置多個Executor
型別的Bean,在使用@Async
註解時,可以指定執行緒池來執行任務。