1. 程式人生 > 程式設計 >【實戰演練】用手寫一個騷氣的請求合併,演繹底層的真實

【實戰演練】用手寫一個騷氣的請求合併,演繹底層的真實


  • 冬來冬往,越來越喜歡軟綿綿的被窩了。a
  • 厭倦了夏天的酷熱,秋天的浮躁,來一首冬天的安靜。

一、伺服器崩潰的思考

老闆說,他要做個現場營銷活動,線上線下都要參與推廣,這個活動參與人數可能很大哦···
果然,由於不是我寫的程式碼,所以那天伺服器就崩了,崩的時候很安靜,寫程式碼的那個人一個人走的,走的時候很安詳。

?: 當請求量到達百萬級時候,為啥會崩潰呢?

微服務中是通過介面去向服務提供者發起http請求或者rpc(tcp)請求去獲取資料,事實上大量請求中,服務端能處理的請求數量有限,服務中充斥著大量的執行緒,以及資料庫等的連線也會被佔用完,導致請求響應速度也越來越慢。

?:

  • 響應速度和我們的資料層有關係嗎?
  • 能不能去新增服務端伺服器呢?
  • 如果能減少客戶端向服務端的請求就好了?
  • 限流嗎?當前場景能限流嗎?
  • 每個執行緒去查詢資料,每次都只查詢某一個結果,是不是太浪費了?
  • 我們能不能想辦法,提升我們系統的呼叫效能?

二、有人想看請求合併,今天她來了

上面的一些思路可以用加快取,加MQ的方式去解決。但是快取有限,MQ是個佇列,有限流的效果。那麼,如何才能提高系統的呼叫能力,我們學習一下,請求合併,結果分發。

  • 正常的請求都是一個請求一個執行緒,到後臺觸發相關的業務需求,去呼叫資料獲取業務。
  • 當請求合併後,我們要將多個多個請求合併後統一去批量去呼叫。

大概的設計思路便是如下圖所示:

  1. 常規請求

    常規請求
    常規請求
  2. 請求合併

  3. 說下我們的思路

  • 解決請求呼叫多,比如呼叫商品資料,經過的服務多,呼叫鏈很長,所以查詢資料庫的次數也就非常多,資料庫連線池很快就被用光,導致很多請求被阻塞,也導致應用整體執行緒數非常高。雖然通過增加資料庫連線池大小可以緩解問題,並且可以通過壓力測試,但這治標不治本。
  • 查詢商品資訊的時候,如果同一商品同一時刻有100個請求,那麼其中的99次查詢是多餘的,可以把100個請求合併成一個真實的後臺介面呼叫,只要控制好執行緒安全即可。我的想法是使用併發計數器來實現再配合本地快取,計數器可直接用JDK提供的`AtomicInteger`,執行緒安全又提供原子操作。
  • 以獲取商品資訊為例,每個商品id對應一個計數器,計數器初始值預設是0,當一個請求過來後通過`incrementAndGet()`使計數器自增1並返回自增後的值。當該值等於1,表明該執行緒在這個時間點上是第一個到達的執行緒,然後就去呼叫真實的業務邏輯,在查詢到結果後放入到本地快取中。當該值大於1的時候,表明之前已有執行緒正在呼叫業務邏輯,則進入等待狀態,並迴圈的查詢本地快取中是否已有資料可用。獲取到結果後都呼叫`decrementAndGet()`使計數器減1,計數器被減到0的時候就回到了初始狀態,並且當減到0(代表最後一個執行緒)時清除快取。
  • 那還有在1000次請求中,請求的資料id不同,但是使用的服務介面相同,都是查詢商品庫的商品`id`從`1~1000`的資料,都是從表裡面查詢,`queryDataById(dataId)`,那我也可以合併這些請求,改為批量查詢,然後將資料分發返回。思路就是設計每個請求攜帶一個請求`唯一的traceId`,有點像鏈路跟蹤的感覺,簡單點可以使用查詢的id進行最為跟蹤id,將請求放入一個隊中,使用定時任務,比如每隔10ms去掃描佇列,將這些業務合併請求統一去請求資料庫層。
  • 此方案有個資料延遲的地方,就是每次迴圈時的等待狀態的時間。因為一次包含多次查庫的業務呼叫,耗時基本都在幾十毫秒,甚至是上百毫秒,可以把該等待睡眠設定小一點,比如10毫秒。這樣即不會浪費CPU時間,實時性也比較高,但然也可以通過主動喚醒等待執行緒的方式,這個操作起來就比較複雜些。在這其中還可以新增一些異常處理、超時控制、最大重試次數,最大併發數(超時最大併發數就快速失敗)等。

三、開始演練

  • 模擬一個遠端呼叫介面
 1import org.springframework.stereotype.Service;
2
3import java.util.*;
4
5/**
6 * 模擬遠端呼叫ShopData介面
7 * @author Lijing
8 */

9@Service
10public class QueryServiceRemoteCall {
11
12    13     * 呼叫遠端的商品資訊查詢介面
14     *
15     * @param code 商品編碼
16     * @return 返回商品資訊,map格式
17     */
18    public HashMap<String, Object> queryShopDataInfoByCode(String code) {
19        try {
20            Thread.sleep(50L);
21        } catch (InterruptedException e) {
22            e.printStackTrace();
23        }
24        HashMap<String, Object> hashMap = new HashMap<>();
25        hashMap.put("shopDataId"new Random().nextInt(999999999));
26        hashMap.put("code", code);
27        hashMap.put("name""小玩具");
28        hashMap.put("isOk",112); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"true");
29        hashMap.put("price","3000");
30        return hashMap;
31    }
32
33    34     * 批量查詢 - 呼叫遠端的商品資訊查詢介面
35     *
36     * @param codes 多個商品編碼
37     * @return 返回多個商品資訊
38     */
39    public List<Map<String, Object>> queryShopDataInfoByCodeBatch(List<String> codes) {
40        List<Map<String, Object>> result = new ArrayList<>();
41        for (String code : codes) {
42            HashMap<String,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number">43            hashMap.put(44            hashMap.put(45            hashMap.put("棉花糖");
46            hashMap.put(47            hashMap.put("6000");
48            result.add(hashMap);
49        }
50        return result;
51    }
52}
複製程式碼
  • 使用CountDownLatch模擬併發請求的公共測試類
 1@RunWith(SpringRunner.class)
2@SpringBootTest(classes = MyBotApplication.class)
MergerApplicationTests {
5    long timed = 0L;
6
7    @Before
8    public void start() {
9        System.out.println("開始測試");
10        timed = System.currentTimeMillis();
11    }
12
13    @After
14    end15        System.out.println("結束測試,執行時長:" + (System.currentTimeMillis() - timed));
16    }
17
18    // 模擬的請求數量
19    private static final int THREAD_NUM = 1000;
20
21    // 倒計數器 juc包中常用工具類
22    private CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
23
24    @Autowired
25    private ShopDataService shopDataService;
26
27    @Test
28    simulateCall() throws IOException {
29        // 建立 並不是馬上發起請求
for (int i = 0; i < THREAD_NUM; i++) {
31            final String code = "code-" + (i + 1);
32            // 多執行緒模擬使用者查詢請求
33            Thread thread = new Thread(() -> {
34                35                    // 程式碼在這裡等待,等待countDownLatch為0,代表所有執行緒都start,再執行後續的程式碼
36                    countDownLatch.await();
37                    // 模擬 http請求,實際上就是多執行緒呼叫這個方法
38                    Map<String, Object> result = shopDataService.queryData(code);
39                    System.out.println(Thread.currentThread().getName() + " 查詢結束,結果是:" + result);
40                } catch (Exception e) {
41                    System.out.println(Thread.currentThread().getName() + " 執行緒執行出現異常:" + e.getMessage());
42                }
43            });
44            thread.setName("price-thread-" + code);
45            thread.start();
46            // 啟動後,倒計時器倒計數 減一,代表又有一個執行緒就緒了
47            countDownLatch.countDown();
48        }
49
50        System.in.read();
52
53}
複製程式碼
  • 先來個普通呼叫演示
 1 2 * 商品資料服務類
3 * @author lijing
4 */
5 6ShopDataService {
8    QueryServiceRemoteCall queryServiceRemoteCall;
9
10    // 1000 使用者請求,1000個執行緒
11    public Map<String,45); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-title">queryData(String shopDataId) throws ExecutionException, InterruptedException {
12         return queryServiceRemoteCall.queryShopDataInfoByCode(shopDataId);
13    }
14}
複製程式碼
  • 查詢結果展示
 1開始測試
2price-thread-code-3 查詢結束,結果是:{code=code-3, shopDataId=165800794, price=3000, isOk=true, name=小玩具}
3price-thread-code-994 查詢結束,結果是:{code=code-994, shopDataId=735455508,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number"> 4price-thread-code-36 查詢結束,結果是:{code=code-36, shopDataId=781610507,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number"> 5price-thread-code-993 查詢結束,結果是:{code=code-993, shopDataId=231087525,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number"> 7....... 省略程式碼中。。。。
8
9price-thread-code-25 查詢結束,結果是:{code=code-25, shopDataId=149193873,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number">10price-thread-code-2 查詢結束,結果是:{code=code-2, shopDataId=324877405,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number">12.......共計1000次的查詢結果
13
14結束測試,執行時長:150
複製程式碼
  • 那麼我們發現我們可以用code作為一個追蹤traceId,然後使用ScheduledExecutorService,CompletableFuture,248);">LinkedBlockingQueue等一些多執行緒技術,就可以實現這個請求合併,請求分發的簡單實現demo.
import javax.annotation.PostConstruct;
2import java.util.ArrayList;
import java.util.HashMap;
4import java.util.List;
5import java.util.Map;
import java.util.concurrent.*;
7
8 9 * 商品資料服務類
10 *
11 * @author lijing
12 */
131415
16    Request {
17        String shopDataId;
18        CompletableFuture<Map<String, Object>> completableFuture;
19    }
// 集合,積攢請求,每N毫秒處理
22    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
@PostConstruct
init26        ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(5);
27        scheduledExecutorPool.scheduleAtFixedRate(() -> {
28            // TODO 取出所有queue的請求,生成一次批量查詢
29            int size = queue.size();
30            if (size == 0) {
31                return;
32            }
33            System.out.println("此次合併了多少請求:" + size);
34            // 1、 取出
35            ArrayList<Request> requests = 36            ArrayList<String> shopDataIds = 37            0; i < size; i++) {
38                Request request = queue.poll();
39                requests.add(request);
40                shopDataIds.add(request.shopDataId);
41            }
42            // 2、 組裝一個批量查詢 (不會比單次查詢慢很多)
43            List<Map<String, Object>> mapList = queryServiceRemoteCall.queryShopDataInfoByCodeBatch(shopDataIds);
44
45            // 3、 分發響應結果,給每一個request使用者請求 (多執行緒 之間的通訊)
46            HashMap<String, Map<String, Object>> resultMap = new HashMap<>(); //  1000---- 007
47            for (Map<String, Object> map : mapList) {
48                String code = map.get("code").toString();
49                resultMap.put(code, map);
50            }
51
52            // 1000個請求
53            for (Request req : requests) { 
54                Map<String, Object> result = resultMap.get(req.shopDataId);
55                // 怎麼通知對應的1000多個執行緒,取結果呢?
56                req.completableFuture.complete(result);
57            }
58        }, 0,250); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">10, TimeUnit.MILLISECONDS);
59    }
60
61
62    63    QueryServiceRemoteCall queryServiceRemoteCall;
64
65    66     * 1000 使用者請求,1000個執行緒
67     *
68     * @param shopDataId
69     * @return
70     * @throws ExecutionException
71     * @throws InterruptedException
72     */
73    74        Request request = new Request();
75        request.shopDataId = shopDataId;
76        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
77        request.completableFuture = future;
78        queue.add(request);
79        // 等待其他執行緒通知拿結果
80        return future.get();
81    }
82}
複製程式碼
  • 測試結果
 2結束測試,執行時長:164
3
4此次合併了多少請求:63
5此次合併了多少請求:227
6此次合併了多少請求:32
7此次合併了多少請求:298
8此次合併了多少請求:68
9此次合併了多少請求:261
10此次合併了多少請求:51
12price-thread-code-747 查詢結束,結果是:{code=code-747, shopDataId=113980125, price=6000, name=棉花糖}
13price-thread-code-821 查詢結束,結果是:{code=code-821, shopDataId=568038265,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number">14price-thread-code-745 查詢結束,結果是:{code=code-745, shopDataId=998247608,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number">16....... 省略程式碼中。。。。
18price-thread-code-809 查詢結束,結果是:{code=code-809, shopDataId=479029433,250); padding-right: 20px; word-spacing: 0px; word-wrap: inherit !important; word-break: inherit !important;" class="linenum hljs-number">19price-thread-code-806 查詢結束,結果是:{code=code-806, shopDataId=929748878, name=棉花糖}
複製程式碼

可以看到我們將1000次請求進行了合併,資料也是正常的模擬到了。

四、總結

弊端:

  • 啟用請求的成本是執行實際邏輯之前增加的延遲。
  • 如果平均僅需要5毫秒的執行時間,放在一個10毫秒的做一次批處理的合併場景下,則在最壞的情況下,執行時間可能會變為15毫秒。(一定不適合低延遲的RPC場景、一定不適合低併發場景)

場景:

  • 如果很少有超過1或2個請求會併發在一起,則沒有必要用。
  • 一個特定的查詢同時被大量使用,並且可以將幾+個甚至數百個批處理在一起,那麼如果能接受處理時間變長一點點,用來減少網路連線欲,這是值得的。(典型如:資料庫、Http介面)

擴充套件:

  • 我們不重複造輪子,在SpringCloud的元件spring-cloud-starter-netflix-hystrix中已經有封裝好的輪子HystrixHystrixCollapser來實現請求的合併,以減少通訊消耗和執行緒數的佔用
  • 當然他的元件比較複雜,也更全面,支援非同步,同步,超時,異常等的處理機制。
  • 但是,從底層思路來說,無非是執行緒之間的通訊,執行緒的切換,佇列等一些併發程式設計相關的技術,只要我們高度封裝和抽象,那也可以手擼一個合併請求的框架處理。

那今日份的講解就到此結束,具體的程式碼請移步我的gitHub的mybot專案master分支查閱,fork體驗一把,或者評論區留言探討,寫的不好,請多多指教~~