當Parallel遇上了DI - Spring並行資料聚合最佳實踐
分析淘寶PDP
讓我們先看個圖, Taobao的PDP(Product Detail Page)頁.
開啟Chrome Network面板, 讓我們來看taobao是怎麼載入這個頁面資料的. 根據經驗, 一般是非同步載入的, 要麼是XHR,要麼就是js(jsonp), 你應該很快可以找到
還能看到這個介面的效能
神奇的是, taobao竟然在一次請求中拉下了整個PDP頁的完整資料, 而且服務端處理耗時不到125ms
首先, 這麼做有什麼好處?
- 前後端開發對接簡單
- 在一次網路連線中儘可能多的傳輸資料(資料大小要不影響使用者體驗, 一般不會超過300kb), 減少建立連線的次數和請求頭浪費的流量.
然後, 這又是怎麼做到的呢?
你可能會說快取, 但你要知道, 這樣一個對電商極為重要的頁面, 絕對涉及到了非常多的團隊, 比如:
- 商品團隊
- 賣家團隊
- 評價團隊
- 訂單團隊
- 會員團隊
- 優惠團隊
- 問答團隊
- 推薦團隊
- 物流系統
- etc/等等
即使每個團隊的資料全都是快取的, 你一個個去拿, 要在125ms內拿完也不容易. 而且作為跟錢相關的頁面, 部分資料必須保證絕對實時有效, 能用快取的地方不多. 怎麼辦, 如果是你, 你會怎麼做? 離線打標? 資料預熱? etc..
此時, 並行呼叫不失為一種好辦法.
分析一下這個頁面, 你會發現, 每一個模組除了屬於同一個商品(入參相同), 其實各個模組的資料之間, 並沒有依賴性, 完全可以並行去獲取.
並行就沒有問題了嗎?
並行獲取資料, 可以提高我們的介面效能. 但也會引入一些問題, 如:
- 依賴的項可能很多, 怎麼使程式碼簡潔清晰?
- 依賴關係很可能是一個有向圖, 如果做到有向圖中的每個節點都可以並行執行?
- 非同步處理後, 超時怎麼處理? 業務程式碼丟擲異常了怎麼處理?
- 依賴關係如果有死迴圈怎麼辦?
- 非同步之後, ThreadLocal中的內容怎麼處理? 一些基於ThreadLocal實現的Context不work怎麼辦?
- 事務被執行緒隔離了怎麼辦?
- 如何監控每一次非同步執行, 每個節點的效能?
下面, 我們來討論下如何簡單\易用\高效的並行獲取資料; 如何解決上述非同步問題.
常見的並行方式
假如你現在需要使用者的基礎資訊\部落格列表\粉絲列表 3份資料. 哪麼你有哪些方式可以並行獲取呢?
Java ThreadPool並行
最簡單原始的辦法, 直接使用Java提供了的執行緒池和Future機制.
public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(3);
Future<User> userFuture = executorService.submit(() -> {
try{
return userService.get(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<Post>> postsFuture = executorService.submit(() -> {
try{
return postService.getPosts(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<User>> followersFuture = executorService.submit(() -> {
try{
return followService.getFollowers(userId);
}finally {
countDownLatch.countDown();
}
});
countDownLatch.await();
User user = userFuture.get();
user.setFollowers(followersFuture.get());
user.setPosts(postsFuture.get());
return user;
}
Spring的非同步並行
我們知道, Spring支援@Async註解, 可以方便的實現非同步, 並且支援獲取返回值. 參考: https://www.baeldung.com/spring-async#2-methods-with-return-type
@Async實現的原理實際是在Bean的代理類的方法中, 攔截方法呼叫, 向taskExecutor Bean中提交Callable任務. 原理跟自己用Java ThreadPool寫其實區別不大.
那麼要用Spring Async實現上述功能. 首先需要修改下面3個方法的返回值, 並且修改返回值型別, 併為方法新增 @Async註解
class UserServiceImpl implements UserService {
@Async
public Future<User> get(Long userId) {
// ... something
}
}
class PostServiceImpl implements PostService {
@Async
public Future<List<Post> getPosts(Long userId) {
// ... something
}
}
class FollowServiceImpl implements FollowService {
@Async
public Future<List<User> getFollowers(Long userId) {
// ... something
}
}
並行獲取3份使用者資料然後聚合, 程式碼如下:
public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
Future<User> userFuture = userService.get(userId);
Future<List<Post>> postsFuture = postService.getPosts(userId);
Future<List<User>> followersFuture = followService.getFollowers(userId);
User user = whileGet(userFuture);
user.setFollowers(whileGet(followersFuture));
user.setPosts(whileGet(postsFuture));
return user;
}
private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
while(true) {
if (future.isDone()) {
break;
}
}
return future.get();
}
這裡使用自旋去獲取非同步資料. 當然你也可以像前面那樣, 傳遞一個閉鎖(CountDownLatch)到Service中去, 然後讓主調執行緒在一個閉鎖上面等待.
並行結合DI(依賴注入)
上面2種方式的確能實現功能, 但首先, 它們都很不直觀, 而且沒有處理前面講到的非同步問題, 一旦出現超時\異常\ThreadLocal, 程式碼可能不會按照你預期的方式工作. 那有沒有更簡單方便可靠的方法呢?
試想這樣一種方式, 如果你需要的資料, 都可以通過方法入參自動並行獲取, 然後傳遞給你, 那是不是很方便? 就像這樣:
@Component
public class UserAggregate {
@DataProvider("userWithPosts")
public User userWithPosts(
@DataConsumer("user") User user,
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setPosts(posts);
user.setFollowers(followers);
return user;
}
}
這裡的@DataConsumer
聲明瞭你要非同步獲取的資料id. @DataProvider
聲明瞭這個方法提供資料, 並且id為userWithPosts.
或者你不想寫這樣一個Aggregate類, 你不需要複用, 你想直接建立一個"匿名Provider". 那麼你可以直接在任何地方像下面這樣呼叫拿結果
User user = dataBeanAggregateQueryFacade.get(
Collections.singletonMap("userId", 1L),
new Function3<User, List<Post>,List<User>, User>() {
@Override
public User apply(@DataConsumer("user") User user,
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setPosts(posts);
user.setFollowers(followers);
return user;
}
});
Assert.notNull(user,"user not null");
Assert.notNull(user.getPosts(),"user posts not null");
這裡的Function3接收4個泛型引數, 最後一個User表示返回值型別, 前3個引數依次對應apply方法的3個入參型別. 專案預定義了Function2-Function5, 支援不超過5個引數, 如果你需要更多引數, 可以編寫一個介面(FunctionInterface), 繼承MultipleArgumentsFunction介面即可.
很顯然
- 每一個
@DataConsumer
只會對應一個@DataProvider
. - 一個
@DataProvider
可能被多個@DataConsumer
消費 . - 一個
@DataProvider
通過多個@DataConsumer
依賴上多個@DataProvider
.
你不用care底層如何實現. 只有在你有定製化的需求時, 才去關心一些配置引數. 去擴充套件一些能力.
實現原理
- 在Spring啟動之時, 掃描應用中的
@DataProvider
和@DataConsumer
註解. 分析記錄下依賴關係(有向非連通圖), 並且記錄好@DataProvider
和Spring Bean的對映關係. - 當進行查詢時, 從已經記錄好的依賴關係中拿出依賴樹, 使用執行緒池和閉鎖(CountLatchDown), 遞迴非同步呼叫孩子節點對應的Bean方法, 拿到結果後作為入參注入當前節點 (近似廣度優先, 但因為並行的原因, 節點的訪問順序是不確定的).
- 在發起遞迴呼叫前, 傳入進一個map, 用來存放查詢引數, 方法中沒有
@DataConsumer
註解的入參, 將從此map中取值. @DataProvider
和@DataConsumer
註解可以支援一些引數, 用來控制超時時間\異常處理方式\是否冪等快取等等.
怎麼解決並行/非同步後引入的新問題
超時怎麼控制 ?
@DataProvider
註解支援 timeout
引數, 用來控制超時. 實現原理是通過閉鎖的超時等待方法.
java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)
異常怎麼處理 ?
對異常提供兩種處理方式: 吞沒或者向上層丟擲.
@DataConsumer
註解支援exceptionProcessingMethod
引數, 用來表示這個Consumer想怎麼處理Provider丟擲的異常.
當然, 也支援在全域性維度配置. 全域性配置的優先順序低於(<)Consumer配置的優先順序.
依賴關係有死迴圈怎麼辦 ?
Spring Bean初始化, 因為Bean建立和Bean屬性賦值分了兩步走, 因此可以用所謂的"早期引用"解決迴圈依賴的問題.
但如果你迴圈依賴的Bean, 依賴關係定義在建構函式入參上, 那麼是沒法解決迴圈依賴的問題的.
同理, 我們通過方法入參, 非同步注入依賴資料, 在方法入參沒有變化的情況下, 也是無法結束死迴圈的. 因此必須禁止迴圈依賴.
那麼問題變為了怎麼禁止迴圈依賴. 或者說, 怎麼檢測有向非聯通圖中的迴圈依賴, 兩個辦法:
- 帶染色的DFS遍歷: 節點入棧訪問前, 先標記節點狀態為"訪問中", 之後遞迴訪問孩子節點, 遞迴完成後, 將節點標記為"訪問完成". 如果在DFS遞迴過程中, 再次訪問到"訪問中"的節點, 說明有環.
- 拓撲排序: 把有向圖的節點排成一個序列, 不存在索引號較高的節點指向索引號較低的節點, 表示圖存在拓撲排序. 拓撲排序的實現方法是, 先刪除入度為0的節點, 並將領接節點的入度 - 1, 直到所有節點都被刪除. 很顯然, 如果有向圖中有環, 那麼環裡節點的入度不可能為0 , 那麼節點不可能刪完. 因此, 只要滿足節點未刪完 && 不存在入度為0的節點, 那麼一定有環.
這裡我們用領接表+DFS染色搜尋, 來實現環的檢查
private void checkCycle(Map<String,Set<String>> graphAdjMap) {
Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
if (visitStatusMap.containsKey(item.getKey())) {
continue;
}
dfs(graphAdjMap,visitStatusMap,item.getKey());
}
}
private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
if (visitStatusMap.containsKey(node)) {
if(visitStatusMap.get(node) == 1) {
List<String> relatedNodes = new ArrayList<>();
for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
if (item.getValue() == 1) {
relatedNodes.add(item.getKey());
}
}
throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
}
return ;
}
visitStatusMap.put(node,1);
log.info("visited:{}", node);
for (String relateNode : graphAdjMap.get(node)) {
dfs(graphAdjMap,visitStatusMap,relateNode);
}
visitStatusMap.put(node,2);
}
ThreadLocal怎麼處理?
許多的框架都使用了ThreadLocal來實現Context來儲存單次請求中的一些共享資料, Spring也不例外.
眾所周知, ThreadLocal實際是訪問Thread中一個特殊Map的入口. ThreadLocal只能訪問當前Thread的資料(副本), 如果跨越了執行緒, 是拿不到到其他ThreadLocalMap的資料的.
解決方法
如圖
- 在當前執行緒提交非同步任務前, 將當前執行緒ThreadLocal執行的資料"捆綁"到任務例項中
- 當任務開始執行時, 從任務例項中取出資料, 恢復到當前非同步執行緒的ThreadLocal中
- 當任務結束後, 清理當前非同步執行緒的ThreadLocal.
這裡, 我們先定義一個介面, 來描述這3個動作
public interface AsyncQueryTaskWrapper {
/**
* 任務提交之前執行. 此方法在提交任務的那個執行緒中執行
*/
void beforeSubmit();
/**
* 任務開始執行前執行. 此方法在非同步執行緒中執行
* @param taskFrom 提交任務的那個執行緒
*/
void beforeExecute(Thread taskFrom);
/**
* 任務執行結束後執行. 此方法在非同步執行緒中執行
* 注意, 不管使用者的方法丟擲何種異常, 此方法都會執行.
* @param taskFrom 提交任務的那個執行緒
*/
void afterExecute(Thread taskFrom);
}
為了讓我們定義的3個動作起作用. 我們需要重寫一下 java.util.concurrent.Callable#call方法.
public abstract class AsyncQueryTask<T> implements Callable<T> {
Thread taskFromThread;
AsyncQueryTaskWrapper asyncQueryTaskWrapper;
public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
this.taskFromThread = taskFromThread;
this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
}
@Override
public T call() throws Exception {
try {
if(asyncQueryTaskWrapper != null) {
asyncQueryTaskWrapper.beforeExecute(taskFromThread);
}
return execute();
} finally {
if (asyncQueryTaskWrapper != null) {
asyncQueryTaskWrapper.afterExecute(taskFromThread);
}
}
}
/**
* 提交任務時, 業務方實現這個替代方法
*
* @return
* @throws Exception
*/
public abstract T execute() throws Exception;
}
接下來, 向執行緒池提交任務時, 不再直接提交Callable匿名類例項, 而是提交AsyncQueryTask例項. 並且在提交前觸發 taskWrapper.beforeSubmit();
AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
// 任務提交前執行動作.
taskWrapper.beforeSubmit();
Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
@Override
public Object execute() throws Exception {
try {
// something to do
} finally {
stopDownLatch.countDown();
}
}
});
你要做什麼?
你只需要定義一個類, 實現這個介面, 並將這個類加到配置檔案中去.
@Slf4j
public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
/**
* "捆綁" 在任務例項中的資料
*/
private Long tenantId;
private User user;
@Override
public void beforeSubmit() {
/* 提交任務前, 先從當前執行緒拷貝出ThreadLocal中的資料到任務中 */
log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName());
this.tenantId = RequestContext.getTenantId();
this.user = ExampleAppContext.getUser();
}
@Override
public void beforeExecute(Thread taskFrom) {
/* 任務提交後, 執行前, 在非同步執行緒中用資料恢復ThreadLocal(Context) */
log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
RequestContext.setTenantId(tenantId);
ExampleAppContext.setLoggedUser(user);
}
@Override
public void afterExecute(Thread taskFrom) {
/* 任務執行完成後, 清理非同步執行緒中的ThreadLocal(Context) */
log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
RequestContext.removeTenantId();
ExampleAppContext.remove();
}
}
新增配置使TaskWapper生效.
io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper
怎麼監控每一次的非同步呼叫?
解決辦法
我們先把一次查詢, 分為以下幾個生命週期
- 查詢任務初次提交 (querySubmitted)
- 某一個Provider節點開始執行前 (queryBefore)
- 某一個Provider節點執行完成後 (queryAfter)
- 查詢全部完成 (queryFinished)
- 查詢異常 (exceptionHandle)
轉換成介面如下
public interface AggregateQueryInterceptor {
/**
* 查詢正常提交, Context已經建立
*
* @param aggregationContext 查詢上下文
* @return 返回為true才繼續執行
*/
boolean querySubmitted(AggregationContext aggregationContext) ;
/**
* 每個Provider方法執行前, 將呼叫此方法. 存在併發呼叫
*
* @param aggregationContext 查詢上下文
* @param provideDefinition 將被執行的Provider
*/
void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition);
/**
* 每個Provider方法執行成功之後, 呼叫此方法. 存在併發呼叫
*
* @param aggregationContext 查詢上下文
* @param provideDefinition 被執行的Provider
* @param result 查詢結果
* @return 返回結果, 如不修改不, 請直接返回引數中的result
*/
Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result);
/**
* 每個Provider執行時, 如果丟擲異常, 將呼叫此方法. 存在併發呼叫
*
* @param aggregationContext 查詢上下文
* @param provideDefinition 被執行的Provider
* @param e Provider丟擲的異常
*/
void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e);
/**
* 一次查詢全部完成.
*
* @param aggregationContext 查詢上下文
*/
void queryFinished(AggregationContext aggregationContext);
}
在Spring應用啟動之初, 獲取所有實現了AggregateQueryInterceptor介面的Bean, 並按照Order註解排序, 作為攔截器鏈.
至於攔截器如何執行. 很簡單, 在遞迴提交查詢任務時, 插入執行一些鉤子(hook)函式即可. 涉及到的程式碼很多, 就不貼在這裡, 感興趣的可以去github clone程式碼檢視.
你要做什麼?
你可以實現一個攔截器, 在攔截器中輸出日誌, 監控節點執行狀態(耗時, 出入參), 如下:
@Component
@Order(2)
@Slf4j
public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor {
@Override
public boolean querySubmitted(AggregationContext aggregationContext) {
log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName());
return true;
}
@Override
public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) {
log.info("query before. provider:{}",provideDefinition.getMethod().getName());
}
@Override
public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) {
log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString());
return result;
}
@Override
public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) {
log.error(e.getMessage());
}
@Override
public void queryFinished(AggregationContext aggregationContext) {
log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName());
}
}
專案地址
最後, 再次貼一下專案地址: . spring-boot-data-aggregator
歡迎拍磚, 歡迎star, 歡迎使