1. 程式人生 > >當Parallel遇上了DI - Spring並行資料聚合最佳實踐

當Parallel遇上了DI - Spring並行資料聚合最佳實踐

分析淘寶PDP

讓我們先看個圖, Taobao的PDP(Product Detail Page)頁.

開啟Chrome Network面板, 讓我們來看taobao是怎麼載入這個頁面資料的. 根據經驗, 一般是非同步載入的, 要麼是XHR,要麼就是js(jsonp), 你應該很快可以找到

還能看到這個介面的效能

神奇的是, taobao竟然在一次請求中拉下了整個PDP頁的完整資料, 而且服務端處理耗時不到125ms

首先, 這麼做有什麼好處?

  • 前後端開發對接簡單
  • 在一次網路連線中儘可能多的傳輸資料(資料大小要不影響使用者體驗, 一般不會超過300kb), 減少建立連線的次數和請求頭浪費的流量.

然後, 這又是怎麼做到的呢?

你可能會說快取, 但你要知道, 這樣一個對電商極為重要的頁面, 絕對涉及到了非常多的團隊, 比如:

  • 商品團隊
  • 賣家團隊
  • 評價團隊
  • 訂單團隊
  • 會員團隊
  • 優惠團隊
  • 問答團隊
  • 推薦團隊
  • 物流系統
  • etc/等等

即使每個團隊的資料全都是快取的, 你一個個去拿, 要在125ms內拿完也不容易. 而且作為跟錢相關的頁面, 部分資料必須保證絕對實時有效, 能用快取的地方不多. 怎麼辦, 如果是你, 你會怎麼做? 離線打標? 資料預熱? etc..

此時, 並行呼叫不失為一種好辦法.

分析一下這個頁面, 你會發現, 每一個模組除了屬於同一個商品(入參相同), 其實各個模組的資料之間, 並沒有依賴性, 完全可以並行去獲取.

並行就沒有問題了嗎?

並行獲取資料, 可以提高我們的介面效能. 但也會引入一些問題, 如:

  • 依賴的項可能很多, 怎麼使程式碼簡潔清晰?
  • 依賴關係很可能是一個有向圖, 如果做到有向圖中的每個節點都可以並行執行?
  • 非同步處理後, 超時怎麼處理? 業務程式碼丟擲異常了怎麼處理?
  • 依賴關係如果有死迴圈怎麼辦?
  • 非同步之後, ThreadLocal中的內容怎麼處理? 一些基於ThreadLocal實現的Context不work怎麼辦?
  • 事務被執行緒隔離了怎麼辦?
  • 如何監控每一次非同步執行, 每個節點的效能?

下面, 我們來討論下如何簡單\易用\高效的並行獲取資料; 如何解決上述非同步問題.

常見的並行方式

假如你現在需要使用者的基礎資訊\部落格列表\粉絲列表 3份資料. 哪麼你有哪些方式可以並行獲取呢?

Java ThreadPool並行

最簡單原始的辦法, 直接使用Java提供了的執行緒池和Future機制.

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CountDownLatch countDownLatch = new CountDownLatch(3);
    Future<User> userFuture = executorService.submit(() -> {
        try{
            return userService.get(userId);
        }finally {
            countDownLatch.countDown();
        }
    });
    Future<List<Post>> postsFuture = executorService.submit(() -> {
        try{
            return postService.getPosts(userId);
        }finally {
            countDownLatch.countDown();
        }
    });
    Future<List<User>> followersFuture = executorService.submit(() -> {
        try{
            return followService.getFollowers(userId);
        }finally {
            countDownLatch.countDown();
        }
    });
    countDownLatch.await();
    User user = userFuture.get();
    user.setFollowers(followersFuture.get());
    user.setPosts(postsFuture.get());
    return user;
}

Spring的非同步並行

我們知道, Spring支援@Async註解, 可以方便的實現非同步, 並且支援獲取返回值. 參考: https://www.baeldung.com/spring-async#2-methods-with-return-type

@Async實現的原理實際是在Bean的代理類的方法中, 攔截方法呼叫, 向taskExecutor Bean中提交Callable任務. 原理跟自己用Java ThreadPool寫其實區別不大.

那麼要用Spring Async實現上述功能. 首先需要修改下面3個方法的返回值, 並且修改返回值型別, 併為方法新增 @Async註解

class UserServiceImpl implements UserService {
    @Async
    public Future<User> get(Long userId) {
        // ... something
    }
}
class PostServiceImpl implements PostService {
    @Async
    public Future<List<Post> getPosts(Long userId) {
        // ... something
    }
}
class FollowServiceImpl implements FollowService {
    @Async
    public Future<List<User> getFollowers(Long userId) {
        // ... something
    }
}

並行獲取3份使用者資料然後聚合, 程式碼如下:

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
    Future<User> userFuture = userService.get(userId);
    Future<List<Post>> postsFuture = postService.getPosts(userId);
    Future<List<User>> followersFuture = followService.getFollowers(userId);
    
    User user = whileGet(userFuture);
    user.setFollowers(whileGet(followersFuture));
    user.setPosts(whileGet(postsFuture));
    return user;
}

private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
    while(true) {
        if (future.isDone()) {
            break;
        }
    }
    return future.get();
}

這裡使用自旋去獲取非同步資料. 當然你也可以像前面那樣, 傳遞一個閉鎖(CountDownLatch)到Service中去, 然後讓主調執行緒在一個閉鎖上面等待.

並行結合DI(依賴注入)

上面2種方式的確能實現功能, 但首先, 它們都很不直觀, 而且沒有處理前面講到的非同步問題, 一旦出現超時\異常\ThreadLocal, 程式碼可能不會按照你預期的方式工作. 那有沒有更簡單方便可靠的方法呢?

試想這樣一種方式, 如果你需要的資料, 都可以通過方法入參自動並行獲取, 然後傳遞給你, 那是不是很方便? 就像這樣:

@Component
public class UserAggregate {
    @DataProvider("userWithPosts")
    public User userWithPosts(
            @DataConsumer("user") User user,
            @DataConsumer("posts") List<Post> posts,
            @DataConsumer("followers") List<User> followers) {
        user.setPosts(posts);
        user.setFollowers(followers);
        return user;
    }
}

這裡的@DataConsumer聲明瞭你要非同步獲取的資料id. @DataProvider聲明瞭這個方法提供資料, 並且id為userWithPosts.

或者你不想寫這樣一個Aggregate類, 你不需要複用, 你想直接建立一個"匿名Provider". 那麼你可以直接在任何地方像下面這樣呼叫拿結果

User user = dataBeanAggregateQueryFacade.get(
     Collections.singletonMap("userId", 1L), 
     new Function3<User, List<Post>,List<User>, User>() {
            @Override
            public User apply(@DataConsumer("user") User user, 
                              @DataConsumer("posts") List<Post> posts,
                              @DataConsumer("followers") List<User> followers) {
                user.setPosts(posts);
                user.setFollowers(followers);
                return user;
            }
     });
Assert.notNull(user,"user not null");
Assert.notNull(user.getPosts(),"user posts not null");

這裡的Function3接收4個泛型引數, 最後一個User表示返回值型別, 前3個引數依次對應apply方法的3個入參型別. 專案預定義了Function2-Function5, 支援不超過5個引數, 如果你需要更多引數, 可以編寫一個介面(FunctionInterface), 繼承MultipleArgumentsFunction介面即可.

很顯然

  • 每一個 @DataConsumer 只會對應一個 @DataProvider .
  • 一個 @DataProvider 可能被多個 @DataConsumer 消費 .
  • 一個 @DataProvider 通過多個 @DataConsumer 依賴上多個 @DataProvider.
**現在, 就有這樣一個專案, 實現了上述功能. 只需要在你的方法上, 新增一些註解. 就可以迅速地讓你的呼叫樹轉為並行.** **專案地址:** https://github.com/lvyahui8/spring-boot-data-aggregator

你不用care底層如何實現. 只有在你有定製化的需求時, 才去關心一些配置引數. 去擴充套件一些能力.

實現原理

  1. 在Spring啟動之時, 掃描應用中的 @DataProvider@DataConsumer 註解. 分析記錄下依賴關係(有向非連通圖), 並且記錄好@DataProvider和Spring Bean的對映關係.
  2. 當進行查詢時, 從已經記錄好的依賴關係中拿出依賴樹, 使用執行緒池和閉鎖(CountLatchDown), 遞迴非同步呼叫孩子節點對應的Bean方法, 拿到結果後作為入參注入當前節點 (近似廣度優先, 但因為並行的原因, 節點的訪問順序是不確定的).
  3. 在發起遞迴呼叫前, 傳入進一個map, 用來存放查詢引數, 方法中沒有@DataConsumer註解的入參, 將從此map中取值.
  4. @DataProvider@DataConsumer 註解可以支援一些引數, 用來控制超時時間\異常處理方式\是否冪等快取等等.

怎麼解決並行/非同步後引入的新問題

超時怎麼控制 ?

@DataProvider 註解支援 timeout 引數, 用來控制超時. 實現原理是通過閉鎖的超時等待方法.

java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)

異常怎麼處理 ?

對異常提供兩種處理方式: 吞沒或者向上層丟擲.

@DataConsumer 註解支援exceptionProcessingMethod 引數, 用來表示這個Consumer想怎麼處理Provider丟擲的異常.

當然, 也支援在全域性維度配置. 全域性配置的優先順序低於(<)Consumer配置的優先順序.

依賴關係有死迴圈怎麼辦 ?

Spring Bean初始化, 因為Bean建立和Bean屬性賦值分了兩步走, 因此可以用所謂的"早期引用"解決迴圈依賴的問題.

但如果你迴圈依賴的Bean, 依賴關係定義在建構函式入參上, 那麼是沒法解決迴圈依賴的問題的.

同理, 我們通過方法入參, 非同步注入依賴資料, 在方法入參沒有變化的情況下, 也是無法結束死迴圈的. 因此必須禁止迴圈依賴.

那麼問題變為了怎麼禁止迴圈依賴. 或者說, 怎麼檢測有向非聯通圖中的迴圈依賴, 兩個辦法:

  • 帶染色的DFS遍歷: 節點入棧訪問前, 先標記節點狀態為"訪問中", 之後遞迴訪問孩子節點, 遞迴完成後, 將節點標記為"訪問完成". 如果在DFS遞迴過程中, 再次訪問到"訪問中"的節點, 說明有環.
  • 拓撲排序: 把有向圖的節點排成一個序列, 不存在索引號較高的節點指向索引號較低的節點, 表示圖存在拓撲排序. 拓撲排序的實現方法是, 先刪除入度為0的節點, 並將領接節點的入度 - 1, 直到所有節點都被刪除. 很顯然, 如果有向圖中有環, 那麼環裡節點的入度不可能為0 , 那麼節點不可能刪完. 因此, 只要滿足節點未刪完 && 不存在入度為0的節點, 那麼一定有環.

這裡我們用領接表+DFS染色搜尋, 來實現環的檢查

private void checkCycle(Map<String,Set<String>> graphAdjMap) {
    Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
    for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
        if (visitStatusMap.containsKey(item.getKey())) {
            continue;
        }
        dfs(graphAdjMap,visitStatusMap,item.getKey());
    }
}

private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
    if (visitStatusMap.containsKey(node)) {
        if(visitStatusMap.get(node) == 1) {
            List<String> relatedNodes = new ArrayList<>();
            for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
                if (item.getValue() == 1) {
                    relatedNodes.add(item.getKey());
                }
            }
            throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
        }
        return ;
    }
    visitStatusMap.put(node,1);
    log.info("visited:{}", node);
    for (String relateNode : graphAdjMap.get(node)) {
        dfs(graphAdjMap,visitStatusMap,relateNode);
    }
    visitStatusMap.put(node,2);
}

ThreadLocal怎麼處理?

許多的框架都使用了ThreadLocal來實現Context來儲存單次請求中的一些共享資料, Spring也不例外.

眾所周知, ThreadLocal實際是訪問Thread中一個特殊Map的入口. ThreadLocal只能訪問當前Thread的資料(副本), 如果跨越了執行緒, 是拿不到到其他ThreadLocalMap的資料的.

解決方法

如圖

  1. 在當前執行緒提交非同步任務前, 將當前執行緒ThreadLocal執行的資料"捆綁"到任務例項中
  2. 當任務開始執行時, 從任務例項中取出資料, 恢復到當前非同步執行緒的ThreadLocal中
  3. 當任務結束後, 清理當前非同步執行緒的ThreadLocal.

這裡, 我們先定義一個介面, 來描述這3個動作

public interface AsyncQueryTaskWrapper {
    /**
     * 任務提交之前執行. 此方法在提交任務的那個執行緒中執行
     */
    void beforeSubmit();

    /**
     * 任務開始執行前執行. 此方法在非同步執行緒中執行
     * @param taskFrom 提交任務的那個執行緒
     */
    void beforeExecute(Thread taskFrom);

    /**
     * 任務執行結束後執行. 此方法在非同步執行緒中執行
     * 注意, 不管使用者的方法丟擲何種異常, 此方法都會執行.
     * @param taskFrom 提交任務的那個執行緒
     */
    void afterExecute(Thread taskFrom);
}

為了讓我們定義的3個動作起作用. 我們需要重寫一下 java.util.concurrent.Callable#call方法.

public abstract class AsyncQueryTask<T> implements Callable<T> {
    Thread      taskFromThread;
    AsyncQueryTaskWrapper asyncQueryTaskWrapper;

    public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
        this.taskFromThread = taskFromThread;
        this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
    }

    @Override
    public T call() throws Exception {
        try {
            if(asyncQueryTaskWrapper != null) {
                asyncQueryTaskWrapper.beforeExecute(taskFromThread);
            }
            return execute();
        } finally {
            if (asyncQueryTaskWrapper != null) {
                asyncQueryTaskWrapper.afterExecute(taskFromThread);
            }
        }
    }

    /**
     * 提交任務時, 業務方實現這個替代方法
     *
     * @return
     * @throws Exception
     */
    public abstract T  execute() throws Exception;
}

接下來, 向執行緒池提交任務時, 不再直接提交Callable匿名類例項, 而是提交AsyncQueryTask例項. 並且在提交前觸發 taskWrapper.beforeSubmit();

AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
// 任務提交前執行動作.
taskWrapper.beforeSubmit();
Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
    @Override
    public Object execute() throws Exception {
        try {
            // something to do
        } finally {
            stopDownLatch.countDown();
        }
    }
});
你要做什麼?

你只需要定義一個類, 實現這個介面, 並將這個類加到配置檔案中去.

@Slf4j
public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
    /**
     * "捆綁" 在任務例項中的資料
     */
    private Long tenantId;
    private User user;

    @Override
    public void beforeSubmit() {
        /* 提交任務前, 先從當前執行緒拷貝出ThreadLocal中的資料到任務中 */
        log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName());
        this.tenantId = RequestContext.getTenantId();
        this.user = ExampleAppContext.getUser();
    }

    @Override
    public void beforeExecute(Thread taskFrom) {
        /* 任務提交後, 執行前, 在非同步執行緒中用資料恢復ThreadLocal(Context) */
        log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
        RequestContext.setTenantId(tenantId);
        ExampleAppContext.setLoggedUser(user);
    }

    @Override
    public void afterExecute(Thread taskFrom) {
        /* 任務執行完成後, 清理非同步執行緒中的ThreadLocal(Context) */
        log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
        RequestContext.removeTenantId();
        ExampleAppContext.remove();
    }
}

新增配置使TaskWapper生效.

io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper

怎麼監控每一次的非同步呼叫?

解決辦法

我們先把一次查詢, 分為以下幾個生命週期

  • 查詢任務初次提交 (querySubmitted)
  • 某一個Provider節點開始執行前 (queryBefore)
  • 某一個Provider節點執行完成後 (queryAfter)
  • 查詢全部完成 (queryFinished)
  • 查詢異常 (exceptionHandle)

轉換成介面如下

public interface AggregateQueryInterceptor {
    /**
     * 查詢正常提交, Context已經建立
     *
     * @param aggregationContext 查詢上下文
     * @return 返回為true才繼續執行
     */
    boolean querySubmitted(AggregationContext aggregationContext) ;

    /**
     * 每個Provider方法執行前, 將呼叫此方法. 存在併發呼叫
     *
     * @param aggregationContext 查詢上下文
     * @param provideDefinition 將被執行的Provider
     */
    void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition);

    /**
     * 每個Provider方法執行成功之後, 呼叫此方法. 存在併發呼叫
     *
     * @param aggregationContext 查詢上下文
     * @param provideDefinition 被執行的Provider
     * @param result 查詢結果
     * @return 返回結果, 如不修改不, 請直接返回引數中的result
     */
    Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result);

    /**
     * 每個Provider執行時, 如果丟擲異常, 將呼叫此方法. 存在併發呼叫
     *
     * @param aggregationContext  查詢上下文
     * @param provideDefinition 被執行的Provider
     * @param e Provider丟擲的異常
     */
    void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e);

    /**
     * 一次查詢全部完成.
     *
     * @param aggregationContext 查詢上下文
     */
    void queryFinished(AggregationContext aggregationContext);
}

在Spring應用啟動之初, 獲取所有實現了AggregateQueryInterceptor介面的Bean, 並按照Order註解排序, 作為攔截器鏈.

至於攔截器如何執行. 很簡單, 在遞迴提交查詢任務時, 插入執行一些鉤子(hook)函式即可. 涉及到的程式碼很多, 就不貼在這裡, 感興趣的可以去github clone程式碼檢視.

你要做什麼?

你可以實現一個攔截器, 在攔截器中輸出日誌, 監控節點執行狀態(耗時, 出入參), 如下:

@Component
@Order(2)
@Slf4j
public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor {
    @Override
    public boolean querySubmitted(AggregationContext aggregationContext) {
        log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName());
        return true;
    }

    @Override
    public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) {
        log.info("query before. provider:{}",provideDefinition.getMethod().getName());
    }

    @Override
    public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) {
        log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString());
        return result;
    }

    @Override
    public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) {
        log.error(e.getMessage());
    }

    @Override
    public void queryFinished(AggregationContext aggregationContext) {
        log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName());
    }
}

專案地址

最後, 再次貼一下專案地址: . spring-boot-data-aggregator

歡迎拍磚, 歡迎star, 歡迎使