1. 程式人生 > >Java高併發程式設計:同步工具類

Java高併發程式設計:同步工具類

內容摘要

這裡主要介紹了java5中執行緒鎖技術以外的其他同步工具,首先介紹Semaphore:一個計數訊號量。用於控制同時訪問資源的執行緒個數,CyclicBarrier同步輔助類:從字面意思看是路障,這裡用於執行緒之間的相互等待,到達某點後,繼續向下執行。CountDownLatch同步輔助類:在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。猶如倒計時計數器,然後是Exchanger:實現兩個物件之間資料交換,可阻塞佇列:ArrayBlockingQueue,通過阻塞佇列間的通訊來演示其作用,最後介紹了幾個同步集合。

1. Semaphore實現訊號燈

Semaphore可以維護當前訪問自身的執行緒個數,並提供了同步機制,使用Semaphore可以控制同時訪問資源的執行緒個數,例如,實現一個檔案允許的併發訪問數。Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。

Semaphore實現的功能就像:銀行辦理業務,一共有5個視窗,但一共有10個客戶,一次性最多有5個客戶可以進行辦理,其他的人必須等候,當5個客戶中的任何一個離開後,在等待的客戶中有一個人可以進行業務辦理。

Semaphore提供了兩種規則:

  • 一種是公平的:獲得資源的先後,按照排隊的先後。在建構函式中設定true實現
  • 一種是野蠻的:誰有本事搶到資源,誰就可以獲得資源的使用權。

與傳統的互斥鎖的異同:

單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖“,再由另外一個執行緒釋放”鎖“,這可以應用於死鎖恢復的一些場合。

應用場景:共享資源的爭奪,例如遊戲中選手進入房間的情況。

import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Semaphore;  

public class SemaphoreTest {  
    public static void main(String[] args) {  

  //建立一個可根據需要建立新執行緒的執行緒池  
  ExecutorService service = Executors.newCachedThreadPool();  
        final
Semaphore sp = new Semaphore(3);   //建立10個執行緒   for(int i=0;i<10;i++){ Runnable runnable = new Runnable(){ public void run(){ try { sp.acquire(); //獲取燈,即許可權 } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("執行緒" + Thread.currentThread().getName() + "進入,當前已有" + (3-sp.availablePermits()) + "個併發"); try { Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("執行緒" + Thread.currentThread().getName() + "即將離開"); sp.release(); // 釋放一個許可,將其返回給訊號量 //下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元 System.out.println("執行緒" + Thread.currentThread().getName() + "已離開,當前已有" + (3-sp.availablePermits()) + "個併發"); } }; service.execute(runnable); } } }

輸出結果

執行緒pool-1-thread-3進入,當前已有3個併發
執行緒pool-1-thread-2進入,當前已有3個併發
執行緒pool-1-thread-1進入,當前已有3個併發
執行緒pool-1-thread-2即將離開
執行緒pool-1-thread-2已離開,當前已有2個併發
執行緒pool-1-thread-5進入,當前已有3個併發
執行緒pool-1-thread-1即將離開
執行緒pool-1-thread-1已離開,當前已有2個併發
執行緒pool-1-thread-4進入,當前已有3個併發
執行緒pool-1-thread-4即將離開
執行緒pool-1-thread-4已離開,當前已有2個併發
執行緒pool-1-thread-8進入,當前已有3個併發
執行緒pool-1-thread-3即將離開
執行緒pool-1-thread-7進入,當前已有3個併發
執行緒pool-1-thread-3已離開,當前已有3個併發
執行緒pool-1-thread-8即將離開
執行緒pool-1-thread-8已離開,當前已有2個併發
執行緒pool-1-thread-9進入,當前已有3個併發
執行緒pool-1-thread-7即將離開
執行緒pool-1-thread-7已離開,當前已有2個併發
執行緒pool-1-thread-6進入,當前已有3個併發
執行緒pool-1-thread-9即將離開
執行緒pool-1-thread-9已離開,當前已有2個併發
執行緒pool-1-thread-10進入,當前已有3個併發
執行緒pool-1-thread-5即將離開
執行緒pool-1-thread-5已離開,當前已有2個併發
執行緒pool-1-thread-6即將離開
執行緒pool-1-thread-6已離開,當前已有1個併發
執行緒pool-1-thread-10即將離開
執行緒pool-1-thread-10已離開,當前已有0個併發

控制一個方法的併發量,比如同時只能有3個執行緒進來

public class ThreadPoolTest {
    //訊號量
    private static Semaphore semaphore = new Semaphore(3);//允許個數,相當於放了3把鎖

    public static void main(String[] args) {

        for(int i=0;i<10;i++){

            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        method();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }).start();
        }

    }


    //同時最多隻允許3個執行緒過來
    public static void method() throws InterruptedException{
        semaphore.acquire();//獲取一把鎖
        System.out.println("ThreadName="+Thread.currentThread().getName()+"過來了");

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("ThreadName="+Thread.currentThread().getName()+"出去了");
        semaphore.release();//釋放一把鎖
    }
}

輸出結果

ThreadName=Thread-1過來了
ThreadName=Thread-4過來了
ThreadName=Thread-0過來了
ThreadName=Thread-1出去了
ThreadName=Thread-4出去了
ThreadName=Thread-2過來了
ThreadName=Thread-3過來了
ThreadName=Thread-0出去了
ThreadName=Thread-5過來了
ThreadName=Thread-3出去了
ThreadName=Thread-2出去了
ThreadName=Thread-6過來了
ThreadName=Thread-7過來了
ThreadName=Thread-5出去了
ThreadName=Thread-9過來了
ThreadName=Thread-7出去了
ThreadName=Thread-6出去了
ThreadName=Thread-8過來了
ThreadName=Thread-9出去了
ThreadName=Thread-8出去了

三個執行緒a、b、c 併發執行,b,c 需要a 執行緒的資料怎麼實現

根據問題的描述,我將問題用以下程式碼演示,ThreadA、ThreadB、ThreadC,ThreadA 用於初始化資料num,
只有當num 初始化完成之後再讓ThreadB 和ThreadC 獲取到初始化後的變數num。

分析過程如下:

考慮到多執行緒的不確定性,因此我們不能確保ThreadA 就一定先於ThreadB 和ThreadC 前執行,就算ThreadA先執行了,我們也無法保證ThreadA 什麼時候才能將變數num 給初始化完成。因此我們必須讓ThreadB 和ThreadC去等待ThreadA 完成任何後發出的訊息。

現在需要解決兩個難題,一是讓ThreadB 和ThreadC 等待ThreadA 先執行完,二是ThreadA 執行完之後給
ThreadB 和ThreadC 傳送訊息。

解決上面的難題我能想到的兩種方案,一是使用純Java API 的Semaphore 類來控制執行緒的等待和釋放,二是使用Android 提供的Handler 訊息機制

public class ThreadCommunication {
    private static int num;//定義一個變數作為資料

    public static void main(String[] args) {
        Thread threadA = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    //模擬耗時操作之後初始化變數num
                    Thread.sleep(1000);
                    num = 1;

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread threadB = new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()
                        +"獲取到num 的值為:"+num);
            }
        });
        Thread threadC = new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()
                        +"獲取到num 的值為:"+num);
            }
        });
        //同時開啟3 個執行緒
        threadA.start();
        threadB.start();
        threadC.start();

    }
}
public class ThreadCommunication {
    private static int num;
    /**
     * 定義一個訊號量,該類內部維持了多個執行緒鎖,可以阻塞多個執行緒,釋放多個執行緒,
     * 執行緒的阻塞和釋放是通過permit 概念來實現的執行緒通過semaphore.acquire()方法獲取permit,
     * 如果當前semaphore 有permit 則分配給該執行緒,如果沒有則阻塞該執行緒直到semaphore
     * 呼叫release()方法釋放permit。建構函式中引數:permit(允許) 個數
     */
    private static Semaphore semaphore = new Semaphore(0);
    public static void main(String[] args) {

        Thread threadA = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    //模擬耗時操作之後初始化變數num
                    Thread.sleep(1000);
                    num = 1;
                    //初始化完引數後釋放兩個permit
                    semaphore.release(2);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread threadB = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    //獲取permit,如果semaphore 沒有可用的permit 則等待
                    // 如果有則消耗一個
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()
                        +"獲取到num 的值為:"+num);
            }
        });
        Thread threadC = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    //獲取permit,如果semaphore 沒有可用的permit 則等待
                    // 如果有則消耗一個
                    semaphore.acquire();
                } catch (InterruptedException e) {

                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()
                        +"獲取到num 的值為:"+num);
            }
        });
        //同時開啟3 個執行緒
        threadA.start();
        threadB.start();
        threadC.start();

    }
}

2. CyclicBarrier

一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。

CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作 很有用。

3個執行緒到達某個集合點後再向下執行,使用await方法實現

import java.util.concurrent.CyclicBarrier;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  

public class CyclicBarrierTest {  

    public static void main(String[] args) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        final  CyclicBarrier cb = new CyclicBarrier(3);  
        for(int i=0;i<3;i++){  
            Runnable runnable = new Runnable(){  
                    public void run(){  
                    try {  
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("執行緒" + Thread.currentThread().getName() +   
                                "即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));                         
                        cb.await();//在所有參與者都已經在此 barrier 上呼叫 await 方法之前,將一直等待。  

                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("執行緒" + Thread.currentThread().getName() +   
                                "即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));  
                        cb.await();   
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("執行緒" + Thread.currentThread().getName() +   
                                "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));                       
                        cb.await();                       
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }                 
                }  
            };  
            service.execute(runnable);  
        }  
        service.shutdown();  
    }  
}  

輸出結果

執行緒pool-1-thread-3即將到達集合地點1,當前已有1個已經到達,正在等候
執行緒pool-1-thread-1即將到達集合地點1,當前已有2個已經到達,正在等候
執行緒pool-1-thread-2即將到達集合地點1,當前已有3個已經到達,都到齊了,繼續走啊
執行緒pool-1-thread-1即將到達集合地點2,當前已有1個已經到達,正在等候
執行緒pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候
執行緒pool-1-thread-3即將到達集合地點2,當前已有3個已經到達,都到齊了,繼續走啊
執行緒pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候
執行緒pool-1-thread-1即將到達集合地點3,當前已有2個已經到達,正在等候
執行緒pool-1-thread-2即將到達集合地點3,當前已有3個已經到達,都到齊了,繼續走啊

3. CountDownLatch

一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。猶如倒計時計數器,呼叫CountDownLatch物件的countDown方法就將計數器減1,當計數到達0時,則所有等待者或單個等待者開始執行。

可以實現一個人(也可以是多個人)等待其他所有人都來通知他,也可以實現一個人通知多個人的效果,類似裁判一聲口令,運動員開始奔跑(一對多),或者所有運送員都跑到終點後裁判才可以公佈結果(多對一)。

用指定的計數 初始化 CountDownLatch。在呼叫 countDown() 方法之前,await 方法會一直受阻塞。之後,會釋放所有等待的執行緒,await 的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

實現運動員比賽的效果

import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.CyclicBarrier;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  

public class CountdownLatchTest {  

    public static void main(String[] args) {  

  ExecutorService service = Executors.newCachedThreadPool();  
    
  //構造一個用給定計數初始化的 CountDownLatch,相當於裁判的口哨  
  final CountDownLatch cdOrder = new CountDownLatch(1);  
    
  //相當於定義3個執行員  
        final CountDownLatch cdAnswer = new CountDownLatch(3);  
        for (int i = 0; i < 3; i++) {  
            Runnable runnable = new Runnable() {  
                public void run() {  
                    try {  
                        System.out.println("執行緒"  
                                + Thread.currentThread().getName() + "正準備接受命令");  

                        // 等待發令槍  
                        cdOrder.await();//使當前執行緒在鎖存器倒計數至零之前一直等待  

                        System.out.println("執行緒"  
                                + Thread.currentThread().getName() + "已接受命令");  
                        Thread.sleep((long) (Math.random() * 10000));  
                        System.out  
                                .println("執行緒"  
                                        + Thread.currentThread().getName()  
                                        + "迴應命令處理結果");  

                        // 各個運動員完報告成績之後,通知裁判  
                        cdAnswer.countDown();//遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒  

                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                }  
            };  
            service.execute(runnable);  
        }  
        try {  
            Thread.sleep((long) (Math.random() * 10000));  

            System.out.println("執行緒" + Thread.currentThread().getName()  
                    + "即將釋出命令");  
            // 發令槍打響,比賽開始  
            cdOrder.countDown();  

            System.out.println("執行緒" + Thread.currentThread().getName()  
                    + "已傳送命令,正在等待結果");  

            // 裁判等待各個運動員的結果  
            cdAnswer.await();  

            // 裁判公佈獲得所有運動員的成績  
            System.out.println("執行緒" + Thread.currentThread().getName()  
                    + "已收到所有響應結果");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
        service.shutdown();  

    }  
}  

輸出結果

執行緒pool-1-thread-2正準備接受命令
執行緒pool-1-thread-3正準備接受命令
執行緒pool-1-thread-1正準備接受命令
執行緒main即將釋出命令
執行緒main已傳送命令,正在等待結果
執行緒pool-1-thread-1已接受命令
執行緒pool-1-thread-2已接受命令
執行緒pool-1-thread-3已接受命令
執行緒pool-1-thread-1迴應命令處理結果
執行緒pool-1-thread-3迴應命令處理結果
執行緒pool-1-thread-2迴應命令處理結果
執行緒main已收到所有響應結果

4. Exchanger

用於實現兩個物件之間的資料交換,每個物件在完成一定的事務後想與對方交換資料,第一個先拿出資料的物件將一直等待第二個物件拿著資料到來時,彼此才能交換資料。

方法:exchange(V x)

等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。

應用:使用 Exchanger 線上程間交換緩衝區

示例:模擬毒品交易情景

import java.util.concurrent.Exchanger;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  

public class ExchangerTest {  

    public static void main(String[] args) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        final Exchanger exchanger = new Exchanger();  
        service.execute(new Runnable(){  
            public void run() {  
                try {                 

                    String data1 = "毒品";  
                    System.out.println("執行緒" + Thread.currentThread().getName() +   
                    "正在把: " + data1 +"   交易出去");  
                    Thread.sleep((long)(Math.random()*10000));  
                    String data2 = (String)exchanger.exchange(data1);  
                    System.out.println("執行緒" + Thread.currentThread().getName() +   
                    "換得了: " + data2);  
                }catch(Exception e){  

                }  
            }     
        });  
        service.execute(new Runnable(){  
            public void run() {  
                try {                 

                    String data1 = "美金";  
                    System.out.println("執行緒" + Thread.currentThread().getName() +   
                    "正在把: " + data1 +"   交易出去");  
                    Thread.sleep((long)(Math.random()*10000));                    
                    String data2 = (String)exchanger.exchange(data1);  
                    System.out.println("執行緒" + Thread.currentThread().getName() +   
                    "換得了: " + data2);  
                }catch(Exception e){  

                }                 
            }     
        });       
    }  
}  
執行緒pool-1-thread-1正在把: 毒品   交易出去
執行緒pool-1-thread-2正在把: 美金   交易出去
執行緒pool-1-thread-1換得了: 美金
執行緒pool-1-thread-2換得了: 毒品

5. ArrayBlockingQueue

一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列包含固定長度的佇列和不固定長度的佇列。

這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。

通俗的講:當指定佇列大小,如果已經放滿,其他存入資料的執行緒就阻塞,等著該佇列中有空位,才能放進去。當取的比較快,佇列中沒有資料,取資料的執行緒阻塞,等佇列中放入了資料,才可以取。

ArrayBlockingQueue中只有put和take方法才具有阻塞功能。方法型別如下

功能 丟擲異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用


示例:用3個空間的佇列來演示向阻塞佇列中存取資料的效果

package cn.xushuai.thread;  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  

public class BlockingQueueTest {  
    public static void main(String[] args) {  
        final BlockingQueue queue = new ArrayBlockingQueue(3);  
        for(int i=0;i<2;i++){  
            new Thread(){  
                public void run(){  
                    while(true){  
                        try {  
                            Thread.sleep((long)(Math.random()*1000));  
                            System.out.println(Thread.currentThread().getName() + "準備放資料!");                              
                            queue.put(1);   //放進去後,可能立即執行“準備取資料”  
                            System.out.println(Thread.currentThread().getName() + "已經放了資料," +                             
                                        "佇列目前有" + queue.size() + "個數據");  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  

                    }  
                }  

            }.start();  
        }  

        new Thread(){  
            public void run(){  
                while(true){  
                    try {  
                        //將此處的睡眠時間分別改為100和1000,觀察執行結果  
                        Thread.sleep(1000);  
                        System.out.println(Thread.currentThread().getName() + "準備取資料!");  
                        queue.take();   //取出後可能來不及執行下面的列印語句,就跑到了“準備放資料”,  
                        System.out.println(Thread.currentThread().getName() + "已經取走資料," +                             
                                "佇列目前有" + queue.size() + "個數據");                      
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
        }.start();            
    }  
}  
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有1個數據
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有2個數據
Thread-1準備放資料!
Thread-1已經放了資料,佇列目前有3個數據
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-1準備放資料!
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-0已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-1已經放了資料,佇列目前有3個數據
Thread-1準備放資料!
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有2個數據
Thread-0已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-2準備取資料!
...

6. 阻塞佇列間的通訊

A佇列向空間中存資料,B從空間裡取資料,A存入後,通知B去取,B取過之後,通知A去放,依次迴圈

示例:子執行緒先迴圈10次,接著主執行緒迴圈100次,接著又回到子執行緒,迴圈10次,再回到主執行緒又迴圈100,如此迴圈50次。

說明:這裡通過使 用兩個具有1個空間的佇列來實現同步通知的功能(實現了鎖和condition的功能),以便實現佇列間的通訊,其中使用到了構造程式碼塊為主佇列先存入一個數據,以使其先阻塞,子佇列先執行。

使用構造程式碼塊的原因:

成員變數在建立類的例項物件時,才分配空間,才能有值,所以建立一個構造方法來給main_quene賦值,這裡不可以使用靜態程式碼塊,因為靜態在還沒建立物件就存在, 而sub_quene和main_quene是物件建立以後的成員變數,所以這裡用匿名構造方法,它的執行時期在任何構造方法之前,建立幾個物件就執行幾次

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  

public class BlockingQueueCommunication {  
    public static void main(String[] args){  

        final Business business = new Business();  

        new Thread(new Runnable(){  
            @Override  
            public void run() {  
                for(int i=1;i<=50;i++){  
                    business.sub(i);  
                }  
            }  
        }).start();  

            //主執行緒外部迴圈  
            for(int i=1;i<=50;i++){  
                business.main(i);  
          }  
       }  

    //業務類  
    static class Business{  

        BlockingQueue<Integer> sub_quene = new ArrayBlockingQueue<Integer>(1);  
        BlockingQueue<Integer> main_quene = new ArrayBlockingQueue<Integer>(1);  

        {  
  //為了讓子佇列先走,所以在一開始就往主佇列中存入一個物件,使其阻塞。  
            try {  
                main_quene.put(1);    
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }         
        }  

        //子佇列先走       
        public  void sub(int i){  

            try {  
                sub_quene.put(1);   //子佇列第一次存入,可以執行,但由於只有1個空間,已經存滿,所以只有在執行後要等到take之後才能繼續下次執行  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
            //子佇列迴圈執行  
            for(int j=1;j<=10;j++){  
                System.out.println("sub thread sequence of"+i+",loop of "+j);  
            }  
            try {  
                main_quene.take();  //讓主佇列從已經填滿的佇列中取出資料,使其開始第一次執行  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  

        public void main(int i){  

            try {  
                main_quene.put(1);  //主佇列先前放過1個空間,現在處於阻塞狀態,等待子佇列通知,即子執行緒中的main_quene.take();   
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  

  //主佇列迴圈執行  
            for(int j=1;j<=100;j++){  
                System.out.println("main thread sequence of"+i+", loop of "+j);  
            }     
            try {  
                sub_quene.take(); //讓子佇列從已經填滿的佇列中取出資料,使其執行  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }     
    }  
   }  

7. 同步集合類

7.1 同步Map集合

  • java.util.concurrent.ConcurrentMap
  • ConcurrentHashMap
  • ConcurrentNavigableMap
  • ConcurrentSkipListMap

ConcurrentHashMap

同步的HashMap,支援獲取的完全併發和更新的所期望可調整併發的雜湊表。此類遵守與 Hashtable 相同的功能規範,並且包括對應於 Hashtable 的每個方法的方法版本。

不過,儘管所有操作都是執行緒安全的,但獲取操作不 必鎖定,並且不 支援以某種防止所有訪問的方式鎖定整個表。此類可以通過程式完全與 Hashtable 進行互操作,這取決於其執行緒安全,而與其同步細節無關。

內部原理:

其實內部使用了代理模式,你給我一個HashMap,我就給你一個同步的HashMap。同步的HashMap在呼叫方法時,是去分配給原始的HashMap只是在去呼叫方法的同時加上了Synchronized,以此實現同步效果

ConcurrentHashMap是執行緒安全的HashMap的實現,預設構造同樣有initialCapacity和loadFactor屬性,不過還多了一個concurrencyLevel屬性,三屬性預設值分別為16、0.75及16。其內部使用鎖分段技術,維持這鎖Segment的陣列,在Segment陣列中又存放著Entity[]陣列,內部hash演算法將資料較均勻分佈在不同鎖中。

  • put(key , value)

並沒有在此方法上加上synchronized,首先對key.hashcode進行hash操作,得到key的hash值。hash操作的演算法和map也不同,根據此hash值計算並獲取其對應的陣列中的Segment物件(繼承自ReentrantLock),接著呼叫此Segment物件的put方法來完成當前操作。

ConcurrentHashMap基於concurrencyLevel劃分出了多個Segment來對key-value進行儲存,從而避免每次put操作都得鎖住整個陣列。在預設的情況下,最佳情況下可允許16個執行緒併發無阻塞的操作集合物件,儘可能地減少併發時的阻塞現象。

  • get(key)

首先對key.hashCode進行hash操作,基於其值找到對應的Segment物件,呼叫其get方法完成當前操作。而Segment的get操作首先通過hash值和物件陣列大小減1的值進行按位與操作來獲取陣列上對應位置的HashEntry。在這個步驟中,可能會因為物件陣列大小的改變,以及陣列上對應位置的HashEntry產生不一致性,那麼ConcurrentHashMap是如何保證的?

物件陣列大小的改變只有在put操作時有可能發生,由於HashEntry物件陣列對應的變數是volatile型別的,因此可以保證如HashEntry物件陣列大小發生改變,讀操作可看到最新的物件陣列大小。

在獲取到了HashEntry物件後,怎麼能保證它及其next屬性構成的連結串列上的物件不會改變呢?這點ConcurrentHashMap採用了一個簡單的方式,即HashEntry物件中的hash、key、next屬性都是final的,這也就意味著沒辦法插入一個HashEntry物件到基於next屬性構成的連結串列中間或末尾。這樣就可以保證當獲取到Hash