[AOP] 5. Spring AOP中提供的種種Aspects
本文繼續討論ConcurrencyThrottleInterceptor(基於Spring 4.3.7)。以及上一篇文章中遺留的一個關於SimpleAsyncTaskExecutor類中屬性concurrencyLimit的問題。
這些都和併發控制相關。但是這裡需要事先說明的一點是,這些類和實現的年代都比較久遠了,比如ConcurrencyThrottleInterceptor是在2004年的Spring 1.x中就存在了,那個年代還沒有JDK中的java.util.concurrent併發包。因此這裡更多地是學習和討論一個解決特定問題的思想,而不是鼓勵大家去使用它。對於併發控制的問題,利用併發包中的相關類可以更好地解決。
首先還是按照慣例畫出關鍵型別之間的示意圖:
併發控制是如何實現的
ConcurrencyThrottleInterceptor
/**
* Interceptor that throttles concurrent access, blocking invocations if a specified concurrency
* limit is reached.
*
* <p>
* Can be applied to methods of local services that involve heavy use of system resources, in a
* scenario where it is more efficient to throttle concurrency for a specific service rather than
* restricting the entire thread pool (e.g. the web container's thread pool).
*
* <p>
* The default concurrency limit of this interceptor is 1. Specify the "concurrencyLimit" bean
* property to change this value.
*
* @author Juergen Hoeller
* @since 11.02.2004
* @see #setConcurrencyLimit
*/
@SuppressWarnings("serial")
public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport
implements MethodInterceptor, Serializable {
public ConcurrencyThrottleInterceptor() {
setConcurrencyLimit(1);
}
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
beforeAccess();
try {
return methodInvocation.proceed();
} finally {
afterAccess();
}
}
}
這個型別的實現也挺簡潔的,它繼承了ConcurrencyThrottleSupport,實現了MethodInterceptor和Serializable介面。而其中的invoke方法主要就是為了實現MethodInterceptor介面。
它想要完成的功能註釋中也說的比較明白了:對於目標方法的呼叫實現併發控制,通過concurrencyLimit來定義併發度。
從上述invoke方法的實現來看,主要的控制邏輯應該都在beforeAccess這個方法的實現中,它定義在父類ConcurrencyThrottleSupport:
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException(
"Thread was interrupted while waiting for invocation access, "
+ "but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount + " has reached limit "
+ this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
} catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
首先會判斷併發度是否被設定成了NO_CONCURRENCY(0),即不允許任何執行。如果是這樣的話就會直接丟擲異常進行提示。
當concurrencyLimit(也就是併發度)大於0的時候,會在monitor這個物件上設定一個同步程式碼塊。這個同步程式碼塊應用的是最底層的wait/nofity機制來實現併發控制,也算是一個wait/notify工作機制的參考例項吧,它非常具體的說明了在使用這個機制的時候所需要注意的幾個點:
- 需要在一個while迴圈中進行wait操作,當然這個不是必要的,不過是一種最佳實踐。while的迴圈條件是因具體的業務而異的,這個條件的作用是保證當前被阻塞的執行緒真的能夠再次開始執行,放到上面這個例子中,就是隻有在當前的併發量小於閾值(也就是concurrencyLimit)的時候,才能夠喚醒被阻塞的執行緒。因此在多執行緒環境中,一切皆有可能,某次喚醒後,在其他執行執行緒的影響下,本來被滿足的條件再次變為不滿足的狀態。
- 在同步程式碼塊中進行wait操作。從上面的實現來看,while迴圈確實也處於一個同步程式碼塊中。這樣做的目的同樣是為了保證喚醒訊息的準確性。
- 同步程式碼塊的監視器物件應該和wait方法的呼叫物件一致,如上述程式碼中的monitor物件。
在方法的最後,增加了計數器concurrencyCount的值,用來表示當前的併發量。
完成了beforeAccess方法的呼叫後,會執行目標方法:return methodInvocation.proceed();
。執行完畢後在finally程式碼塊中呼叫afterAccess方法:
protected void afterAccess() {
if (this.concurrencyLimit >= 0) {
synchronized (this.monitor) {
this.concurrencyCount--;
if (logger.isDebugEnabled()) {
logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
}
this.monitor.notify();
}
}
}
這個方法的作用很簡單,就是在目標方法執行完畢之後減少當前的併發計數器,並喚醒被阻塞的執行緒。這裡需要注意的是,喚醒被阻塞執行緒的notify操作也是在同一個監視器物件的同步程式碼塊中實現的。
瞭解了重要方法的實現之後,這個Interceptor的作用也就非常清晰了。比如設定了併發度為3,那麼目標方法就最多隻有三個執行緒能夠同時訪問,當第四個執行緒嘗試進行訪問的時候會在wait處被阻塞,直到前面的三個執行緒中有一個執行完畢才會喚醒一個被阻塞的執行緒。
應用例項
Advisor的定義
下面寫一個簡單的例子來應用ConcurrencyThrottleInterceptor,首先是定義Pointcut以及Interceptor本身:
@Bean
public Advisor throttleAdvisor() {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression(
"execution(* (@org.springframework.stereotype.Service *).doThrottleThings(..))");
return new DefaultPointcutAdvisor(pointcut, concurrencyThrottleInterceptor());
}
@Bean
public ConcurrencyThrottleInterceptor concurrencyThrottleInterceptor() {
ConcurrencyThrottleInterceptor interceptor = new ConcurrencyThrottleInterceptor();
interceptor.setConcurrencyLimit(3);
return interceptor;
}
這裡完全使用了JavaConfig的方式進行配置。第一個方法throttleAdvisor宣告的實際上是一個完整的Aspect,包含了兩部分:
- Pointcut
- Interceptor
目標方法的定義
public void doThrottleThings() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + ": is doing something, needing 5s");
Thread.sleep(5000);
}
為了驗證併發控制是否生效,首先會打印出當前執行緒的名稱,然後睡眠5秒。
啟動方法的定義
直接使用JUnit幫助方法的啟動:
@Test
public void testConcurrencyThrottle() throws InterruptedException {
IntStream.range(0, 5).forEach(i -> {
new Thread(() -> {
try {
service.doThrottleThings();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
});
Thread.sleep(10000);
}
在最後睡眠10s是為了讓該測試方法不會退出。因為它畢竟不是main方法,JUnit會在測試方法完成之後結束掉所有執行緒,然後是關閉JVM。
最後的列印結果是這樣的:
[Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 0
[Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
[Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
[Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
[Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Concurrency count 3 has reached limit 3 - blocking
Thread-4: is doing something, needing 5s
Thread-5: is doing something, needing 5s
Thread-6: is doing something, needing 5s
[Thread-4] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
[Thread-3] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 2
Thread-3: is doing something, needing 5s
[Thread-6] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 2
[Thread-5] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Returning from throttle at concurrency count 1
[Thread-2] DEBUG o.s.a.i.ConcurrencyThrottleInterceptor - Entering throttle at concurrency count 1
Thread-2: is doing something, needing 5s
可以清晰地發現當併發量達到3之後,剩下的兩個執行緒會被阻塞。等待5s之後這兩個執行緒被喚醒。
SimpleAsyncTaskExecutor中的併發控制
清楚了ConcurrencyThrottleInterceptor是如何處理併發控制之後,讓我們轉過頭來看看SimpleAsyncTaskExecutor中的併發控制,在這個類中有這麼一個成員物件和兩個常量:
/**
* Permit any number of concurrent invocations: that is, don't throttle concurrency.
*/
public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
/**
* Switch concurrency 'off': that is, don't allow any concurrent invocations.
*/
public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
/** Internal concurrency throttle used by this executor */
private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
也就是說在內部它使用了一個名為ConcurrencyThrottleAdapter的併發控制物件,並且複用了ConcurrencyThrottleSupport中定義的兩個常量,用來表示不限制併發度和完全不允許任何執行(哪怕序列也不允許了):
/**
* Subclass of the general ConcurrencyThrottleSupport class, making {@code beforeAccess()} and
* {@code afterAccess()} visible to the surrounding class.
*/
private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
@Override
protected void beforeAccess() {
super.beforeAccess();
}
@Override
protected void afterAccess() {
super.afterAccess();
}
}
這個類的目的就是為了讓SimpleAsyncTaskExecutor能夠訪問到定義在ConcurrencyThrottleSupport中的beforeAccess以及afterAccess兩個方法。
那麼在什麼時候會用到這個adapter呢?
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
} else {
doExecute(taskToUse);
}
}
上述程式碼是定義在SimpleAsyncTaskExecutor中的execute方法。這裡面有三個問題值得探究:
- taskDecorator是個啥?
- 何時執行併發控制以及什麼任務會被歸併發控制?
- ConcurrencyThrottlingRunnable是幹嘛用的?
TaskDecorator
首先第一個問題:
TaskDecorator是個介面,抽象了裝飾器模式的概念:
public interface TaskDecorator {
/**
* Decorate the given {@code Runnable}, returning a potentially wrapped
* {@code Runnable} for actual execution.
* @param runnable the original {@code Runnable}
* @return the decorated {@code Runnable}
*/
Runnable decorate(Runnable runnable);
}
Spring中並沒有這個介面的任何實現,因此這個想象空間就留個各位開發人員了,不過從Javadoc中的說明可以大概明白它的意圖,比如:
- 設定任務執行的上下文環境
- 進行某些監控以及統計方面的工作
併發控制的執行時機
首先需要滿足的條件就是isThrottleActive()返回true:
public boolean isThrottleActive() {
return (this.concurrencyLimit > 0);
}
即設定的併發度大於0時就被認為是開啟了Throttling功能。
同時還需要滿足的條件是startTimeout > TIMEOUT_IMMEDIATE
。後面的常量的值是0。也就是說如何任務的timeout值被設定成TIMEOUT_IMMEDIATE的話,這種任務是不屬於併發控制的範疇的。(另外這個timeout除了和TIMEOUT_IMMEDIATE進行了比較外,沒有其它的用途了,這一點我覺得有點莫名其妙。)
ConcurrencyThrottlingRunnable
最終,會將任務封裝在一個ConcurrencyThrottlingRunnable物件中,然後執行該wrapper物件。
/**
* This Runnable calls {@code afterAccess()} after the target Runnable has finished its execution.
*/
private class ConcurrencyThrottlingRunnable implements Runnable {
private final Runnable target;
public ConcurrencyThrottlingRunnable(Runnable target) {
this.target = target;
}
@Override
public void run() {
try {
this.target.run();
} finally {
concurrencyThrottle.afterAccess();
}
}
}
這樣做也僅僅是為了在finally中執行afterAccess配合併發控制的資源釋放這一過程。
總結
本文討論兩個方面的內容:
- ConcurrencyThrottleInterceptor的實現原理
- SimpleAsyncTaskExecutor是如何實現併發控制的
至此,Spring AOP中的幾個比較實用的Aspects(Interceptors)就都介紹完畢了。