springboot執行緒池超級詳解
- springboot執行緒池超級詳解
springboot執行緒池超級詳解
#本文先否定網上部分人的一個觀點:不能使用spring boot預設執行緒池 其實他們之所以有這樣的結論是把spring和springboot混淆了;不推薦使用spring預設執行緒池,是因為spring預設是使用SimpleAsyncTaskExecutor這個執行緒池,但此執行緒不是真正意義上的執行緒池,因為執行緒不重用,每次呼叫都會建立一個新的執行緒。預設執行緒數最大值為Integer.MAX_VALUE,因此萬一執行緒啟動過多會引發OOM,所以不推薦使用 但是spring boot已經對spring預設執行緒池進行了自動裝配,設定了預設值8,所以不會因為建立執行緒數太多引發OOM ## 本文主要包含以下內容 1、執行緒和執行緒池的基本概念 2、spring預設執行緒池基礎及原理 3、springboot預設執行緒池 4、如何調整springboot預設執行緒池 5、如果使用springboot執行緒池 6、自定義執行緒池 # 非同步方法建議儘量處理耗時的任務,或者是處理不希望阻斷主流程的任務(比如非同步記錄操作日誌) ## 感謝:本文是整理了網際網路上數十篇相關文章整理所得,已經記不清涉及哪些連結了,感謝之前整理的IT從業者,本人只是一個搬運工
程式、執行緒和執行緒池
程序是資源分配最小單位,執行緒是程式執行的最小單位。 計算機在執行程式時,會為程式建立相應的程序,進行資源分配時,是以程序為單位進行相應的分配。每個程序都有相應的執行緒,在執行程式時,實際上是執行相應的一系列執行緒。
總結:程序是資源分配最小單位,執行緒是程式執行的最小單位
一、什麼是執行緒
執行緒,程式執行流的最小執行單位,是行程中的實際運作單位,經常容易和程序這個概念混淆。那麼,執行緒和程序究竟有什麼區別呢?首先,程序是一個動態的過程,是一個活動的實體。簡單來說,一個應用程式的執行就可以被看做是一個程序,而執行緒,是執行中的實際的任務執行者。可以說,程序中包含了多個可以同時執行的執行緒。
二、執行緒的生命週期
執行緒的生命週期可以利用以下的圖解來更好的理解:
第一步,是用new Thread()的方法新建一個執行緒,線上程建立完成之後,執行緒就進入了就緒(Runnable)狀態,此時創建出來的執行緒進入搶佔CPU資源的狀態,當執行緒搶到了CPU的執行權之後,執行緒就進入了執行狀態(Running),當該執行緒的任務執行完成之後或者是非常態的呼叫的stop()方法之後,執行緒就進入了死亡狀態。而我們在圖解中可以看出,執行緒還具有一個則色的過程,這是怎麼回事呢?當面對以下幾種情況的時候,容易造成執行緒阻塞,第一種,當執行緒主動呼叫了sleep()方法時,執行緒會進入則阻塞狀態,除此之外,當執行緒中主動呼叫了阻塞時的IO方法時,這個方法有一個返回引數,當引數返回之前,執行緒也會進入阻塞狀態,還有一種情況,當執行緒進入正在等待某個通知時,會進入阻塞狀態。那麼,為什麼會有阻塞狀態出現呢?我們都知道,CPU的資源是十分寶貴的,所以,當執行緒正在進行某種不確定時長的任務時,Java就會收回CPU的執行權,從而合理應用CPU的資源。我們根據圖可以看出,執行緒在阻塞過程結束之後,會重新進入就緒狀態,重新搶奪CPU資源。這時候,我們可能會產生一個疑問,如何跳出阻塞過程呢?又以上幾種可能造成執行緒阻塞的情況來看,都是存在一個時間限制的,當sleep()方法的睡眠時長過去後,執行緒就自動跳出了阻塞狀態,第二種則是在返回了一個引數之後,在獲取到了等待的通知時,就自動跳出了執行緒的阻塞過程
三、什麼是單執行緒和多執行緒?
單執行緒,顧名思義即是隻有一條執行緒在執行任務。
多執行緒,建立多條執行緒同時執行任務,這種方式在我們的日常生活中比較常見。但是,在多執行緒的使用過程中,還有許多需要我們瞭解的概念。比如,在理解上並行和併發的區別,以及在實際應用的過程中多執行緒的安全問題,對此,我們需要進行詳細的瞭解。
並行和併發:在我們看來,都是可以同時執行多種任務,那麼,到底他們二者有什麼區別呢?
併發,從巨集觀方面來說,併發就是同時進行多種時間,實際上,這幾種時間,並不是同時進行的,而是交替進行的,而由於CPU的運算速度非常的快,會造成我們的一種錯覺,就是在同一時間內進行了多種事情,而併發,則是真正意義上的同時進行多種事情。這種只可以在多核CPU的基礎下完成。
還有就是多執行緒的安全問題?為什麼會造成多執行緒的安全問題呢?我們可以想象一下,如果多個執行緒同時執行一個任務,那麼意味著他們共享同一種資源,由於執行緒CPU的資源不一定可以被誰搶佔到,這是,第一條執行緒先搶佔到CPU資源,他剛剛進行了第一次操作,而此時第二條執行緒搶佔到了CPU的資源,那麼共享資源還來不及發生變化,就同時有兩條資料使用了同一條資源,具體請參考多執行緒買票問題。這個問題我們應該如何解決那?
有造成問題的原因我們可以看出,這個問題主要的矛盾在於,CPU的使用權搶佔和資源的共享發生了衝突,解決時,我們只需要讓一條執行緒佔用了CPU的資源時,阻止第二條執行緒同時搶佔CPU的執行權,在程式碼中,我們只需要在方法中使用同步程式碼塊即可。在這裡,同步程式碼塊不多進行贅述,可以自行了解。
四、執行緒池
在一個應用程式中,我們需要多次使用執行緒,也就意味著,我們需要多次建立並銷燬執行緒。而建立並銷燬執行緒的過程勢必會消耗記憶體。而在Java中,記憶體資源是及其寶貴的,所以,我們就提出了執行緒池的概念。
執行緒池:Java中開闢出了一種管理執行緒的概念,這個概念叫做執行緒池,從概念以及應用場景中,我們可以看出,執行緒池的好處,就是可以方便的管理執行緒,也可以減少記憶體的消耗。
五、執行緒池的執行流程
有圖我們可以看出,任務進來時,首先執行判斷,判斷核心執行緒是否處於空閒狀態,如果不是,核心執行緒就先就執行任務,如果核心執行緒已滿,則判斷任務佇列是否有地方存放該任務,若果有,就將任務儲存在任務佇列中,等待執行,如果滿了,在判斷最大可容納的執行緒數,如果沒有超出這個數量,就開創非核心執行緒執行任務,如果超出了,就呼叫handler實現拒絕策略。
handler的拒絕策略:
有四種:第一種AbortPolicy:不執行新任務,直接丟擲異常,提示執行緒池已滿
第二種DisCardPolicy:不執行新任務,也不丟擲異常
第三種DisCardOldSetPolicy:將訊息佇列中的第一個任務替換為當前新進來的任務執行
第四種CallerRunsPolicy:直接呼叫execute來執行當前任務
六、四種常見的執行緒池:
CachedThreadPool:可快取的執行緒池,該執行緒池中沒有核心執行緒,非核心執行緒的數量為Integer.max_value,就是無限大,當有需要時建立執行緒來執行任務,沒有需要時回收執行緒,適用於耗時少,任務量大的情況。
SecudleThreadPool:週期性執行任務的執行緒池,按照某種特定的計劃執行執行緒中的任務,有核心執行緒,但也有非核心執行緒,非核心執行緒的大小也為無限大。適用於執行週期性的任務。
SingleThreadPool:只有一條執行緒來執行任務,適用於有順序的任務的應用場景。
FixedThreadPool:定長的執行緒池,有核心執行緒,核心執行緒的即為最大的執行緒數量,沒有非核心執行緒
Spring原理之@Async--Spring Framework提供的,而非SpringBoot
前言-Demo
在開發過程中,我們會遇到很多使用執行緒池的業務場景,例如非同步簡訊通知、非同步記錄操作日誌。大多數使用執行緒池的場景,就是會將一些可以進行非同步操作的業務放線上程池中去完成。
例如在生成訂單的時候給使用者傳送簡訊,生成訂單的結果不應該被髮送簡訊的成功與否所左右,也就是說生成訂單這個主操作是不依賴於傳送簡訊這個操作,所以我們就可以把傳送簡訊這個操作置為非同步操作。
那麼本文就是來看看Spring中提供的優雅的非同步處理方案:在Spring3中,Spring中引入了一個新的註解@Async,這個註解讓我們在使用Spring完成非同步操作變得非常方便
需要注意的是這些功能都是Spring Framework提供的,而非SpringBoot。因此下文的講解都是基於Spring Framework的工程
Spring中用@Async註解標記的方法,稱為非同步方法,它會在呼叫方的當前執行緒之外的獨立的執行緒中執行,其實就相當於我們自己new Thread(()-> System.out.println(“hello world !”))這樣在另一個執行緒中去執行相應的業務邏輯。
Demo
// @Async 若把註解放在類上或者介面上,那麼他所有的方法都會非同步執行了~~~~(包括私有方法)
public interface HelloService {
Object hello();
}
@Service
public class HelloServiceImpl implements HelloService {
@Async // 注意此處加上了此註解
@Override
public Object hello() {
System.out.println("當前執行緒:" + Thread.currentThread().getName());
return "service hello";
}
}
然後只需要在配置裡,開啟對非同步的支援即可:(也許直接加在這裡無效,因為bean沒有注入,可以直接加在@Service旁邊)
@Configuration
@EnableAsync // 開啟非同步註解的支援
public class RootConfig {
}
輸出如下(當前執行緒名):(親測Alan:for迴圈呼叫100次,建立SimpleAsyncTaskExecutor-1-100)
當前執行緒:SimpleAsyncTaskExecutor-1
可以很明顯的發現,它使用的是執行緒池SimpleAsyncTaskExecutor,這也是Spring預設給我們提供的執行緒池(其實它不是一個真正的執行緒池,後面會有講述)。下面原理部分講解後,你就能知道怎麼讓它使用我們自定義的執行緒池了~~~
@Async註解使用細節
1、@Async註解一般用在方法上,如果用在類上,那麼這個類所有的方法都是非同步執行的;
2、@Async可以放在任何方法上,哪怕你是private的(若是同類呼叫,請務必注意註解失效的情況~~~)
3、所使用的@Async註解方法的類物件應該是Spring容器管理的bean物件
4、@Async可以放在介面處(或者介面方法上)。但是隻有使用的是JDK的動態代理時才有效,CGLIB會失效。因此建議:統一寫在實現類的方法上
5、需要註解@EnableAsync來開啟非同步註解的支援
6、若你希望得到非同步呼叫的返回值,請你的返回值用Futrue變數包裝起來
需要額外匯入哪些Jar包?
它的依賴包非常簡單,只依賴一些Spring的核心包外加spring-aop,但是如果你已經匯入了spring-webmvc這個jar,那就什麼不需要額外匯入了,因為都有了:
備註:它雖然依賴於Spring AOP,但是它並不需要匯入aspectjweaver,因為它和AspectJ沒有半毛錢關係
原理、原始碼解析(有興趣的可以仔細研讀,否則瞭解結論,跳過閱讀即可)
@EnableAsync
它位於的包名為org.springframework.scheduling.annotation,jar名為:spring-context
@EnableXXX這種設計模式之前有分析過多次,這個註解就是它的入口,因此本文也一樣,從入口處一層一層的剖析:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
//預設情況下,要開啟非同步操作,要在相應的方法或者類上加上@Async註解或者EJB3.1規範下的@Asynchronous註解。
//這個屬性使得開發人員可以自己設定開啟非同步操作的註解(可謂非常的人性化了,但是大多情況下用Spring的就足夠了)
Class<? extends Annotation> annotation() default Annotation.class;
// true表示啟用CGLIB代理
boolean proxyTargetClass() default false;
// 代理方式:預設是PROXY 採用Spring的動態代理(含JDK動態代理和CGLIB)
// 若改為:AdviceMode.ASPECTJ表示使用AspectJ靜態代理方式。
// 它能夠解決同類內方法呼叫不走代理物件的問題,但是一般情況下都不建議這麼去做,不要修改這個引數值
AdviceMode mode() default AdviceMode.PROXY;
// 直接定義:它的執行順序(因為可能有多個@EnableXXX)
int order() default Ordered.LOWEST_PRECEDENCE;
}
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
// May be used to determine the target executor to be used when executing this method
// 意思是這個value值是用來指定執行器的(寫入執行器BeanName即可採用特定的執行器去執行此方法)
String value() default "";
}
最重要的,還是上面的@Import註解匯入的類:AsyncConfigurationSelector
AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
// 這類 我也不知道在哪?是用於支援AspectJ這種靜態代理Mode的,忽略吧~~~~
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
// 這裡AdviceMode 進行不同的處理,從而向Spring容器注入了不同的Bean~~~
switch (adviceMode) {
// 大多數情況下都走這裡,ProxyAsyncConfiguration會被注入到Bean容器裡面~~~
case PROXY:
return new String[] { ProxyAsyncConfiguration.class.getName() };
case ASPECTJ:
return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
default:
return null;
}
}
}
AdviceModeImportSelector父類的這個抽象更加的重要。它的實現類至少有如下兩個:
備註:TransactionManagementConfigurationSelector需要額外匯入jar包:spring-tx
這個父類抽象得非常的好,它的作用:抽象實現支援了AdviceMode,並且支援通用的@EnableXXX模式。
//@since 3.1 它是一個`ImportSelector`
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {
// 預設都叫mode
public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";
// 顯然也允許子類覆蓋此方法
protected String getAdviceModeAttributeName() {
return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
}
// importingClassMetadata:註解的資訊
@Override
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
// 這裡泛型,拿到泛型型別~~~
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
// 根據型別,拿到該型別的這個註解,然後轉換為AnnotationAttributes
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format( "@%s is not present annType.getSimpleName(), importingClassMetadata.getClassName()));
}
// 拿到AdviceMode,最終交給子類,讓她自己去實現 決定匯入哪個Bean吧
AdviceMode adviceMode = attributes.getEnum(this.getAdviceModeAttributeName());
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException(String.format("Unknown AdviceMode: '%s'", adviceMode));
}
return imports;
}
// 子類去實現 具體匯入哪個Bean
@Nullable
protected abstract String[] selectImports(AdviceMode adviceMode);
}
改抽象提供了支援AdviceMode的較為通用的實現,若我們自己想自定義,可以考慮實現此類。
由此可議看出,@EnableAsync最終是向容器內注入了ProxyAsyncConfiguration這個Bean。由名字可議看出,它是一個配置類。
ProxyAsyncConfiguration
// 它是一個配置類,角色為ROLE_INFRASTRUCTURE 框架自用的Bean型別
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
// 它的作用就是諸如了一個AsyncAnnotationBeanPostProcessor,它是個BeanPostProcessor
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
// customAsyncAnnotation:自定義的註解型別
// AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation") 為拿到該註解該欄位的預設值
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
// 相當於如果你指定了AsyncAnnotationType,那就set進去吧
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
// 只有自定義了AsyncConfigurer的實現類,自定義了一個執行緒執行器,這裡才會有值
if (this.executor != null) {
bpp.setExecutor(this.executor);
}
// 同上,非同步執行緒異常的處理器~~~~~
if (this.exceptionHandler != null) {
bpp.setExceptionHandler(this.exceptionHandler);
}
// 這兩個引數,就不多說了。
// 可以看到,order屬性值,最終決定的是BeanProcessor的執行順序的
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
// 它的父類:
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
// 此註解@EnableAsync的元資訊
protected AnnotationAttributes enableAsync;
// 非同步執行緒池
protected Executor executor;
// 非同步異常的處理器
protected AsyncUncaughtExceptionHandler exceptionHandler;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
// 拿到@EnableAsync註解的元資料資訊~~~
this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
// doc說得很明白。它會把所有的`AsyncConfigurer`的實現類都蒐集進來,然後進行類似屬性的合併
// 備註 雖然這裡用的是Collection 但是AsyncConfigurer的實現類只允許有一個
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
//AsyncConfigurer用來配置執行緒池配置以及異常處理器,而且在Spring環境中最多隻能有一個
//在這裡我們知道了,如果想要自己去配置執行緒池,只需要實現AsyncConfigurer介面,並且不可以在Spring環境中有多個實現AsyncConfigurer的類。
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
// 拿到唯一的AsyncConfigurer ,然後賦值~~~~ 預設的請參照這個類:AsyncConfigurerSupport(它並不會被加入進Spring容器裡)
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer.getAsyncExecutor();
this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
}
}
從上可知,真正做文章的最終還是AsyncAnnotationBeanPostProcessor這個後置處理器,下面我們來重點看看它
AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor這個BeanPostBeanPostProcessor很顯然會對帶有能夠引發非同步操作的註解(比如@Async)的Bean進行處理
從該類的繼承體系可以看出,大部分功能都是在抽象類裡完成的,它不關乎於@Async,而是這一類技術都是這樣子處理的。
首先,ProxyProcessorSupport這裡就不用多說了,在講解自動代理建立器的時候有說過。
AbstractAdvisingBeanPostProcessor
從這個名字也能看出來。它主要處理AdvisingBean,也就是處理Advisor和Bean的關係的
// 它繼承自,ProxyProcessorSupport,說明它也擁有AOp的通用配置
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
@Nullable
protected Advisor advisor;
protected boolean beforeExistingAdvisors = false;
// 快取合格的Bean們
private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);
// 當遇到一個pre-object的時候,是否把該processor所持有得advisor放在現有的增強器們之前執行
// 預設是false,會放在最後一個位置上的
public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
this.beforeExistingAdvisors = beforeExistingAdvisors;
}
// 不處理
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
// Bean已經例項化、初始化完成之後執行。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 忽略AopInfrastructureBean的Bean,並且如果沒有advisor也會忽略不處理~~~~~
if (bean instanceof AopInfrastructureBean || this.advisor == null) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 如果這個Bean已經被代理過了(比如已經被AOP切中了),那本處就無需再重複建立代理了嘛
// 直接向裡面新增advisor就成了
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
// 注意此advised不能是已經被凍結了的。且源物件必須是Eligible合格的
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
// 把自己持有的這個advisor放在首位(如果beforeExistingAdvisors=true)
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
// 否則就是尾部位置
else {
advised.addAdvisor(this.advisor);
}
// 最終直接返回即可,因為已經沒有必要再建立一次代理物件了
return bean;
}
}
// 如果這個Bean事合格的(此方法下面有解釋) 這個時候是沒有被代理過的
if (isEligible(bean, beanName)) {
// 以當前的配置,建立一個ProxyFactory
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
// 如果不是使用CGLIB常見代理,那就去分析出它所實現的介面們 然後放進ProxyFactory 裡去
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
// 切面就是當前持有得advisor
proxyFactory.addAdvisor(this.advisor);
// 留給子類,自己還可以對proxyFactory進行自定義~~~~~
customizeProxyFactory(proxyFactory);
// 最終返回這個代理物件~~~~~
return proxyFactory.getProxy(getProxyClassLoader());
}
// No async proxy needed.(相當於沒有做任何的代理處理,返回原物件)
return bean;
}
// 檢查這個Bean是否是合格的
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}
protected boolean isEligible(Class<?> targetClass) {
// 如果已經被快取著了,那肯定靠譜啊
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
// 如果沒有切面(就相當於沒有給配置增強器,那鐵定是不合格的)
if (this.advisor == null) {
return false;
}
// 這個重要了:看看這個advisor是否能夠切入進targetClass這個類,能夠切入進取的也是合格的
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
// 子類可以複寫。比如`AbstractBeanFactoryAwareAdvisingPostProcessor`就複寫了這個方法~~~
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);
proxyFactory.setTarget(bean);
return proxyFactory;
}
// 子類複寫~
protected void customizeProxyFactory(ProxyFactory proxyFactory) {
}
}
看看它的繼承圖:
MethodValidationPostProcessor屬於JSR-303校驗方面的範疇
AbstractBeanFactoryAwareAdvisingPostProcessor
從名字可以看出,它相較於父類,就和BeanFactory有關了,也就是和Bean容器相關了~~~
public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
implements BeanFactoryAware {
// Bean工廠
@Nullable
private ConfigurableListableBeanFactory beanFactory;
// 如果這個Bean工廠不是ConfigurableListableBeanFactory ,那就set一個null
// 我們的`DefaultListableBeanFactory`顯然就是它的子類~~~~~
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
(ConfigurableListableBeanFactory) beanFactory : null);
}
@Override
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
// 如果Bean工廠是正常的,那就把這個Bean expose一個特殊的Bean,記錄下它的型別
if (this.beanFactory != null) {
AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
}
ProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
// 這裡建立代理也是和`AbstractAutoProxyCreator`差不多的邏輯。
// 如果沒有顯示的設定為CGLIB,並且toProxyUtils.shouldProxyTargetClass還被暴露過時一個特殊的Bean,那就強制使用CGLIB代理吧 這裡一般和Scope無關的話,都返回false了
if (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
proxyFactory.setProxyTargetClass(true);
}
return proxyFactory;
}
}
下面就可以看看具體的兩個實現類了:
AsyncAnnotationBeanPostProcessor
該實現類就是具體和@Async相關的一個類了~~~
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
// 建議換成AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME 這樣語意更加的清晰些
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
// 註解型別
@Nullable
private Class<? extends Annotation> asyncAnnotationType;
// 非同步的執行器
@Nullable
private Executor executor;
// 非同步異常處理器
@Nullable
private AsyncUncaughtExceptionHandler exceptionHandler;
// 此處特別注意:這裡設定為true,也就是說@Async的Advisor會放在首位
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
// 可以設定需要掃描哪些註解型別。預設只掃描@Async以及`javax.ejb.Asynchronous`這個註解
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
this.asyncAnnotationType = asyncAnnotationType;
}
// 如果沒有指定。那就將執行全域性得預設查詢。在上下文中查詢唯一的`TaskExecutor`型別的Bean,或者一個名稱為`taskExecutor`的Executor
// 當然,如果上面途徑都沒找到。那就會採用一個預設的任務池
public void setExecutor(Executor executor) {
this.executor = executor;
}
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
// 重寫了父類的方法。然後下面:自己new了一個AsyncAnnotationAdvisor ,傳入executor和exceptionHandler
// 並且最終this.advisor = advisor
// 因此可議看出:AsyncAnnotationAdvisor 才是重點了。它定義了它的匹配情況~~~~
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
AsyncAnnotationAdvisor
可以看出,它是一個PointcutAdvisor,並且Pointcut是一個AnnotationMatchingPointcut,因此是為註解來匹配的
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private AsyncUncaughtExceptionHandler exceptionHandler;
// 增強器
private Advice advice;
// 切點
private Pointcut pointcut;
// 兩個都為null,那就是都會採用預設的方案
public AsyncAnnotationAdvisor() {
this(null, null);
}
// 建立一個AsyncAnnotationAdvisor例項,可以自己指定Executor 和 AsyncUncaughtExceptionHandler
@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
// 這裡List長度選擇2,應為絕大部分情況下只會支援這兩種@Async和@Asynchronous
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
if (exceptionHandler != null) {
this.exceptionHandler = exceptionHandler;
}
// 若沒指定,那就使用預設的SimpleAsyncUncaughtExceptionHandler(它僅僅是輸出了一句日誌而已)
else {
this.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler();
}
// 這兩個方法是重點,下面會重點介紹
this.advice = buildAdvice(executor, this.exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
// 如果set了Executor,advice會重新構建。
public void setTaskExecutor(Executor executor) {
this.advice = buildAdvice(executor, this.exceptionHandler);
}
// 這裡注意:如果你自己指定了註解型別。那麼將不再掃描其餘兩個預設的註解,因此pointcut也就需要重新構建了
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
asyncAnnotationTypes.add(asyncAnnotationType);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
// 如果這個advice也實現了BeanFactoryAware,那就也把BeanFactory放進去
@Override
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
@Override
public Advice getAdvice() {
return this.advice;
}
@Override
public Pointcut getPointcut() {
return this.pointcut;
}
// 這個最終又是委託給`AnnotationAsyncExecutionInterceptor`,它是一個具體的增強器,有著核心內容
protected Advice buildAdvice(@Nullable Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
}
// Calculate a pointcut for the given async annotation types, if any
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
// 採用一個組合切面:ComposablePointcut (因為可能需要支援多個註解嘛)
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 這裡為何new出來兩個AnnotationMatchingPointcut??????
// 第一個:類匹配(只需要類上面有這個註解,所有的方法都匹配)this.methodMatcher = MethodMatcher.TRUE;
// 第二個:方法匹配。所有的類都可議。但是隻有方法上有這個註解才會匹配上
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
// 最終的結果都是取值為並集的~~~~~~~
result = result.union(mpc);
}
// 最後一個處理厲害了:也就是說你啥型別都木有的情況下,是匹配所有類的所有方法~~~
return (result != null ? result : Pointcut.TRUE);
}
}
從上的原始碼可議看出,預設是支援@Asycn以及EJB的那個非同步註解的。但是最終的增強行為,委託給了AnnotationAsyncExecutionInterceptor
AnnotationAsyncExecutionInterceptor:@Async攔截器
可知,它是一個MethodInterceptor,並且繼承自AsyncExecutionAspectSupport
AsyncExecutionAspectSupport
從類名就可以看出,它是用來支援處理非同步執行緒執行器的,若沒有指定,靠它提供一個預設的非同步執行器。
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
// 這是備選的。如果找到多個型別為TaskExecutor的Bean,才會備選的再用這個名稱去找的~~~
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
// 快取~~~AsyncTaskExecutor是TaskExecutor的子介面
// 從這可以看出:不同的方法,對應的非同步執行器還不一樣咯~~~~~~
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
// 預設的執行緒執行器
@Nullable
private volatile Executor defaultExecutor;
// 非同步異常處理器
private AsyncUncaughtExceptionHandler exceptionHandler;
// Bean工廠
@Nullable
private BeanFactory beanFactory;
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());
}
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
this.defaultExecutor = defaultExecutor;
this.exceptionHandler = exceptionHandler;
}
public void setExecutor(Executor defaultExecutor) {
this.defaultExecutor = defaultExecutor;
}
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
// 該方法是找到一個非同步執行器,去執行這個方法~~~~~~
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 如果快取中能夠找到該方法對應的執行器,就立馬返回了
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 抽象方法:`AnnotationAsyncExecutionInterceptor`有實現。就是@Async註解的value值
String qualifier = getExecutorQualifier(method);
// 現在知道@Async直接的value值的作用了吧。就是制定執行此方法的執行器的(容器內執行器的Bean的名稱)
// 當然有可能為null。注意此處是支援@Qualified註解標註在類上來區分Bean的
// 注意:此處targetExecutor仍然可能為null
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
// 註解沒有指定value值,那就去找預設的執行器
else {
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {
// 去找getDefaultExecutor()~~~
synchronized (this.executors) {
if (this.defaultExecutor == null) {
this.defaultExecutor = getDefaultExecutor(this.beanFactory);
}
targetExecutor = this.defaultExecutor;
}
}
}
// 若還未null,那就返回null吧
if (targetExecutor == null) {
return null;
}
// 把targetExecutor 包裝成一個AsyncTaskExecutor返回,並且快取起來。
// TaskExecutorAdapter就是AsyncListenableTaskExecutor的一個實現類
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
// 子類去複寫此方法。也就是拿到對應的key,從而方便找bean吧(執行器)
@Nullable
protected abstract String getExecutorQualifier(Method method);
// @since 4.2.6 也就是根據
@Nullable
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
if (beanFactory == null) {
throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
" to access qualified executor '" + qualifier + "'");
}
return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}
// @since 4.2.6
// Retrieve or build a default executor for this advice instance 檢索或者建立一個預設的executor
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
// 這個處理很有意思,它是用用的try catch的技巧去處理的
try {
// 如果容器記憶體在唯一的TaskExecutor(子類),就直接返回了
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
// 這是出現了多個TaskExecutor型別的話,那就按照名字去拿 `taskExecutor`且是Executor型別
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
// 如果再沒有找到,也不要報錯,而是接下來建立一個預設的處理器
// 這裡輸出一個info資訊
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
}
// 這裡還沒有獲取到,就放棄。 用本地預設的executor吧~~~
// 子類可以去複寫此方法,發現為null的話可議給一個預設值~~~~比如`AsyncExecutionInterceptor`預設給的就是`SimpleAsyncTaskExecutor`作為執行器的
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
//Delegate for actually executing the given task with the chosen executor
// 用選定的執行者實際執行給定任務的委託
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 根據不同的返回值型別,來採用不同的方案去非同步執行,但是執行器都是executor
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// ListenableFuture介面繼承自Future 是Spring自己擴充套件的一個介面。
// 同樣的AsyncListenableTaskExecutor也是Spring擴充套件自AsyncTaskExecutor的
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// 普通的submit
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
// 沒有返回值的情況下 也用sumitt提交,按時返回null
else {
executor.submit(task);
return null;
}
}
// 處理錯誤
protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
// 如果方法的返回值型別是Future,就rethrowException,表示直接throw出去
if (Future.class.isAssignableFrom(method.getReturnType())) {
ReflectionUtils.rethrowException(ex);
} else {
// Could not transmit the exception to the caller with default executor
try {
this.exceptionHandler.handleUncaughtException(ex, method, params);
} catch (Throwable ex2) {
logger.error("Exception handler for async method '" + method.toGenericString() +
"' threw unexpected exception itself", ex2);
}
}
}
}
這個類相當於已經做了大部分的工作了,接下來繼續看子類:
AsyncExecutionInterceptor
終於,從此處開始。可議看出它是一個MethodInterceptor,是一個增強器了。但是從命名中也可以看出,它已經能夠處理非同步的執行了(比如基於XML方式的),但是還和註解無關
// 他繼承自AsyncExecutionAspectSupport 來處理非同步方法的處理,同時是個MethodInterceptor,來增強複合條件的方法
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
...
// 顯然這個方法它直接返回null,因為XML配置嘛~~~~
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
return null;
}
// 這個厲害了。如果父類返回的defaultExecutor 為null,那就new一個SimpleAsyncTaskExecutor作為預設的執行器
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
// 最高優先順序 希望的是最優先執行(XML配置就是這種優先順序)
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
// 最重要的當然是這個invoke方法
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 注意:此處是getMostSpecificMethod 拿到最終要執行的那個方法
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
// 橋接方法~~~~~~~~~~~~~~
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// determine一個用於執行此方法的非同步執行器
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 構造一個任務,Callable(此處不採用Runable,因為需要返回值)
Callable<Object> task = () -> {
try {
// result就是返回值
Object result = invocation.proceed();
// 注意此處的處理~~~~ 相當於如果不是Future型別,就返回null了
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
// 處理執行時可能產生的異常~~~~~~
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 提交任務~~~~invocation.getMethod().getReturnType()為返回值型別
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
SimpleAsyncTaskExecutor:非同步執行使用者任務的SimpleAsyncTaskExecutor。每次執行客戶提交給它的任務時,它會啟動新的執行緒,並允許開發者控制併發執行緒的上限(concurrencyLimit),從而起到一定的資源節流作用。預設時,concurrencyLimit取值為-1,即不啟用資源節流
所以它不是真的執行緒池,這個類不重用執行緒,每次呼叫都會建立一個新的執行緒(因此建議我們在使用@Aysnc的時候,自己配置一個執行緒池,節約資源)
AnnotationAsyncExecutionInterceptor
很顯然,他是在AsyncExecutionInterceptor的基礎上,提供了對@Async註解的支援。所以它是繼承它的。
// 它繼承自AsyncExecutionInterceptor ,只複寫了一個方法
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
...
// 由此可以見它就是去拿到@Async的value值。以方法的為準,其次是類上面的
// 備註:發現這裡是不支援EJB的@Asynchronous註解的,它是不能指定執行器的
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
}
使用推薦配置
@EnableAsync //對應的@Enable註解,最好寫在屬於自己的配置檔案上,保持內聚性
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心執行緒數
executor.setMaxPoolSize(20); //最大執行緒數
executor.setQueueCapacity(1000); //佇列大小
executor.setKeepAliveSeconds(300); //執行緒最大空閒時間
executor.setThreadNamePrefix("fsx-Executor-"); 指定用於新建立的執行緒名稱的字首。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略(一共四種,此處省略)
// 這一步千萬不能忘了,否則報錯: java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
executor.initialize();
return executor;
}
// 異常處理器:當然你也可以自定義的,這裡我就這麼簡單寫了~~~
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
使用Spring的非同步,我個人十分建議配置上自己自定義的配置器(如上配置僅供參考),這樣能夠更好的掌握(比如異常處理AsyncUncaughtExceptionHandler~~~)
總結
有了Spring AOP整體執行原理作為基礎,再看這些基於AOP的應用,簡直行雲流水。
最後,使用的有兩個建議:
非同步方法建議儘量處理耗時的任務,或者是處理不希望阻斷主流程的任務(比如非同步記錄操作日誌)
儘量為@Async準備一個專門的執行緒池來提高效率減少開銷,而不要用Spring預設的SimpleAsyncTaskExecutor,它不是一個真正的執行緒池~
spring boot預設執行緒池的正確使用方式
我們在Spring專案的時候,會用到非同步註解 @Async 註解,從 Spring原理之@Async (上一段)我們可以知道其實他底層用到的預設的所謂的執行緒池並不是真的執行緒池,每次呼叫都會建立一個新的執行緒,那麼我們如何來修改這個預設的執行緒池或者說使用我們自定義的執行緒池呢?spring boot自動裝配其實已經為我們處理了上述問題,spring boot設定了預設的連線池以及預設基本配置,詳見下文
spring boot自動裝配
自動裝配是spring boot的一大特點,自動裝配其實就是spring把一些常用的bean提前註冊到了容器,如redis、jdbc、activemq等,我們可以直接來使用這些bean,所有的自動裝配的程式碼都在spring-boot-autoconfigure這個模組裡。
內建了很多,市面上常用的元件基本都包括了。
執行緒池的自動裝配
執行緒池的自動裝配程式碼在org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration這個類裡面。
可以看到類上有一個註解:@ConditionalOnClass({ThreadPoolTaskExecutor.class}),也就是說自動裝配是有條件的,前提是在類路徑下有這個類, ThreadPoolTaskExecutor這個類是spring的執行緒池實現類,這個類在spring-context包中,如果引入了這個包,那麼spring boot將會向容器註冊一個預設的執行緒池。
預設執行緒池的引數,我們可以通過配置檔案指定,格式如下:
spring.task.execution.pool.core-size=8
spring.task.execution.pool.max-size=Integer.MAX_VALUE
spring.task.execution.pool.queue-capacity=Integer.MAX_VALUE
spring.task.execution.pool.keep-alive=60 //單位是:秒
還有一些其他引數,都在org.springframework.boot.autoconfigure.task.TaskExecutionProperties這個類裡面,如果我們不指定的話,會使用預設值。
預設執行緒池的修改--兩種修改方式:1、修改環境變數 2、修改預設的執行緒池
修改環境變數方式:
## 配置執行緒池個數
如果是 CPU 密集型任務,那麼執行緒池的執行緒個數應該儘量少一些,一般為 CPU 的個數+1條執行緒。
如果是 IO 密集型任務,那麼執行緒池的執行緒可以放的很大,如 2*CPU 的個數。
對於混合型任務,如果可以拆分的話,通過拆分成 CPU 密集型和 IO 密集型兩種來提高執行效率;如果不能拆分的的話就可以根據實際情況來調整執行緒池中執行緒的個數
# 執行緒池建立時的初始化執行緒數,預設為8
spring.task.execution.pool.core-size=5
# 執行緒池的最大執行緒數,預設為int最大值
spring.task.execution.pool.max-size=10
# 用來緩衝執行任務的佇列,預設為int最大值
spring.task.execution.pool.queue-capacity=5
# 非核心執行緒的存活時間,執行緒終止前允許保持空閒的時間
spring.task.execution.pool.keep-alive=60
# 執行緒池的字首名稱,設定好了之後可以方便我們在日誌中檢視處理任務所在的執行緒池
spring.task.execution.thread-name-prefix=god-jiang-task-
# 是否允許核心執行緒超時
spring.task.execution.pool.allow-core-thread-timeout=
# 是否等待剩餘任務完成後才關閉應用
spring.task.execution.shutdown.await-termination=
# 等待剩餘任務完成的最大時間
spring.task.execution.shutdown.await-termination-period=
修改預設執行緒池方式:
關於修改 @Async預設的執行緒池 ,我們僅僅需要實現一個 AsyncConfigurer 類,進行**getAsyncExecutor 方法 **的重寫即可,具體例子如下:
@Slf4j
@EnableAsync //對應的@Enable註解,最好寫在屬於自己的配置檔案上,保持內聚性
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心執行緒數
executor.setMaxPoolSize(20); //最大執行緒數
executor.setQueueCapacity(1000); //佇列大小
executor.setKeepAliveSeconds(300); //執行緒最大空閒時間
executor.setThreadNamePrefix("async-Executor-"); 指定用於新建立的執行緒名稱的字首。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略(一共四種)
/**
* 拒絕處理策略
* CallerRunsPolicy():交由呼叫方執行緒執行,比如 main 執行緒。
* AbortPolicy():直接丟擲異常。
* DiscardPolicy():直接丟棄。
* DiscardOldestPolicy():丟棄佇列中最老的任務。
*/
/**
* 特殊說明:
* 1. 這裡演示環境,拒絕策略咱們採用丟擲異常
* 2.真實業務場景會把快取佇列的大小會設定大一些,
* 如果,提交的任務數量超過最大執行緒數量或將任務環快取到本地、redis、mysql中,保證訊息不丟失
* 3.如果專案比較大的話,非同步通知種類很多的話,建議採用MQ做非同步通知方案
*/
// 這一步千萬不能忘了,否則報錯: java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
// 執行緒初始化
executor.initialize();
return executor;
}
// 異常處理器:當然你也可以自定義的,這裡我就這麼簡單寫了~~~
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
log.error("=========================="+arg0.getMessage()+"=======================", arg0);
log.error("exception method:"+arg1.getName());
}
};
}
}
具體使用:
@Component
public class AsyncMyTask {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
// 直接使用該註解即可
@Async
public void doTask(int i) throws InterruptedException{
logger.info("Task-Native" + i + " started.");
}
}
預設執行緒池的使用
直接使用@Autowired這個註解注入即可。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestApplication.class)
public class TestApplicationTests {
// 注入ThreadPoolTaskExecutor
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
public void ThreadTest(){
System.out.println(threadPoolTaskExecutor);
System.out.println("new Runnable()");
// 建立並執行執行緒,方式一
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("new Runnable()"+i+"當前執行緒"+Thread.currentThread().getName());
}
}
});
// // 建立並執行執行緒,方式二
System.out.println("lambda");
threadPoolTaskExecutor.execute(() -> {
for (int i = 0; i < 100; i++) {
System.out.println("lambda"+i+"當前執行緒"+Thread.currentThread().getName());
}
});
}
}
使用@Async和@EnableAsync註解
// 在spring boot應用中使用@Async很簡單:
1、呼叫非同步方法類上或者啟動類加上註解@EnableAsync
2、在需要被非同步呼叫的方法外加上@Async
3、所使用的@Async註解方法的類物件應該是Spring容器管理的bean物件
//#首先使用@EnableAsync註解開啟非同步呼叫功能,該註解可以放置的位置有:
// #啟動類上方
@EnableAsync
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
// #呼叫非同步的當前類上方
@EnableAsync
@RestController
public class TestAsyncController(){}
// #在配置類上方使用
@Configuration
@EnableAsync
public class AsyncTaskPoolConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-ws-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
//# 編寫非同步請求 在非同步執行的方法上添加註解:@Async
Component
@Log4j2
public class AsyncTask {
//這裡注入的是dubbo的服務,和非同步請求沒有多大關係
@Reference(check = false)
private AuthorFacade authorFacade;
/**
* 獲取作者資訊
*
* @param authorId 作者ID
* @return 作者資訊
*/
@Async("taskExecutor")
public Future<AuthorDTO> getAuthor(String authorId){
try {
System.out.println("執行執行緒的名字:"+Thread.currentThread().getName());
return new AsyncResult<AuthorDTO>(authorFacade.getAuthor(authorId));
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
// #什麼情況下會導致@Async非同步方法會失效?
1.不要在本類中非同步呼叫。即一個方法是非同步方法,然後用另一個方法呼叫這個非同步方法。
2.不要有返回值,使用void
3.不能使用本類的私有方法或者非介面化加註@Async,因為代理不到失效
4.非同步方法不能使用static修飾
5.非同步類需使用@Component註解,不然將導致spring無法掃描到非同步類
6.SpringBoot框架必須在啟動類中增加@EnableAsync註解
7.非同步方法不要和事物註解同時存在。可以在事物的方法中呼叫另外一個類中的非同步方法。在呼叫Async方法的方法上標註@Transactional是管理呼叫方法的事務的,在Async方法上標註@Transactional是管理非同步方法的事務,事務因執行緒隔離
8.諸如以上幾點的情況比如spring中的@Transactional還有cache註解也不能有以上幾點情況,否則也會失效的,因為本質都是因為代理的機制導致的
// #使用@Async 註解時,需要注意一下幾點:
當專案中只有一個執行緒池時,我們只需要寫 @Async 即可將需要非同步的方法進行非同步;
當專案中存在多個執行緒池,我們在寫非同步時,需要注意如果只寫@Async註解沒有任何屬性則將此方法的執行非同步到帶有 @Primary 註解修飾的執行緒池中執行。
還可以將方法非同步到指定的執行緒池中,如 @Async(“threadPool”)則是將此方法非同步到threadPool 執行緒池中執行。
自定義執行緒池
自定義執行緒池的建立,也非常簡單,和spring boot的寫法並無二致。
也是寫一個配置類,在使用的時候,根據執行緒池的名稱來使用具體的執行緒池
/**
* 自定義建立執行緒池
*/
@Configuration
public class TaskExecutePool {
@Bean("myTaskAsyncPool") //也可以直接寫@Bean 預設就是方法名
public Executor myTaskAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心執行緒數
executor.setMaxPoolSize(20); //最大執行緒數
executor.setQueueCapacity(1000); //佇列大小
executor.setKeepAliveSeconds(300); //執行緒最大空閒時間
executor.setThreadNamePrefix("async-Executor-"); 指定用於新建立的執行緒名稱的字首。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略(一共四種,此處省略)
executor.initialize();
return executor;
}
}
自定義執行緒池的使用
因為spring boot預設建立了一個,所以加上自定義的,容器中有兩個執行緒池,如果我們使用@Autowired註解注入,看下@Autowired這個註解的邏輯:
首先在容器中查詢對應型別的bean
如果查詢結果剛好為一個,就將該bean裝配給@Autowired指定的資料
如果查詢的結果不止一個,那麼@Autowired會根據名稱來查詢。
如果查詢的結果為空,那麼會丟擲異常。
所以使用@Autowired這個注入的話,要保證名稱和我們自定義的一致才可以,建議指定名稱。
// @Async註解的使用方式
@Component
public class AsyncMyTask {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
// 註解中標註使用的執行緒池是哪一個
// myTaskAsynPool即配置執行緒池的方法名,此處如果不寫自定義執行緒池的方法名,會使用預設的執行緒池
@Async("myTaskAsyncPool")
public void doTask(int i) throws InterruptedException{
logger.info("Task-Native" + i + " started.");
}
}
如果想通過配置檔案來對執行緒池中的屬性進行方便修改的話
(1)配置檔案中進行配置
task.pool.corePoolSize=20
task.pool.maxPoolSize=40
task.pool.keepAliveSeconds=300
task.pool.queueCapacity=50
(2)編寫獲取配置屬性的配置類
/**
* 執行緒池配置屬性類
*/
@ConfigurationProperties(prefix = "task.pool")
@Data
public class ThreadPoolConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
}
(3)使用執行緒池配置屬性類
/**
* 建立執行緒池
*/
@Configuration
public class TaskExecutePool {
@Autowired
private ThreadPoolConfig config;
@Bean
public Executor myTaskAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心執行緒池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大執行緒數
executor.setMaxPoolSize(config.getMaxPoolSize());
//佇列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活躍時間
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//執行緒名字字首
executor.setThreadNamePrefix("async-Executor-");
// 拒絕策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
帶條件的自定義執行緒池
@Configuration
public class MyThreadPool {
@Bean("myThreadPool")
@Conditional(CacheCondition.class)
public ThreadPoolTaskExecutor myThreadPool(ThreadProperties threadProperties) {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(threadProperties.getSize());//核心執行緒數量
threadPool.setMaxPoolSize(threadProperties.getMax());//最大執行緒數
threadPool.setQueueCapacity(threadProperties.getQueue());//佇列數量
threadPool.setKeepAliveSeconds(threadProperties.getKeep());//執行緒所允許的空閒時間
threadPool.setThreadNamePrefix("myThread-");
threadPool.setRejectedExecutionHandler((r, executor) -> r.run());
threadPool.initialize();
return threadPool;
}
}
這個多了一個條件註解@Conditional(CacheCondition.class),需要滿足這個條件,才會建立。如果條件不滿足,將不會建立這個執行緒池,如果自定義的只有這一個的話,使用@Autowired這個注入的話,這個時候將會使用預設的執行緒池,這個一定要注意,如果不想使用預設的,在使用的時候,最好指定名稱。