微服務優化之並行呼叫
微服務優化之並行呼叫
網際網路產品隨著使用者的增加,系統對服務的高效能、高可用、可伸縮、可擴充套件的支援,大都採用分散式RPC框架。然而隨著業務的增加,系統越來越多,系統之間的呼叫也越來越複雜,原本一個系統中一次請求就可以完成的工作,現在可能被分散在多個系統中,一次請求需要多個系統響應。這樣就會放大RPC呼叫延遲帶來的副作用,影響系統的高效能需求。
例如:一個RPC介面中需要依賴另外三個系統的RPC服務,各RPC服務的響應時間分別是20ms、10ms、10ms,那麼這個介面的對外系統依賴的耗時40ms。如果介面依賴越多,響應時間就會越長。
對此,需要在業務範圍內進行效能優化,優化思路總的來說有兩種:
第一:如果對RPC介面呼叫,不需要關心介面的返回值,那麼可以採用非同步RPC呼叫。
第二:如果依賴RPC介面返回值,並且連續呼叫的多個RPC之間沒有依賴關係,可以採用並行化處理。
本文主要分享一下通過並行化處理,來優化RPC介面響應時間,如上例子中的RPC採用並行呼叫,對外系統介面的依賴耗時會降低到20ms。
第一:因為Java對執行緒的使用非常方便,所以完成並行呼叫對於Java語言來說是相對簡單,根據依賴外部介面分別建立一個執行緒來呼叫就可以完成。
第二:那麼問題來:如果我的介面在完成其他介面呼叫後,還需要完成額外的功能而且需要依賴其他介面呼叫結果,該怎麼處理呢?Thread類通過join呼叫,可以讓主執行緒等待子執行緒處理結果。
第三:那麼問題又來了:子執行緒內部的異常無法在外部獲取,而需要依賴外部介面的呼叫結果的情況下,如果RPC介面丟擲異常,必須在主執行緒中獲取並作出相應處理,這個工作可以通過FutureTask來完成。
第四:那麼問題又來了:如果一個介面依賴十個外部系統,那麼每次請求就需要建立十個執行緒,隨著介面TPS增加,系統建立執行緒和銷燬的執行緒耗費的資源越來越高,這個時候需要考慮採用執行緒池方案了。
第五:那麼問題又來了:以上例項只是一個單應用的測試Demo,真實應用情況下如上這樣在程式碼中建立執行緒池並沒太大意義,應該建立全域性的執行緒池,所有請求共用執行緒池才能達到執行緒資源共用。但是Executors中執行緒池都預設採用AbortPolicy 的拒絕策略,在高併發情況下,就會頻繁出現的執行緒池拒絕服務異常。此時可以考慮自定義執行緒池,採用CallerRunsPolicy拒絕策略,在高併發量,當執行緒池無法提供服務的情況下,採用主執行緒自己建立執行緒,達到併發量和計算資源的最優協調。
第六:完成以上操作就可以完美了嗎?然而情況並非如此,如果細心測試發現,如果其中一個介面丟擲異常時,主執行緒就結束了,而其他還沒有執行結束的子執行緒將繼續執行,一開始我們通過Thread.join()來協調主子執行緒的先後順序,而現在採用執行緒池,無法在獲取執行緒並且呼叫join方法,而是採用FutureTask.get()來協調先後順序,那麼還可以採用哪些方式保證主執行緒最後結束呢?此時可以採用一些特有的併發工具,如:閉鎖,柵欄,訊號量。如下為網路摘抄的三個工具對比:
閉鎖(CountDownLatch) |
類似於門。門初始是關閉的,試圖進門的執行緒掛起等待開門。當負責開門程序將門開啟後,所有等待執行緒被喚醒。 門一旦開啟就不能再關閉了。 |
CountDownLatch(int n):指定閉鎖計數器 await() :掛起等待閉鎖計數器為0 countDown():閉鎖計數器減1 |
柵欄(CyclicBarrier) |
和閉鎖有類似之處。閉鎖是等待“開門”事件;柵欄是等待其他執行緒。例如有N個執行緒檢視通過柵欄,此時先到的要等待,直到所有執行緒到到達後,柵欄開啟,所有等待執行緒被喚醒通過柵欄。 |
CyclicBarrier(int n):需要等待的執行緒數量 await():掛起等待達到執行緒數量 |
訊號量(Semaphore) |
和鎖的作用類似。區別是鎖只允許被一個執行緒獲取,但是訊號量可以設定資源數量。當沒有可用資源時,才被掛起等待。 |
Semaphore(int n):指定初始的資源數量 acquire():試圖獲取資源。當沒有可用資源時掛起 release():釋放一個資源 |
本文采用柵欄完成例項程式碼如下:
package com.halfworlders.test.domo;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.halfworlders.test.exp.AppException;
import com.halfworlders.test.impl.ServiceImpl;
import com.halfworlders.test.intf.ServiceInterface;
publicclass App {
/**
* 外介面總數
*/
privatestaticfinalintINTERFACE_COUNT = 10;
ExecutorService executorService = new ThreadPoolExecutor(INTERFACE_COUNT, INTERFACE_COUNT*3, 10L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), newThreadPoolExecutor.CallerRunsPolicy());
publicstaticvoid main(String[] args) {
longstart = System.currentTimeMillis();
App test = new App();
test.test();
longend = System.currentTimeMillis();
System.out.println("總耗時:"+(end-start)+"ms");
}
@SuppressWarnings("unchecked")
publicvoid test() {
final CyclicBarrier cb = new CyclicBarrier(INTERFACE_COUNT + 1);
final ServiceInterface[] services = assembles();
final FutureTask<Integer>[] futureTasks = new FutureTask[INTERFACE_COUNT];
for (inti = 0; i < INTERFACE_COUNT; i++) {
final Integer fi = i;
futureTasks[i] = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
try {
returnservices[fi].service();
} finally {
cb.await();
}
}
});
executorService.submit(futureTasks[i]);
}
String serviceName = null;
try {
// 開啟柵欄
cb.await();
// 如果有其他系統呼叫異常,則將該異常向外層丟擲
for (inti = 0; i < INTERFACE_COUNT; i++) {
serviceName = services[i].getName();
futureTasks[i].get();
}
} catch (Exception e) {
if ((einstanceof ExecutionException) && (e.getCause() instanceof AppException)) {
throw (AppException) e.getCause();
} else {
thrownew RuntimeException(serviceName+"系統異常", e);
}
}
}
private ServiceInterface[] assembles(){
ServiceInterface[] service = new ServiceInterface[INTERFACE_COUNT];
for (inti = 0; i < INTERFACE_COUNT; i++) {
service[i] = new ServiceImpl("介面"+i);
}
returnservice;
}
}