1. 程式人生 > 實用技巧 >使用併發工具類庫時需要注意的坑

使用併發工具類庫時需要注意的坑

所謂的併發工具類庫就是用來解決多執行緒環境下的併發問題的工具類庫,一般分為同步器和容器兩大類,比如

  1. 容器:ConcurrentHashMap、 ConcurrentSkipListMap、 CopyOnWriteArrayList、ConcurrentSkipListSet等
  2. 同步器:CountDownLatch、Semaphore、CyclicBarrier、Phaser、Exchanger等

但是在使用併發工具類庫時如果沒有弄清楚各種併發類工具庫的實現原理,提供的特性以及適用的情形,而是一遇到併發問題就選用相關的工具類庫去處理,盲目認為併發類工具庫能解決所有的併發問題必然會掉入一些坑。接下來就整理一些資料上總結的使用併發工具類庫過程中可能遇到的坑。

1. 執行緒池中的執行緒重用導致資訊錯亂

  • 場景設計:
    建立一個SpringBoot的Web應用,在Controller層使用一個ThreadLocal來存放前端傳來的使用者資訊,在前端傳送多次請求再看ThreadLocal內值的變化
@RestController
public class ThreadLocalController {

    public static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

    @GetMapping("thread")
    public Map wrong(@RequestParam("userId") Integer userId) {

        // 設定使用者資訊前先查詢一次ThreadLocal中的使用者資訊
        String before = Thread.currentThread().getName() + ":" + currentUser.get();
        // 設定用使用者資訊到ThreadLocal之中
        currentUser.set(userId);
        String after = Thread.currentThread().getName() + ":" + currentUser.get();
        Map result = new HashMap();
        result.put("before", before);
        result.put("after", after);
        return result;
    }
}

在這段程式碼邏輯中,先把ThreadLocal中使用者資料設定為初始值null,獲取一次該值,然後再把外部傳入的引數設定到ThreadLocal中,再次獲取使用者資料。期待的結果是第一次獲取的使用者資料是null,第二次獲取的使用者資料是前端出入的使用者資料。
ThreadLocal作為執行緒變數主要是填充當前執行緒的變數,該變數是其他執行緒隔離的,而在web應用中,程式執行在Tomcat中,其執行執行緒是Tomcat的工作執行緒,而Tomcat的工作執行緒是基於執行緒池的。那麼就存在一種情況,一個請求的執行緒重用了另一個執行緒,這就可能使得首次從ThreadLocal中獲取的使用者資訊是前一個執行緒遺留的資料。

  • 設定Tomcat的工程執行緒池的最大執行緒數為1,使得執行緒一定會重用,然後模擬不同的使用者傳送請求
server.tomcat.max-threads=1
// 請求1
localhost:8080/thread?userId=123

返回值
{
    "before": "http-nio-8080-exec-1:345",
    "after": "http-nio-8080-exec-1:123"
}

// 請求2
localhost:8080/thread?userId=456
返回值
{
    "before": "http-nio-8080-exec-1:123",
    "after": "http-nio-8080-exec-1:456"
}

從結果而言,很容易發現在獲取使用者2的資訊時也獲取到了使用者1的殘留資料,為了解決這個問題的方法也比較簡單,就是在當前執行緒結束後在finally程式碼塊中顯式清楚掉使用者資料,那麼重用的執行緒就不會再獲取到殘留資料了。

public class ThreadLocalController {

    public static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

    @GetMapping("thread")
    public Map wrong(@RequestParam("userId") Integer userId) {

        // 設定使用者資訊前先查詢一次ThreadLocal中的使用者資訊
        String before = Thread.currentThread().getName() + ":" + currentUser.get();
        // 設定用使用者資訊到ThreadLocal之中
        currentUser.set(userId);
        // 設定完使用者資訊之後才查詢一次
        try{
            String after = Thread.currentThread().getName() + ":" + currentUser.get();
            Map result = new HashMap();
            result.put("before", before);
            result.put("after", after);
            return result;
        } finally {
            currentUser.remove();
        }

    }
}

2. 使用執行緒安全的併發工具仍有可能產生執行緒安全問題

  • 場景設計:
    設計一個擁有900個元素的Map,然後再使用10個執行緒,併發的補充100個執行緒進去,使得Map中的元素總數最終為1000個。
@Slf4j
@RestController
public class ConcurrentHashMapController {

    private static int THREAD_COUNT = 10;
    private static int ITEM_COUNT = 1000;

    /**
     * range(開始節點,結束節點)返回一個有序的LongStream, 包含開始節點到結束節點的所有引數,間隔為1,rangeClosed包含結束節點,range不包含
     * boxed 數值轉換為流
     * Collectors.toCurrentMap的引數
     * 1. 第一個引數i -> UUID.randomUUID().toString()設定key
     * 2. 第二個引數Function.identity()設定value, Function.identity()返回一個輸出跟輸入一樣的Lambda表示式物件,等價於形如t -> t形式的Lambda表示式
     * 3. 第三個引數(o1, o2) -> o1,表示如果如果o1與o2的key值相同,選擇o1作為那個key所對應的value值
     * 4. 第四個引數 ConcurrentHashMap::new表示先建立一個新的物件在傳值
     * @param count
     * @return
     */
    private ConcurrentHashMap<String, Long> getData(int count) {
        return LongStream.rangeClosed(1, count)
                .boxed()
                .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(),
                        Function.identity(),
                        (o1, o2) -> o1, ConcurrentHashMap::new));

    }

    @GetMapping("map")
    public String wrong() throws InterruptedException {
        // 初始的時候建立900個元素
        ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
        log.info("init size:{}", concurrentHashMap.size());


        // 通過執行緒池建立10個執行緒併發處理
        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        forkJoinPool.execute(() -> IntStream.rangeClosed(0, 10).parallel().forEach(i -> {
            // 查詢還需補充的的元素個數
            int gap = ITEM_COUNT - concurrentHashMap.size();
            log.info("gap size:{}", gap);
            // 補充元素
            concurrentHashMap.putAll(getData(gap));
        }));

        // 等待所有任務完成
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

        // 檢視最終Map內的元素個數
        log.info("finish size:{}", concurrentHashMap.size());
        return "num:" + concurrentHashMap.size();

    }
    
  • 執行結果
2020-07-08 18:55:17.345  INFO 15960 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController        : init size:900
2020-07-08 18:55:17.349  INFO 15960 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController        : gap size:100
2020-07-08 18:55:17.351  INFO 15960 --- [Pool-1-worker-2] c.c.s.ConcurrentHashMapController        : gap size:99
2020-07-08 18:55:17.351  INFO 15960 --- [Pool-1-worker-8] c.c.s.ConcurrentHashMapController        : gap size:90
2020-07-08 18:55:17.351  INFO 15960 --- [ool-1-worker-11] c.c.s.ConcurrentHashMapController        : gap size:90
2020-07-08 18:55:17.351  INFO 15960 --- [ool-1-worker-15] c.c.s.ConcurrentHashMapController        : gap size:90
2020-07-08 18:55:17.351  INFO 15960 --- [Pool-1-worker-6] c.c.s.ConcurrentHashMapController        : gap size:90
2020-07-08 18:55:17.352  INFO 15960 --- [ool-1-worker-10] c.c.s.ConcurrentHashMapController        : gap size:71
2020-07-08 18:55:17.352  INFO 15960 --- [ool-1-worker-13] c.c.s.ConcurrentHashMapController        : gap size:74
2020-07-08 18:55:17.351  INFO 15960 --- [Pool-1-worker-1] c.c.s.ConcurrentHashMapController        : gap size:85
2020-07-08 18:55:17.351  INFO 15960 --- [Pool-1-worker-4] c.c.s.ConcurrentHashMapController        : gap size:85
2020-07-08 18:55:17.355  INFO 15960 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController        : gap size:-125
2020-07-08 18:55:17.357  INFO 15960 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController        : finish size:1774

從結果來看,顯然程式出現了執行緒安全問題,有的執行緒計算出了錯誤的應該新增的值,最終導致Map中的元素總數超過了1000。雖然併發的程式中使用了ConcurrentHashMap的併發安全類,但是由於ConcurrentHashMap只能保證其自身的讀寫的執行緒安全,而先計算需要新增的值,再進行新增的過程並不是一個原子操作,所以儘管使用了ConcurrentHashMap但是仍然會有執行緒安全的問題。而為了解決這個問題,一個最簡單的方式就將計算待新增的數和進行新增的操作上一個鎖,使其成為原子操作。

@GetMapping("map")
    public String right() throws InterruptedException {
        // 初始的時候建立900個元素
        ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
        log.info("init size:{}", concurrentHashMap.size());


        // 通過執行緒池建立10個執行緒併發處理
        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        forkJoinPool.execute(() -> IntStream.rangeClosed(0, 10).parallel().forEach(i -> {
            synchronized (concurrentHashMap) {
                int gap = ITEM_COUNT - concurrentHashMap.size();
                log.info("gap size:{}", gap);
                // 補充元素
                concurrentHashMap.putAll(getData(gap));
            }
        }));

        // 等待所有任務完成
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

        // 檢視最終Map內的元素個數
        log.info("finish size:{}", concurrentHashMap.size());
        return "num:" + concurrentHashMap.size();

    }
  • 執行結果
2020-07-08 19:05:46.058  INFO 12636 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController        : init size:900
2020-07-08 19:05:46.064  INFO 12636 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController        : gap size:100
2020-07-08 19:05:46.068  INFO 12636 --- [ool-1-worker-10] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.068  INFO 12636 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.068  INFO 12636 --- [Pool-1-worker-1] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.068  INFO 12636 --- [Pool-1-worker-8] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.068  INFO 12636 --- [Pool-1-worker-6] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.069  INFO 12636 --- [Pool-1-worker-2] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.069  INFO 12636 --- [Pool-1-worker-4] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.069  INFO 12636 --- [ool-1-worker-11] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.069  INFO 12636 --- [ool-1-worker-13] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.069  INFO 12636 --- [ool-1-worker-15] c.c.s.ConcurrentHashMapController        : gap size:0
2020-07-08 19:05:46.071  INFO 12636 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController        : finish size:1000

3. 沒有合理使用併發工具

3.1 沒有合理使用併發工具的特性

  • 場景設計:
    使用10個併發,迴圈10w次來累加隨機key的值,如果key不存在的話首次設定值為1
    使用兩種方式實現,一種使用ConcurrentHashMap的特性實現,一種使用傳統的synchronize加鎖的方式實現,比較兩種方式的效能
  1. 方式一:使用傳統的synchronize加鎖
private static int LOOP_COUNT = 1000000;
    private static int THREAD_COUNT = 10;
    private static int ITEM_COUNT = 10;

    /**
     * ThreadLocalRandom.current().nextInt(ITEM_COUNT)表示多執行緒情況下生成隨機數,保證每個執行緒生成的隨機數不一致
     * @return
     * @throws InterruptedException
     */
    @GetMapping("map1")
    private Map<String, Long> normaluse() throws InterruptedException {
        ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
            String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
            synchronized (freqs) {
                // key的value的初始值為1,key後value + 1;
                if (freqs.containsKey(key)) {
                    freqs.put(key, freqs.get(key) + 1);
                } else {
                    freqs.put(key, 1L);
                }
            }
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("size:" + freqs.values());
        long sum = freqs.values().stream().collect(Collectors.summarizingLong(x -> x.longValue())).getSum();
        System.out.println("sum:" + sum);
        return freqs;
    }
  1. 方式二: 使用ConcurrentHashMap的特性實現
@GetMapping("map2")
    private Map<String, Long> goodUse() throws InterruptedException {
        ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
            String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
            // 利用computeIfAbsent()方法來例項化LongAdder, 然後利用LongAdder來進行安全計數
            freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
        }));

        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

        Map<String, Long> collect = freqs.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> e.getKey(),
                        e -> e.getValue().longValue()
                ));

        long sum = collect.values().stream().collect(Collectors.summarizingLong(x -> x.longValue())).getSum();
        System.out.println("sum:" + sum);

        return collect;
    }

  1. 測試兩種方法的效能
@GetMapping("test")
    public String good() throws InterruptedException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("normaluse");
        Map<String, Long> normaluse = normaluse();
        stopWatch.stop();
        Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
        Assert.isTrue(normaluse.entrySet().stream().mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT, "normaluse count error");

        stopWatch.start("gooduse");
        Map<String, Long> gooduse = goodUse();
        stopWatch.stop();
        Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
        Assert.isTrue(gooduse.entrySet().stream().mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT, "gooduse count error");
        log.info(stopWatch.prettyPrint());
        return "OK";

    }
  • 測試結果
-----------------------------------------
ms     %     Task name
-----------------------------------------
00472  078%  normaluse
00131  022%  gooduse

就結果而言,可以發現使用ConcurrentHashMap的computeIfAbsent方法比傳統的synchronize方法效率高了很多倍,這主要是ConcurrentHashMap使用CAS在虛擬機器層面確保寫入資料的原子性,比通過synchronize加鎖的方式實現原子性的效率高很多。由此可見,在併發場景下,如果沒有合理的使用併發工具類提供的特性可能並不能發揮出其的效能。

4.2 沒有在適用場景選擇合適的併發工具

  • 場景設計
    將CopyOnWriteArrayList併發工具分別應用於寫多,和讀多的場景,比較其在這兩種場景下的效能
@GetMapping("write")
    public Map testWrite() {
        List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();

        List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
        StopWatch stopWatch = new StopWatch();
        int loopCount = 100000;
        stopWatch.start("Write:copyOnWriteArrayList");
        IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
        stopWatch.stop();

        stopWatch.start("Write:synchronizedList");
        IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
        stopWatch.stop();

        log.info(stopWatch.prettyPrint());
        Map result = new HashMap();
        result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
        result.put("synchronizedList", synchronizedList.size());
        return result;

    }




    //幫助方法用來填充List
    private void addAll(List<Integer> list) {
        list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
    }

    //測試併發讀的效能
    @GetMapping("read")
    public Map testRead() {
        //建立兩個測試物件
        List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
        List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
        //填充資料
        addAll(copyOnWriteArrayList);
        addAll(synchronizedList);
        StopWatch stopWatch = new StopWatch();
        int loopCount = 1000000;
        int count = copyOnWriteArrayList.size();
        stopWatch.start("Read:copyOnWriteArrayList");
        //迴圈1000000次併發從CopyOnWriteArrayList隨機查詢元素
        IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
        stopWatch.stop();
        stopWatch.start("Read:synchronizedList");
        //迴圈1000000次併發從加鎖的ArrayList隨機查詢元素
        IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
        Map result = new HashMap();
        result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
        result.put("synchronizedList", synchronizedList.size());
        return result;
    }
  • 寫多場景測試結果
-----------------------------------------
ms     %     Task name
-----------------------------------------
09355  100%  Write:copyOnWriteArrayList
00030  000%  Write:synchronizedList
  • 讀多場景測試結果
-----------------------------------------
ms     %     Task name
-----------------------------------------
00051  014%  Read:copyOnWriteArrayList
00309  086%  Read:synchronizedList

通過測試結果可以發現,copyOnWriteArrayList在寫多的場景下的效能非常差,但在讀多的場景下效能很好。因此,在併發場景中也應該根據具體的場景選擇合適的併發工具類。