1. 程式人生 > >第六十九條 併發工具優先於wait和notify

第六十九條 併發工具優先於wait和notify

java中經常會用到子執行緒等,當各個執行緒作操作後,需要處理資料,我們需要自己重寫wait和notify方法,但1.5以後,java平臺對這些要求提高了,我們可以用高階的工具來代替。比如用執行緒池管理執行緒,或者java提供的併發集合類以及同步器(Synchronizer)。執行緒池上一條介紹過了。併發集合Concurrent Collection,它的標準介面如 List Queue Map等,為了提供良好的高併發性,它們內部管理自己同步,因此,併發集合不能排除併發活動,鎖定它們沒什麼用,只會讓速度變慢,我們無法對它們做過度的操作,但我們可以根據它們的特性,根據實際業務作出對應的選擇。比如,HashMap不支援併發,單執行緒效率比較高,相應的HashTable提供支援併發功能,但由於對方法加鎖,所以效率低;因此,後來 ConcurrentMap 出師了,它實現了Map介面,它是實行分段加鎖技術,所以多執行緒的情況下,效率比較高,併發效能比較卓越,速度也很快,所以map集合的併發情況,如無意外或特殊的要求,我們優先使用 ConcurrentMap,而不是用 HashTable 等等。有些介面通過阻塞操作進行擴充套件,例如BlockingQueue擴充套件了Queue介面,並添加了take()在內的幾個方法,它從佇列中刪除並返回了頭元素,如果佇列為空,沒有元素,它就等待。這樣就允許將阻塞佇列用於工作佇列,也稱作生產者-消費者佇列,可以一邊新增,一邊移除返回,並處理工作專案。上一條總,大多數ExecutorService實現(包括ThreadPoolExecutor)都使用BlockingQueue。


同步器是一些使執行緒能夠等待另一個執行緒的物件,允許他們協調動作。最常用的同步器是CountDownLatch和Semaphore,較不常用的是CyclicBarrier和Exchanger。我們分析一下其中的用法:

CountDownLatch:

    private void testCountDownLatch() {

        final CountDownLatch latch = new CountDownLatch(2);

        Thread t1 = new Thread(){
            public void run() {
                try {
                    Log.e(TAG, "子執行緒"+Thread.currentThread().getName()+"正在執行");
                    Thread.sleep(3000);
                    Log.e(TAG, "子執行緒"+Thread.currentThread().getName()+"執行完畢");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t2 = new Thread(){
            public void run() {
                try {
                    Log.e(TAG, "子執行緒"+Thread.currentThread().getName()+"正在執行");
                    Thread.sleep(3000);
                    Log.e(TAG, "子執行緒"+Thread.currentThread().getName()+"執行完畢");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        try {
            t1.start();
            t2.start();
            Log.e(TAG, "等待2個子執行緒執行完畢...");
            latch.await();
            Log.e(TAG, "2個子執行緒已經執行完畢");
            Log.e(TAG, "繼續執行主執行緒");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

呼叫這個方法,java 平臺列印結果:

等待2個子執行緒執行完畢...
子執行緒Thread-0正在執行
子執行緒Thread-1正在執行
子執行緒Thread-1執行完畢
子執行緒Thread-0執行完畢
2個子執行緒已經執行完畢
繼續執行主執行緒

android 平臺列印為:

12-03 16:06:43.077 12545-12545/com.example.cn.desigin E/ConcurrentActivity: 等待2個子執行緒執行完畢...
12-03 16:06:43.077 12545-12712/com.example.cn.desigin E/ConcurrentActivity: 子執行緒Thread-5正在執行
12-03 16:06:43.077 12545-12713/com.example.cn.desigin E/ConcurrentActivity: 子執行緒Thread-6正在執行

12-03 16:06:46.078 12545-12712/com.example.cn.desigin E/ConcurrentActivity: 子執行緒Thread-5執行完畢
12-03 16:06:46.078 12545-12713/com.example.cn.desigin E/ConcurrentActivity: 子執行緒Thread-6執行完畢
12-03 16:06:46.078 12545-12545/com.example.cn.desigin E/ConcurrentActivity: 2個子執行緒已經執行完畢
12-03 16:06:46.078 12545-12545/com.example.cn.desigin E/ConcurrentActivity: 繼續執行主執行緒

中間會報異常日誌,是內部的,不會導致程式崩潰或停止功能的執行,可以不考慮異常日誌

用途: 比如說有一個任務a,它要等待其他2個任務執行完畢之後才能執行,此時就可以利用 CountDownLatch 來實現這種功能了;或者常見的一個網路功能,介面有時候功能單一,我們需要呼叫兩個不同伺服器的介面,都獲取到後才能重新整理UI介面,此時,也可以使用 CountDownLatch 來實現。


Semaphore:

    private void testSemaphore() {
        int N = 8;            //人數
        Semaphore semaphore = new Semaphore(5); //機器數目
        for (int i = 0; i < N; i++) {
            new SemaphoreThread(i, semaphore).start();
        }
    }

    static class SemaphoreThread extends Thread {
        private int num;
        private Semaphore semaphore;

        public SemaphoreThread(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                Log.e(TAG, "人" + this.num + "坐在板凳上吃飯...");
                Thread.sleep(2000);
                Log.e(TAG, "人" + this.num + "吃完離開板凳");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

列印結果為:  android 平臺

12-03 16:19:23.545 24901-25171/? E/ConcurrentActivity: 人1坐在板凳上吃飯...
12-03 16:19:23.545 24901-25170/? E/ConcurrentActivity: 人0坐在板凳上吃飯...
12-03 16:19:23.545 24901-25172/? E/ConcurrentActivity: 人2坐在板凳上吃飯...
12-03 16:19:23.545 24901-25173/? E/ConcurrentActivity: 人3坐在板凳上吃飯...
12-03 16:19:23.545 24901-25174/? E/ConcurrentActivity: 人4坐在板凳上吃飯...
12-03 16:19:25.545 24901-25171/? E/ConcurrentActivity: 人1吃完離開板凳
12-03 16:19:25.545 24901-25175/? E/ConcurrentActivity: 人5坐在板凳上吃飯...
12-03 16:19:25.545 24901-25172/? E/ConcurrentActivity: 人2吃完離開板凳
12-03 16:19:25.545 24901-25173/? E/ConcurrentActivity: 人3吃完離開板凳
12-03 16:19:25.545 24901-25176/? E/ConcurrentActivity: 人6坐在板凳上吃飯...
12-03 16:19:25.545 24901-25177/? E/ConcurrentActivity: 人7坐在板凳上吃飯...
12-03 16:19:25.545 24901-25174/? E/ConcurrentActivity: 人4吃完離開板凳
12-03 16:19:25.545 24901-25170/? E/ConcurrentActivity: 人0吃完離開板凳
12-03 16:19:27.546 24901-25175/? E/ConcurrentActivity: 人5吃完離開板凳
12-03 16:19:27.546 24901-25176/? E/ConcurrentActivity: 人6吃完離開板凳
12-03 16:19:27.547 24901-25177/? E/ConcurrentActivity: 人7吃完離開板凳


Semaphore 可以控同時訪問的執行緒個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。我們發現,它和執行緒池控制執行緒的個數有點相似,都是達到上限後,有執行緒執行完後,執行緒池是複用已有的執行緒,Semaphore是允許剩下的執行緒依次執行,如果再次達到上限,繼續這個迴圈,直到執行完畢為止。


CyclicBarrier:

    private void testCyclicBarrier() {
        int N = 4;
        Runnable runable = new Runnable() {
            @Override
            public void run() {
                Log.e(TAG, "當前執行緒" + Thread.currentThread().getName());
            }
        };
        CyclicBarrier barrier = new CyclicBarrier(N, runable);
        for (int i = 0; i < N; i++) {
            new CyclicBarrierThread(barrier).start();
        }
    }

    static class CyclicBarrierThread extends Thread{
        private CyclicBarrier cyclicBarrier;

        public CyclicBarrierThread(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            Log.e(TAG, "執行緒" + Thread.currentThread().getName() + "正在寫入資料...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入資料操作
                Log.e(TAG, "執行緒" + Thread.currentThread().getName() + "寫入資料完畢,等待其他執行緒寫入完畢");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            Log.e(TAG, "所有執行緒寫入完畢,繼續處理其他任務...");
        }
    }

列印結果為:  android 平臺

12-03 16:34:28.962 7791-8164/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-5正在寫入資料...
12-03 16:34:28.962 7791-8165/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-6正在寫入資料...
12-03 16:34:28.962 7791-8166/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-7正在寫入資料...
12-03 16:34:28.962 7791-8167/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-8正在寫入資料...
12-03 16:34:34.140 7791-8165/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-6寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:34:34.141 7791-8167/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-8寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:34:34.141 7791-8164/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-5寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:34:34.141 7791-8166/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-7寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:34:34.146 7791-8166/com.example.cn.desigin E/ConcurrentActivity: 當前執行緒Thread-7
12-03 16:34:34.146 7791-8165/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:34:34.146 7791-8166/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:34:34.147 7791-8164/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:34:34.147 7791-8167/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...


四個執行緒執行完了後,會去執行傳遞進去的 runnable,四個執行緒哪個先執行完,就用哪條執行緒執行 runnable 回撥,看到這,我們明白了,和 CountDownLatch 的功能類似,如果僅僅是這樣,就太小看 CyclicBarrier 了, CountDownLatch 是一次性的,而 CyclicBarrier 是可以迴圈使用的。 如下,新增新的執行緒。由於android 的 ANR 機制,我們開啟一個子執行緒,在子執行緒中呼叫 testCyclicBarrier() 方法。

    private void testCyclicBarrier() {
        int N = 4;
        Runnable runable = new Runnable() {
            @Override
            public void run() {
                Log.e(TAG, "當前執行緒" + Thread.currentThread().getName());
            }
        };
        CyclicBarrier barrier = new CyclicBarrier(N, runable);
        for (int i = 0; i < N; i++) {
            new CyclicBarrierThread(barrier).start();
        }

        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Log.e(TAG, "CyclicBarrier重用");

        for (int i = 0; i < N; i++) {
            new CyclicBarrierThread(barrier).start();
        }
    }

列印結果為:

12-03 16:45:12.801 18898-19075/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-9正在寫入資料...
12-03 16:45:12.801 18898-19074/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-8正在寫入資料...
12-03 16:45:12.801 18898-19073/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-7正在寫入資料...
12-03 16:45:12.801 18898-19072/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-6正在寫入資料...
12-03 16:45:17.804 18898-19073/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-7寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:17.804 18898-19072/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-6寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:17.805 18898-19075/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-9寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:17.805 18898-19074/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-8寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:17.805 18898-19074/com.example.cn.desigin E/ConcurrentActivity: 當前執行緒Thread-8
12-03 16:45:17.806 18898-19074/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:17.806 18898-19073/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:17.806 18898-19072/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:17.806 18898-19075/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:37.801 18898-19071/com.example.cn.desigin E/ConcurrentActivity: CyclicBarrier重用
12-03 16:45:37.802 18898-19469/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-10正在寫入資料...
12-03 16:45:37.803 18898-19470/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-11正在寫入資料...
12-03 16:45:37.803 18898-19471/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-12正在寫入資料...
12-03 16:45:37.806 18898-19472/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-13正在寫入資料...
12-03 16:45:42.803 18898-19469/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-10寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:42.803 18898-19470/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-11寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:42.804 18898-19471/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-12寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:42.806 18898-19472/com.example.cn.desigin E/ConcurrentActivity: 執行緒Thread-13寫入資料完畢,等待其他執行緒寫入完畢
12-03 16:45:42.807 18898-19472/com.example.cn.desigin E/ConcurrentActivity: 當前執行緒Thread-13
12-03 16:45:42.807 18898-19472/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:42.807 18898-19469/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:42.808 18898-19470/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...
12-03 16:45:42.810 18898-19471/com.example.cn.desigin E/ConcurrentActivity: 所有執行緒寫入完畢,繼續處理其他任務...


併發這一塊,能涉及的地方可以很深,我們只是在這稍微涉及點皮毛,如果想深入瞭解,建議閱讀原始碼和相關書籍。最後,我們在說一下notify,一個相關的話題是,為了喚醒正在等待的執行緒,你應該使用notfiy還是notifyAll。一種常見的說法是,你總是應該使用notifyAll。這是合理而保守的建議。它總會產生正確的結果,因為它可以保證你將會喚醒所有需要被喚醒的執行緒你可能也會喚醒其他一些執行緒,但是這不會影響程式的正確性,這些執行緒醒來之後,會檢查他們正在等待的條件如果發現條件並不滿足,就會繼續等待。從優化的角度來看,如果處於等待狀態的所有執行緒都在等待同一個條件,而每次只有一個執行緒可以從這個條件中被喚醒,那麼你應該選擇呼叫notify,而不是notifyAll。即使這些條件都是真的,也許還是有理由使用notifyAll而不是notify。就好像把wait呼叫放在一個迴圈中,以避免在公有可訪問物件上的意外或惡意的通知一樣,與此類似,使用notifyAll代替notify可以避免來自不想關執行緒的意外或惡意的等待。否則,這樣的等待會“吞掉”一個關鍵的通知,使真正的接收執行緒無限的等待下去。

沒有理由在新程式碼中使用wait和notify,即使有、也是極少的。你應該優先使用notifyAll,而不是使用notify。如果使用notify,請一定要小心,以確保程式的活性。