1. 程式人生 > >java執行緒併發工具類

java執行緒併發工具類

  本次內容主要講Fork-Join、CountDownLatch、CyclicBarrier以及Callable、Future和FutureTask,最後再手寫一個自己的FutureTask,絕對乾貨滿滿!

 

1、Fork-Join

1.1 什麼是Fork-Join

  Java多執行緒的開發可以我們自己啟用多執行緒,執行緒池,還可以使用forkjoin。forkjoin可以讓我們不去了解諸如Thread、Runnable等相關的知識,只要遵循forkjoin的開發模式,就可以寫出很好的多執行緒併發程式。

   forkjoin採用的是分而治之。分而治之思想是:將一個難以直接解決的大問題,分割成一些規模較小的相同問題,以便各個擊破,分而治之。分而治之的策略是:對於一個規模為n的問題,若該問題可以容易地解決(比如說規模n較小)則直接解決,否則將其分解為m個規模較小的子問題,這些子問題互相獨立且與原問題形式相同(子問題相互之間有聯絡就會變為動態規範演算法),遞迴地解這些子問題,然後將各子問題的解合併得到原問題的解,這種演算法設計策略叫做分治法。用一張圖來表示forkjoin原理。

  我們可以瞭解一下計算機的十大經典演算法:快速排序、堆排序、歸併排序 、二分查詢、BFPRT(線性查詢)、DFS(深度優先搜尋)、BFS(廣度優先搜尋)、Dijkstra、動態規劃、樸素貝葉斯分類。其中有哪一些用到的是分而治之呢?有3個,分別是快速排序、歸併排序和二分查詢。

  歸併排序是建立在歸併操作上的一種有效的排序演算法。該演算法是採用分治法的一個非常典型的應用。將已有序的子序列合併,得到完全有序的序列;即先使每個子序列有序,再使子序列段間有序。若將兩個有序表合併成一個有序表,稱為2路歸併,與之對應的還有多路歸併。對於給定的一組資料,利用遞迴與分治技術將資料序列劃分成為越來越小的半子表,在對半子表排序後,再用遞迴方法將排好序的半子表合併成為越來越大的有序序列。為了提升效能,有時我們在半子表的個數小於某個數(比如15)的情況下,對半子表的排序採用其他排序演算法,比如插入排序。下面演示一下歸併排序的過程。

1.2 歸併排序(升序)示例

 先將陣列劃分為左右2個子表:

 然後繼續對左右2個子表進行拆分:

對拆分好的4個子表進行排序:

 對有序子表進行比較合併:

 對合並後的子表繼續比較合併:

 第二次合併後,陣列呈有序排列。

 1.3 Fork-Join工作竊取

  工作竊取是指當前執行緒的Task已經全被執行完畢,則自動取到其他執行緒的Task佇列中取出Task繼續執行。ForkJoinPool中維護著多個執行緒在不斷地執行Task,每個執行緒除了執行自己職務內的Task之外,還會根據自己工作執行緒的閒置情況去獲取其他繁忙的工作執行緒的Task,如此一來就能能夠減少執行緒阻塞或是閒置的時間,提高CPU利用率。用一張圖進行說明。

1.3 Fork-Join使用

   Fork-Join使用兩個類來完成以上兩件事情:ForkJoinTask和ForkJoinPool。我們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。

(1)RecursiveAction,用於沒有返回結果的任務

(2)RecursiveTask,用於有返回值的任務

 task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,呼叫之後需要等待任務完成,才能執行後面的程式碼;submit是非同步執行。join()和get方法當任務完成的時候返回計算結果。呼叫get/join方法的時候會阻塞。還是用一個圖來說明forkjoin的工作流程。

  在我們自己實現的compute方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在呼叫invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。

 1.4 Fork-Join VS 單執行緒

  假設有一個業務場景,資料庫中有編號為0到1千萬的會員資訊,要統計所有會員的餘額總和。為了對比結果的一致性,使用者的餘額不用隨機數表示,就用編號代表使用者的餘額。現在的做法是每次從資料庫查詢出5000條資料進行統計,直到所有資料統計完成,進行彙總。對比看看單執行緒和Fork-Join的差異。

先看單執行緒場景:

public class SingleThreadSumNumber {
    /**
     * 每次查詢5000條進行統計
     */
    private static final int THRESHOLD = 5000;

    /**
     * 最小值
     */
    private static final int MIN = 0;

    /**
     * 最大值
     */
    private static final int MAX = 10000000;

    public void sumNumber() {
        long sum = 0;
        long startTime = System.currentTimeMillis();
        int start = MIN;
        int end = MIN + THRESHOLD;

        boolean isFirstTime = true;
        while (end <= MAX) {
            sum = sum + batchSum(start, end);
            if (isFirstTime) {
                start = start + THRESHOLD + 1;
                isFirstTime = false;
            } else {
                start = start + THRESHOLD;
            }
            end = end + THRESHOLD;
        }
        System.out.println("The result is " + sum
                + " spend time:" + (System.currentTimeMillis() - startTime) + "ms");
    }

    /**
     * 統計每次查詢出來的餘額總和
     * @param start
     * @param end
     * @return
     */
    public long batchSum(int start, int end) {
        long sum = 0;
        try {
            Thread.sleep(15);//休眠15毫秒模擬查詢業務
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) {
        SingleThreadSumNumber thread = new SingleThreadSumNumber();
        thread.sumNumber();
    }
}

執行程式輸出以下結果:

餘額總和為50000005000000,花費了30119毫秒,下面使用forkjoin來進行統計:

 1 import java.util.concurrent.ForkJoinPool;
 2 import java.util.concurrent.RecursiveTask;
 3 
 4 public class ForkJoinDemo {
 5     /**
 6      * 門限值,如果任務門限低於此值,則進行計算
 7      */
 8     private static final int THRESHOLD = 5000;
 9 
10     /**
11      * 最小值
12      */
13     private static final int MIN = 0;
14 
15     /**
16      * 最大值
17      */
18     private static final int MAX = 10000000;
19 
20     private static class SumNumberTask extends RecursiveTask<Long> {
21         private int start;
22         private int end;
23 
24         public SumNumberTask(int start, int end) {
25             this.start = start;
26             this.end = end;
27         }
28 
29         @Override
30         protected Long compute() {
31             if (end - start < THRESHOLD) {
32                 return sumBatch(start, end);
33             } else {
34                 int mid = (start + end) / 2;
35                 SumNumberTask left = new SumNumberTask(start, mid);
36                 SumNumberTask right = new SumNumberTask(mid + 1, end);
37                 invokeAll(left, right);
38                 long leftResult = left.join();
39                 long rightResult = right.join();
40                 return leftResult + rightResult;
41             }
42         }
43     }
44 
45     public void sumNumber() {
46         ForkJoinPool pool = new ForkJoinPool();
47         long start = System.currentTimeMillis();
48         int recordMin = MIN;
49         int recordMax = MAX;
50         SumNumberTask sumTask = new SumNumberTask(recordMin, recordMax);
51         pool.invoke(sumTask);
52         System.out.println("Task is Running.....");
53         Long result = sumTask.join();
54         System.out.println("The result is " + result + " spend time:"
55                 + (System.currentTimeMillis() - start) + "ms");
56     }
57 
58     /**
59      * 統計每次任務的總和
60      * @param fromId
61      * @param toId
62      * @return
63      */
64     public static long sumBatch(int fromId, int toId) {
65         long sum = 0;
66         try {
67             Thread.sleep(15);//休眠15毫秒模擬查詢業務
68         } catch (InterruptedException e) {
69             e.printStackTrace();
70         }
71         for (int i = fromId; i <= toId; i++) {
72             sum += i;
73         }
74         return sum;
75     }
76 
77     public static void main(String[] args) {
78         ForkJoinDemo forkJoinDemo = new ForkJoinDemo();
79         forkJoinDemo.sumNumber();
80     }
81 }

輸出結果:

   餘額總和為50000005000000,和使用單執行緒統計時一致,使用forkjoin達到了同樣的目的,但是隻花費了4078毫秒,效能提升了7倍多。為了使效能有進一步提升,我們可以在第44行指定併發數量。不傳參情況下,預設併發量是當前伺服器的邏輯CPU個數。我們把併發量調整成64,即ForkJoinPool pool = new ForkJoinPool(16 * 4),執行程式,輸出結果為:

 統計結果一致,花費了567毫秒,比起單執行緒統計,效能提升了53倍之多,由此可見forkjoin的併發威力。

2、CountDownLatch

 2.1 什麼是CountDownLatch

  JDK對CountDownLatch的解釋是:一種同步輔助器,它允許一個或多個執行緒等待,直到在其他執行緒中執行的一組操作完成為止。舉個例子來理解CountDownLatch:隔壁寢室的老王今天要參加學校運動會的400米決賽,跟小王一起爭奪冠軍的還有另外5個人,不管這6位選手的內心多激動多澎湃,也要等裁判的發令槍響了之後才能起跑,裁判不發出指令,選手就只能在起跑線等待,這就是CountDownLatch的作用。但是實際場景並不只有一個發令裁判,參加過學校運動會的同學都知道,還可能需要若干個裁判進行手動計時,要等所有的裁判都就位後,發令槍一響,運動員才能起跑。假設有3個計時裁判,一個發令裁判,用一個圖來說明。

   在比賽開始前,發令裁判會用洪荒之力吼一聲,各~就~各~位,此時發令裁判會用炯炯有神的目光和3位計時裁判交流,3位裁判分別點頭示意已經準備好了,此時發令裁判會再次大吼一聲,預備~~~跑!!!此時憋了許久的6位運動員飛奔出去,當然老王遙遙領先,畢竟女神給他說了跑第一名的話晚上有獎勵。發令裁判的任務完成,不用繼續執行下去,而3個計時裁判繼續工作,對6位選手的成績進行一個記錄。

 2.1 CountDownLatch實戰

用一段程式來模擬老王參加運動會400米決賽的場景。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    /**
     * 運動員在計時裁判和發令裁判就位後才能起跑
     */
    static CountDownLatch sportsManLatch = new CountDownLatch(4);

    /**
     * 發令裁判在3個計時裁判準備好之後才能發令
     */
    static CountDownLatch orderRefereeLatch = new CountDownLatch(3);

    /**
     * 計時裁判
     */
    static class TimeReferee implements Runnable {
        private int no;

        public TimeReferee(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            System.out.println(no + "號計時裁判就位");
            orderRefereeLatch.countDown();
            sportsManLatch.countDown();
        }
    }

    /**
     * 發令裁判
     */
    static class OrderReferee implements Runnable {
        @Override
        public void run() {
            try {
                orderRefereeLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("發令裁判發出指令~~~~~~");
            sportsManLatch.countDown();
        }
    }

    /***
     * 運動員
     */
    static class SportsMan implements Runnable {
        private int no;

        public SportsMan(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            try {
                System.out.println(no + "號運動員已經就位");
                sportsManLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(no + "號選手說,我要跑第一");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //6個運動員就位
        for (int i = 0; i < 6; i++) {
            new Thread(new SportsMan(i)).start();
        }

        //發令裁判和計時裁判眼神確認,等計時裁判都準備好之後發令
        new Thread(new OrderReferee()).start();

        //3個計時裁判就位
        for (int i = 0; i < 3; i++) {
            new Thread(new TimeReferee(i)).start();
        }
    }
}

程式輸出:

3、CyclicBarrier

 3.1 什麼是CyclicBarrier

  JDK對CyclicBarrier的解釋是:一種同步輔助工具,它允許一組執行緒全部互相等待以到達一個公共的障礙點。我們可以從字面意思理解它,可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開啟,所有被屏障攔截的執行緒才能繼續執行。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),parties表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。CyclicBarrier還提供一個更高階的建構函式CyclicBarrier(int parties,Runnable barrierAction),用於在parties個執行緒到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。還用一張圖來說明。

 3.2 CyclicBarrier實戰

  CyclicBarrier可以用於多執行緒計算資料,最後合併計算結果的場景。我們模擬3個子執行緒向一個map中新增資料,它們新增資料完成後,到一個屏障點進行等待,由統計執行緒對資料進行列印,統計執行緒工作結束後,3個子執行緒再被統一釋放去幹其他工作。我們設定2個屏障點來演示,,體現其可迴圈使用的特徵。

public class CyclicBarrierDemo {
    private static CyclicBarrier barrier = new CyclicBarrier(3, new CollectThread());

    /**存放子執行緒產生資料的容器*/
    private static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(new WorkThread());
            thread.start();
        }
        Thread.sleep(5);
    }

    /**
     * 負責對子執行緒的結果進行其他處理
     */
    private static class CollectThread implements Runnable {
        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : map.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println("the result = " + result);
            System.out.println("CollectThread do other things");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("CollectThread end........");
        }
    }

    /**
     * 實際工作的子執行緒
     */
    private static class WorkThread implements Runnable {
        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            map.put(id + "", id);
            Random r = new Random();
            try {
                Thread.sleep(r.nextInt(1000));
                System.out.println("Thread_" + id + " first do something ");
                //第一次到達屏障
                barrier.await();
                System.out.println("Thread_" + id + " first do other things");

                Thread.sleep(r.nextInt(500));
                map.put(id * 2 + "", id * 2);
                System.out.println("Thread_" + id + " second do something ");
                //第二次到達屏障
                barrier.await();
                System.out.println("Thread_" + id + " second other things ");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

程式輸出:

3.3 CountDownLatch和CyclicBarrier對比

  CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以反覆使用。CountDownLatch.await()一般阻塞工作執行緒,所有的進行預備工作的執行緒執行countDown(),而CyclicBarrier通過工作執行緒呼叫await()從而自行阻塞,直到所有工作執行緒達到指定屏障,再大家一起往下走。在控制多個執行緒同時執行上,CountDownLatch可以不限執行緒數量,而CyclicBarrier是固定執行緒數。同時,CyclicBarrier還可以提供一個barrierAction,合併多執行緒計算結果。

4、Callable、Future和FutureTask

4.1 Runnable、Callable、Future和FutureTask之間的關係

  Runnable是一個介面,在它裡面只聲明瞭一個run()方法,由於run()方法返回值為void型別,所以在執行完任務之後無法返回任何結果。Callable位於java.util.concurrent包下,它也是一個介面,在它裡面也只聲明瞭一個方法,只不過這個方法叫做call(),這是一個泛型介面,call()函式返回的型別就是傳遞進來的V型別。Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。要獲取返回結果時可以呼叫get方,該方法會阻塞直到任務返回結果。因為Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了FutureTask。FutureTask類實現了RunnableFuture介面,RunnableFuture繼承了Runnable介面和Future介面,所以它既可以作為Runnable被執行緒執行,又可以作為Future得到Callable的返回值。用一個圖來說明。

  因此當我們想通過一個執行緒執行Callable,但是Thread不支援構造方法中傳遞Callable的例項,我們需要通過FutureTask把一個Callable包裝成Runnable,然後再通過這個FutureTask拿到Callable執行後的返回值。要想new出一個FutureTask的例項,有2種方式,直接貼出程式碼。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }


    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

4.2 Callable和FutureTask實戰

  這個例子比較簡單,在一個主執行緒中建立一個callable來對1到10000進行累加,再休眠3秒,然後把這個callable封裝成一個futureTask,交給一個執行緒去執行,最終檢視callable的返回結果和阻塞效果。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Long> callable = new Callable<Long>() {
            long sum = 0;

            @Override
            public Long call() throws Exception {
                for (int i = 0; i <= 10000; i++) {
                    sum += i;
                }
                Thread.sleep(3000);//主要是為了演示get()時候的阻塞效果
                return sum;
            }
        };
        FutureTask<Long> futureTask = new FutureTask<>(callable);
        new Thread(futureTask).start();
        Thread.sleep(10);
        System.out.println("main執行緒繼續執行");
        System.out.println("獲取callable計算結果 = " + futureTask.get());
        System.out.println("main執行緒繼續執行 ");
    }
}

程式輸出:

 可以看到當futureTask.get()沒有獲取到返回結果時,主執行緒是處於阻塞狀態。

4.3 手寫一個FutureTask

   要實現一個簡易的FutureTask,通過上面對幾個介面之間關係的介紹,以及閱讀FutureTask程式碼可以看出,只需定義一個類,實現Runnable和Future介面,並實現run()方法和get()方法就可以了,核心思想就是上一篇文章中提到的通知/等待機制。直接上程式碼:

import java.util.concurrent.*;

public class MyFutureTask<V> implements Runnable, Future<V> {
    private Callable<V> callable;

    private V result = null;

    public MyFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        V temp = null;
        try {
            temp = callable.call();
        } catch (Exception e) {
            e.printStackTrace();
        }
        synchronized (this) {
            result = temp;
            this.notifyAll();
        }
    }

    @Override
    public V get() throws InterruptedException {
        if (result != null) {
            return result;
        }
        System.out.println("等待結果執行完成。。。。。");
        synchronized (this) {
            this.wait();
        }
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }


    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}

 為了驗證效果,把上一段程式碼中的FutureTask改成MyFutureTask,其餘程式碼統統不變。

import java.util.concurrent.Callable;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        Callable<Long> callable = new Callable<Long>() {
            long sum = 0;

            @Override
            public Long call() throws Exception {
                for (int i = 0; i <= 10000; i++) {
                    sum += i;
                }
                Thread.sleep(3000);//主要是為了演示get()時候的阻塞效果
                return sum;
            }
        };
        MyFutureTask<Long> futureTask = new MyFutureTask<>(callable);
        new Thread(futureTask).start();
        Thread.sleep(10);
        System.out.println("main執行緒繼續執行");
        System.out.println("獲取callable計算結果 = " + futureTask.get());
        System.out.println("main執行緒繼續執行 ");
    }
}

執行程式,可以看到輸出結果和阻塞現象與使用FutureTask一致:

 

 

  這篇隨筆就介紹這麼多內容,希望大家看了有收穫。原子操作CAS在下一篇文章中介紹,閱讀過程中如發現描述有誤,請指出,謝謝。