1. 程式人生 > 實用技巧 >Spring中非同步註解@Async的使用、原理及使用時可能導致的問題

Spring中非同步註解@Async的使用、原理及使用時可能導致的問題

前言

其實最近都在研究事務相關的內容,之所以寫這麼一篇文章是因為前面寫了一篇關於迴圈依賴的文章:

面試必殺技,講一講Spring中的迴圈依賴

然後,很多同學碰到了下面這個問題,添加了Spring提供的一個非同步註解@Async迴圈依賴無法被解決了,下面是一些讀者的留言跟群裡同學碰到的問題:

本著講一個知識點就要講明白、講透徹的原則,我決定單獨寫一篇這樣的文章對@Async這個註解做一下詳細的介紹,這個註解帶來的問題遠遠不止迴圈依賴這麼簡單,如果對它不夠熟悉的話建議慎用。

文章要點

@Async的基本使用

這個註解的作用在於可以讓被標註的方法非同步執行,但是有兩個前提條件

  1. 配置類上新增@EnableAsync
    註解
  2. 需要非同步執行的方法的所在類由Spring管理
  3. 需要非同步執行的方法上添加了@Async註解

我們通過一個Demo體會下這個註解的作用吧

第一步,配置類上開啟非同步:

@EnableAsync
@Configuration
@ComponentScan("com.dmz.spring.async")
public class Config { }

第二步,

@Component  // 這個類本身要被Spring管理
public class DmzAsyncService { @Async // 新增註解表示這個方法要非同步執行
public void testAsync(){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("testAsync invoked");
}
}

第三步,測試非同步執行

public class Main {
public static void main(String[] args) {
AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(Config.class);
DmzAsyncService bean = ac.getBean(DmzAsyncService.class);
bean.testAsync();
System.out.println("main函式執行完成");
}
}
// 程式執行結果如下:
// main函式執行完成
// testAsync invoked

通過上面的例子我們可以發現,DmzAsyncService中的testAsync方法是非同步執行的,那麼這背後的原理是什麼呢?我們接著分析

原理分析

我們在分析某一個技術的時候,最重要的事情是,一定一定要找到程式碼的入口,像Spring這種都很明顯,入口必定是在@EnableAsync這個註解上面,我們來看看這個註解幹了啥事(本文基於5.2.x版本)

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 這裡是重點,匯入了一個ImportSelector
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync { // 這個配置可以讓程式設計師配置需要被檢查的註解,預設情況下檢查的就是@Async註解
Class<? extends Annotation> annotation() default Annotation.class; // 預設使用jdk代理
boolean proxyTargetClass() default false; // 預設使用Spring AOP
AdviceMode mode() default AdviceMode.PROXY; // 在後續分析我們會發現,這個註解實際往容器中添加了一個
// AsyncAnnotationBeanPostProcessor,這個後置處理器實現了Ordered介面
// 這個配置主要代表了AsyncAnnotationBeanPostProcessor執行的順序
int order() default Ordered.LOWEST_PRECEDENCE;
}

上面這個註解做的最重要的事情就是匯入了一個AsyncConfigurationSelector,這個類的原始碼如下:

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; @Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
// 預設會使用SpringAOP進行代理
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
} }

這個類的作用是像容器中註冊了一個ProxyAsyncConfiguration,這個類的繼承關係如下:

我們先看下它的父類AbstractAsyncConfiguration,其原始碼如下:

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware { @Nullable
protected AnnotationAttributes enableAsync; @Nullable
protected Supplier<Executor> executor; @Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; // 這裡主要就是檢查將其匯入的類上是否有EnableAsync註解
// 如果沒有的話就報錯
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
} // 將容器中配置的AsyncConfigurer注入
// 非同步執行嘛,所以我們可以配置使用的執行緒池
// 另外也可以配置異常處理器
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
} }

再來看看ProxyAsyncConfiguration這個類的原始碼

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
// 將通過AsyncConfigurer配置好的執行緒池跟異常處理器設定到這個後置處理器中
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
} }

這個類本身是一個配置類,它的作用是向容器中新增一個AsyncAnnotationBeanPostProcessor。到這一步我們基本上就可以明白了,@Async註解的就是通過AsyncAnnotationBeanPostProcessor這個後置處理器生成一個代理物件來實現非同步的,接下來我們就具體看看AsyncAnnotationBeanPostProcessor是如何生成代理物件的,我們主要關注一下幾點即可:

  1. 是在生命週期的哪一步完成的代理?
  2. 切點的邏輯是怎麼樣的?它會對什麼樣的類進行攔截?
  3. 通知的邏輯是怎麼樣的?是如何實現非同步的?

基於上面幾個問題,我們進行逐一分析

是在生命週期的哪一步完成的代理?

我們抓住重點,AsyncAnnotationBeanPostProcessor是一個後置處理器器,按照我們對Spring的瞭解,大概率是在這個後置處理器的postProcessAfterInitialization方法中完成了代理,直接定位到這個方法,這個方法位於父類AbstractAdvisingBeanPostProcessor中,具體程式碼如下:

public Object postProcessAfterInitialization(Object bean, String beanName) {
// 沒有通知,或者是AOP的基礎設施類,那麼不進行代理
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
return bean;
} // 對已經被代理的類,不再生成代理,只是將通知新增到代理類的邏輯中
// 這裡通過beforeExistingAdvisors決定是將通知新增到所有通知之前還是新增到所有通知之後
// 在使用@Async註解的時候,beforeExistingAdvisors被設定成了true
// 意味著整個方法及其攔截邏輯都會非同步執行
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
} // 判斷需要對哪些Bean進行來代理
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
return bean;
}

果不其然,確實是在這個方法中完成的代理。接著我們就要思考,切點的過濾規則是什麼呢?

切點的邏輯是怎麼樣的?

其實也不難猜到肯定就是類上添加了@Async註解或者類中含有被@Async註解修飾的方法。基於此,我們看看這個isEligible這個方法的實現邏輯,這個方位位於AbstractBeanFactoryAwareAdvisingPostProcessor中,也是AsyncAnnotationBeanPostProcessor的父類,對應程式碼如下:

// AbstractBeanFactoryAwareAdvisingPostProcessor的isEligible方法
// 呼叫了父類
protected boolean isEligible(Object bean, String beanName) {
return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&
super.isEligible(bean, beanName));
} protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
} protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
// 這裡完成的判斷
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}

實際上最後就是根據advisor來確定是否要進行代理,在Spring中AOP相關的API及原始碼解析,原來AOP是這樣子的這篇文章中我們提到過,advisor實際就是一個綁定了切點的通知,那麼AsyncAnnotationBeanPostProcessor這個advisor是什麼時候被初始化的呢?我們直接定位到AsyncAnnotationBeanPostProcessorsetBeanFactory方法,其原始碼如下:

public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory); // 在這裡new了一個AsyncAnnotationAdvisor
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
// 完成了初始化
this.advisor = advisor;
}

我們來看看AsyncAnnotationAdvisor中的切點匹配規程是怎麼樣的,直接定位到這個類的buildPointcut方法中,其原始碼如下:

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 就是根據這兩個匹配器進行匹配的
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}

程式碼很簡單,就是根據cpc跟mpc兩個匹配器來進行匹配的,第一個是檢查類上是否有@Async註解,第二個是檢查方法是是否有@Async註解。

那麼,到現在為止,我們已經知道了它在何時建立代理,會為什麼物件建立代理,最後我們還需要解決一個問題,代理的邏輯是怎麼樣的,非同步到底是如何實現的?

通知的邏輯是怎麼樣的?是如何實現非同步的?

前面也提到了advisor是一個綁定了切點的通知,前面分析了它的切點,那麼現在我們就來看看它的通知邏輯,直接定位到AsyncAnnotationAdvisor中的buildAdvice方法,原始碼如下:

protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}

簡單吧,加了一個攔截器而已,對於interceptor型別的物件,我們關注它的核心方法invoke就行了,程式碼如下:

public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); // 非同步執行嘛,先獲取到一個執行緒池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
} // 然後將這個方法封裝成一個 Callable物件傳入到執行緒池中執行
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 將任務提交到執行緒池
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

導致的問題及解決方案

問題1:迴圈依賴報錯

就像在這張圖裡這個讀者問的問題,

分為兩點回答:

第一:迴圈依賴為什麼不能被解決?

這個問題其實很簡單,在《面試必殺技,講一講Spring中的迴圈依賴》這篇文章中我從兩個方面分析了迴圈依賴的處理流程

  1. 簡單物件間的迴圈依賴處理
  2. AOP物件間的迴圈依賴處理

按照這種思路,@Async註解導致的迴圈依賴應該屬於AOP物件間的迴圈依賴,也應該能被處理。但是,重點來了,解決AOP物件間迴圈依賴的核心方法是三級快取,如下:

在三級快取快取了一個工廠物件,這個工廠物件會呼叫getEarlyBeanReference方法來獲取一個早期的代理物件的引用,其原始碼如下:

protected Object getEarlyBeanReference(String beanName, RootBeanDefinition mbd, Object bean) {
Object exposedObject = bean;
if (!mbd.isSynthetic() && hasInstantiationAwareBeanPostProcessors()) {
for (BeanPostProcessor bp : getBeanPostProcessors()) {
// 看到這個判斷了嗎,通過@EnableAsync匯入的後置處理器
// AsyncAnnotationBeanPostProcessor根本就不是一個SmartInstantiationAwareBeanPostProcessor
// 這就意味著即使我們通過AsyncAnnotationBeanPostProcessor建立了一個代理物件
// 但是早期暴露出去的用於給別的Bean進行注入的那個物件還是原始物件
if (bp instanceof SmartInstantiationAwareBeanPostProcessor) {
SmartInstantiationAwareBeanPostProcessor ibp = (SmartInstantiationAwareBeanPostProcessor) bp;
exposedObject = ibp.getEarlyBeanReference(exposedObject, beanName);
}
}
}
return exposedObject;
}

看完上面的程式碼迴圈依賴的問題就很明顯了,因為早期暴露的物件跟最終放入容器中的物件不是同一個,所以報錯了。報錯的具體位置我在你知道Spring是怎麼將AOP應用到Bean的生命週期中的嗎?文章末尾已經分析過了,本文不再贅述

解決方案

就以上面讀者給出的Demo為例,只需要在為B注入A時新增一個@Lazy註解即可

@Component
public class B implements BService { @Autowired
@Lazy
private A a; public void doSomething() {
}
}

這個註解的作用在於,當為B注入A時,會為A生成一個代理物件注入到B中,當真正呼叫代理物件的方法時,底層會呼叫getBean(a)去建立A物件,然後呼叫方法,這個註解的處理時機是在org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveDependency方法中,處理這個註解的程式碼位於org.springframework.context.annotation.ContextAnnotationAutowireCandidateResolver#buildLazyResolutionProxy,這些程式碼其實都在我之前的文章中分析過了

Spring雜談 | Spring中的AutowireCandidateResolver

談談Spring中的物件跟Bean,你知道Spring怎麼建立物件的嗎?

所以本文不再做詳細分析

問題2:預設執行緒池不會複用執行緒

我覺得這是這個註解最坑的地方,沒有之一!我們來看看它預設使用的執行緒池是哪個,在前文的原始碼分析中,我們可以看到決定要使用執行緒池的方法是org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor。其原始碼如下:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 可以在@Async註解中配置執行緒池的名字
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
// 獲取預設的執行緒池
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}

最終會呼叫到org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor這個方法中

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

可以看到,它預設使用的執行緒池是SimpleAsyncTaskExecutor。我們不看這個類的原始碼,只看它上面的檔案註釋,如下:

主要說了三點

  1. 為每個任務新起一個執行緒
  2. 預設執行緒數不做限制
  3. 不復用執行緒

就這三點,你還敢用嗎?只要你的任務耗時長一點,說不定伺服器就給你來個OOM

解決方案

最好的辦法就是使用自定義的執行緒池,主要有這麼幾種配置方法

  1. 在之前的原始碼分析中,我們可以知道,可以通過AsyncConfigurer來配置使用的執行緒池

如下:

public class DmzAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
// 建立自定義的執行緒池
}
}
  1. 直接在@Async註解中配置要使用的執行緒池的名稱

如下:

public class A implements AService {

	private B b;

	@Autowired
public void setB(B b) {
System.out.println(b);
this.b = b;
} @Async("dmzExecutor")
public void doSomething() {
}
}
@EnableAsync
@Configuration
@ComponentScan("com.dmz.spring.async")
@Aspect
public class Config {
@Bean("dmzExecutor")
public Executor executor(){
// 建立自定義的執行緒池
return executor;
}
}

總結

本文主要介紹了Spring中非同步註解的使用、原理及可能碰到的問題,針對每個問題文中也給出了方案。希望通過這篇文章能幫助你徹底掌握@Async註解的使用,知其然並知其所以然!

如果本文對你由幫助的話,記得點個贊吧!也歡迎關注我的公眾號,微信搜尋:程式設計師DMZ,或者掃描下方二維碼,跟著我一起認認真真學Java,踏踏實實做一個coder。

我叫DMZ,一個在學習路上匍匐前行的小菜鳥!