請求合併的 3 種方式,大大提高介面效能。。。
來源:https://zhenbianshu.github.io/
將相似或重複請求在上游系統中合併後發往下游系統,可以大大降低下游系統的負載,提升系統整體吞吐率。
文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實現BatchCollapser 三種請求合併技術,並通過其具體實現對比各自適用的場景。
前言
工作中,我們常見的請求模型都是”請求-應答”式,即一次請求中,服務給請求分配一個獨立的執行緒,一塊獨立的記憶體空間,所有的操作都是獨立的,包括資源和系統運算。我們也知道,在請求中處理一次系統 I/O 的消耗是非常大的,如果有非常多的請求都進行同一類 I/O 操作,那麼是否可以將這些 I/O 操作都合併到一起,進行一次 I/O 操作,是否可以大大降低下游資源伺服器的負擔呢?
最近我工作之餘的大部分時間都花在這個問題的探究上了,對比了幾個現有類庫,為了解決一個小問題把 hystrix javanica 的程式碼翻了一遍,也根據自己工作中遇到的業務需求實現了一個簡單的合併類,收穫還是挺大的。可能這個需求有點”偏門”,在網上搜索結果並不多,也沒有綜合一點的資料,索性自己總結分享一下,希望能幫到後來遇到這種問題的小夥伴。
Hystrix Collapser
hystrix
開源的請求合併類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix 了, hystrix 專注於保持 WEB 伺服器在高併發環境下的系統穩定,我們常用它的熔斷器(Circuit Breaker) 來實現服務的服務隔離和災時降級,有了它,可以使整個系統不至於被某一個介面的高併發洪流沖塌,即使介面掛了也可以將服務降級,返回一個人性化的響應。請求合併作為一個保障下游服務穩定的利器,在 hystrix 內實現也並不意外。
我們在使用 hystrix 時,常用它的 javanica 模組,以註解的方式編寫 hystrix 程式碼,使程式碼更簡潔而且對業務程式碼侵入更低。所以在專案中我們一般至少需要引用 hystrix-core 和 hystrix-javanica 兩個包。
另外,hystrix 的實現都是通過 AOP,我們要還要在專案 xml 裡顯式配置 HystrixAspect 的 bean 來啟用它。
<aop:aspectj-autoproxy/> <bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
collapser
hystrix collapser 是 hystrix 內的請求合併器,它有自定義 BatchMethod 和 註解兩種實現方式,自定義 BatchMethod 網上有各種教程,實現起來很複雜,需要手寫大量程式碼,而註解方式只需要新增兩行註解即可,但配置方式我在官方文件上也沒找見,中文方面本文應該是獨一份兒了。
其實現需要注意的是:
- 我們在需要合併的方法上新增 @HystrixCollapser 註解,在定義好的合併方法上新增 @HystrixCommand 註解;
- single 方法只能傳入一個引數,多引數情況下需要自己包裝一個引數類,而 batch 方法需要
java.util.List<SingleParam>
; - single 方法返回
java.util.concurrent.Future<SingleReturn>
, batch 方法返回java.util.List<SingleReturn>
,且要保證返回的結果數量和傳入的引數數量一致。
下面是一個簡單的示例:
public class HystrixCollapserSample {
@HystrixCollapser(batchMethod = "batch")
public Future<Boolean> single(String input) {
return null; // single方法不會被執行到
}
public List<Boolean> batch(List<String> inputs) {
return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
}
}
原始碼實現
為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的原始碼,這裡簡單總結一下 hystrix 請求合併器的具體實現,原始碼的詳細解析在我的筆記:Hystrix collasper 原始碼解析。
- 在 spring-boot 內註冊切面類的 bean,裡面包含 @HystrixCollapser 註解切面;
- 在方法執行時檢測到方法被 HystrixCollapser 註解後,spring 呼叫
methodsAnnotatedWithHystrixCommand
方法來執行 hystrix 代理; - hystrix 獲取一個 collapser 例項(在當前 scope 內檢測不到即建立);
- hystrix 將當前請求的引數提交給 collapser, 由 collapser 儲存在一個
concurrentHashMap (RequestArgumentType -> CollapsedRequest)
內,此方法會建立一個 Observable 物件,並返回一個 觀察此物件的 Future 給業務執行緒; - collpser 在建立時會建立一個 timer 執行緒,定時消費儲存的請求,timer 會將多個請求構造成一個合併後的請求,呼叫 batch 執行後將結果順序對映到輸出引數,並通知 Future 任務已完成。
需要注意,由於需要等待 timer 執行真正的請求操作,collapser 會導致所有的請求的 cost 都會增加約 timerInterval/2 ms;
配置
hystrix collapser 的配置需要在 @HystrixCollapser 註解上使用,主要包括兩個部分,專有配置和 hystrixCommand 通用配置;
專有配置包括:
- collapserKey,這個可以不用配置,hystrix 會預設使用當前方法名;
- batchMethod,配置 batch 方法名,我們一般會將 single 方法和 batch 方法定義在同一個類內,直接填方法名即可;
- scope,最坑的配置項,也是逼我讀原始碼的元凶,
com.netflix.hystrix.HystrixCollapser.Scope
列舉類,有 REQUEST, GLOBAL 兩種選項,在 scope 為 REQUEST 時,hystrix 會為每個請求都建立一個 collapser, 此時你會發現 batch 方法執行時,傳入的請求數總為1。而且 REQUEST 項還是預設項,不明白這樣請求合併還有什麼意義; -
collapserProperties
, 在此選項內我們可以配置 hystrixCommand 的通用配置;
通用配置包括:
- maxRequestsInBatch, 構造批量請求時,使用的單個請求的最大數量;
- timerDelayInMilliseconds, 此選項配置 collapser 的 timer 執行緒多久會合並一次請求;
- requestCache.enabled, 配置提交請求時是否快取;
一個完整的配置如下:
@HystrixCollapser(
batchMethod = "batch",
collapserKey = "single",
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
@HystrixProperty(name = "maxRequestsInBatch", value = "100"),
@HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
@HystrixProperty(name = "requestCache.enabled", value = "true")
})
BatchCollapser
設計
由於業務需求,我們並不太關心被合併請求的返回值,而且覺得 hystrix 保持那麼多的 Future 並沒有必要,於是自己實現了一個簡單的請求合併器,業務執行緒簡單地將請求放到一個容器裡,請求數累積到一定量或延遲了一定的時間,就取出容器內的資料統一發送給下游系統。
設計思想跟 hystrix 類似,合併器有一個欄位作為儲存請求的容器,且設定一個 timer 執行緒定時消費容器內的請求,業務執行緒將請求引數提交到合併 器的容器內。不同之處在於,業務執行緒將請求提交給容器後立即同步返回成功,不必管請求的消費結果,這樣便實現了時間維度上的合併觸發。
另外,我還添加了另外一個維度的觸發條件,每次將請求引數新增到容器後都會檢驗一下容器內請求的數量,如果數量達到一定的閾值,將在業務執行緒內合併執行一次。
由於有兩個維度會觸發合併,就不可避免會遇到執行緒安全問題。為了保證容器內的請求不會被多個執行緒重複消費或都漏掉,我需要一個容器能滿足以下條件:
- 是一種 Collection,類似於 ArrayList 或 Queue,可以存重複元素且有順序;
- 在多執行緒環境中能安全地將裡面的資料全取出來進行消費,而不用自己實現鎖。
java.util.concurrent
包內的 LinkedBlockingDeque 剛好符合要求,首先它實現了 BlockingDeque 介面,多執行緒環境下的存取操作是安全的;此外,它還提供 drainTo(Collection<? super E> c, int maxElements)
方法,可以將容器內 maxElements 個元素安全地取出來,放到 Collection c 中。
實現
以下是具體的程式碼實現:
public class BatchCollapser<E> implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
private Handler<List<E>, Boolean> cleaner;
private long interval;
private int threshHold;
private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
this.cleaner = cleaner;
this.threshHold = threshHold;
this.interval = interval;
}
@Override
public void afterPropertiesSet() throws Exception {
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
this.clean();
} catch (Exception e) {
logger.error("clean container exception", e);
}
}, 0, interval, TimeUnit.MILLISECONDS);
}
public void submit(E event) {
batchContainer.add(event);
if (batchContainer.size() >= threshHold) {
clean();
}
}
private void clean() {
List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
batchContainer.drainTo(transferList, 100);
if (CollectionUtils.isEmpty(transferList)) {
return;
}
try {
cleaner.handle(transferList);
} catch (Exception e) {
logger.error("batch execute error, transferList:{}", transferList, e);
}
}
public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
Class jobClass = cleaner.getClass();
if (instance.get(jobClass) == null) {
synchronized (BatchCollapser.class) {
if (instance.get(jobClass) == null) {
instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
}
}
}
return instance.get(jobClass);
}
}
以下程式碼內需要注意的點:
- 由於合併器的全域性性需求,需要將合併器實現為一個單例,另外為了提升它的通用性,內部使用使用 concurrentHashMap 和 double check 實現了一個簡單的單例工廠。
- 為了區分不同用途的合併器,工廠需要傳入一個實現了 Handler 的例項,通過例項的 class 來對請求進行分組儲存。
- 由於
java.util.Timer
的阻塞特性,一個 Timer 執行緒在阻塞時不會啟動另一個同樣的 Timer 執行緒,所以使用ScheduledExecutorService
定時啟動 Timer 執行緒。
ConcurrentHashMultiset
設計
上面介紹的請求合併都是將多個請求一次傳送,下游伺服器處理時本質上還是多個請求,最好的請求合併是在記憶體中進行,將請求結果簡單合併成一個傳送給下游伺服器。如我們經常會遇到的需求:元素分值累加或資料統計,就可以先在記憶體中將某一項的分值或資料累加起來,定時請求資料庫儲存。
Guava 內就提供了這麼一種資料結構:ConcurrentHashMultiset
,它不同於普通的 set 結構儲存相同元素時直接覆蓋原有元素,而是給每個元素保持一個計數 count, 插入重複時元素的 count 值加1。而且它在新增和刪除時並不加鎖也能保證執行緒安全,具體實現是通過一個 while(true)
迴圈嘗試操作,直到操作夠所需要的數量。
ConcurrentHashMultiset
這種排重計數的特性,非常適合資料統計這種元素在短時間內重複率很高的場景,經過排重後的數量計算,可以大大降低下游伺服器的壓力,即使重複率不高,能用少量的記憶體空間換取系統可用性的提高,也是很划算的。
實現
使用 ConcurrentHashMultiset
進行請求合併與使用普通容器在整體結構上並無太大差異,具體類似於:
if (ConcurrentHashMultiset.isEmpty()) {
return;
}
List<Request> transferList = Lists.newArrayList();
ConcurrentHashMultiset.elementSet().forEach(request -> {
int count = ConcurrentHashMultiset.count(request);
if (count <= 0) {
return;
}
transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
ConcurrentHashMultiset.remove(request, count);
});
小結
最後總結一下各個技術適用的場景:
-
hystrix collapser
: 需要每個請求的結果,並且不在意每個請求的 cost 會增加; -
BatchCollapser
: 不在意請求的結果,需要請求合併能在時間和數量兩個維度上觸發; -
ConcurrentHashMultiset
:請求重複率很高的統計類場景;
另外,如果選擇自己來實現的話,完全可以將 BatchCollapser
和 ConcurrentHashMultiset
結合一下,在BatchCollapser 裡使用 ConcurrentHashMultiset
作為容器,這樣就可以結合兩者的優勢了。
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2022最新版)
4.別再寫滿屏的爆爆爆炸類了,試試裝飾器模式,這才是優雅的方式!!
覺得不錯,別忘了隨手點贊+轉發哦!