[從原始碼學設計]螞蟻金服SOFARegistry 之 自動調節間隔週期性任務
阿新 • • 發佈:2020-12-19
# [從原始碼學設計]螞蟻金服SOFARegistry 之 自動調節間隔週期性任務
[toc]
## 0x00 摘要
SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。
本文為第九篇,介紹SOFARegistry自動調節間隔週期性任務的實現。
## 0x01 業務領域
螞蟻金服這裡的業務需求主要是:
- 啟動一個無限迴圈任務,不定期執行任務;
- 啟動若干週期性延時任務;
- 某些週期性任務需要實現自動調節間隔功能:程式一旦遇到發生超時異常,就將間隔時間調大,如果連續超時,那麼每次間隔時間都會增大一倍,一直到達外部引數設定的上限為止,一旦新任務不再發生超時異常,間隔時間又會自動恢復為初始值
## 0x02 阿里方案
阿里採用了:
- ExecutorService實現了無限迴圈任務;
- ScheduledExecutorService 實現了週期性任務;
- TimedSupervisorTask 實現了自動調節間隔的週期性任務;
我們在設計延時/週期性任務時就可以參考TimedSupervisorTask的實現
## 0x03 Scheduler
Scheduler類中就是這個方案的體現。
首先,我們需要看看 Scheduler的程式碼。
```java
public class Scheduler {
private final ScheduledExecutorService scheduler;
public final ExecutorService versionCheckExecutor;
private final ThreadPoolExecutor expireCheckExecutor;
@Autowired
private AcceptorStore localAcceptorStore;
public Scheduler() {
scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
}
public void startScheduler() {
scheduler.schedule(
new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
30, TimeUnit.SECONDS);
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
}
public void stopScheduler() {
if (scheduler != null && !scheduler.isShutdown()) {
scheduler.shutdown();
}
if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) {
versionCheckExecutor.shutdown();
}
}
}
```
接下來我們就逐一分析下其實現或者說是設計選擇。
## 0x04 無限迴圈任務
阿里這裡採用ExecutorService實現了無限迴圈任務,不定期完成業務。
### 4.1 ExecutorService
**Executor**:一個JAVA介面,其定義了一個接收Runnable物件的方法executor,其方法簽名為executor(Runnable command),該方法接收一個Runable例項,用來執行一個實現了Runnable介面的類。
**ExecutorService**:是一個比Executor使用更廣泛的子類介面。
其提供了生命週期管理的方法,返回 Future 物件,以及可跟蹤一個或多個非同步任務執行狀況返回Future的方法;
當所有已經提交的任務執行完畢後將會關閉ExecutorService。因此我們一般用該介面來實現和管理多執行緒。
這裡ExecutorService雖然其不能提供週期性功能,但是`localAcceptorStore.changeDataCheck`本身就是一個while (true) loop,其可以依靠DelayQueue來完成類似週期功能 。
```java
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
public void changeDataCheck() {
while (true) {
try {
D