1. 程式人生 > >Java多執行緒與併發(三)

Java多執行緒與併發(三)

  • Condition等待和喚醒

在我們的並行程式中,避免不了某些寫成要預先規定好的順序執行,例如:先新增後修改,先買後賣,先進後出,對於這些場景,使用JUC的Conditon物件再合適不過了。

JUC中提供了Condition物件,用於讓指定執行緒等待與喚醒,按預期順序執行。它必須和ReentrantLock重入鎖配合使用

Condition用於代替wait()/notify()方法
wait和notify是屬於Object的,可以對執行緒喚醒(notify只能隨機喚醒等待的執行緒,而Condition可以喚醒指定執行緒,這有利於更好的控制併發程式)
Condition核心方法:
await():阻塞當前執行緒,直到signal喚醒
signal():喚醒被await的執行緒,從中斷處繼續執行
signalAll():喚醒所有被await阻塞的執行緒(不太常用)

通過使用ReentrantLock和Condition的使用讓執行緒有順序的執行(有規劃的、等待、喚醒的過程)

這裡寫圖片描述

程式碼案例:

public class ConditionSample {
    public static void main(String[] args) {
        final ReentrantLock lock = new ReentrantLock();//Condition必須配合ReentrantLock來使用
        final Condition c1 = lock.newCondition();//建立Condition
        final Condition c2 = lock
.newCondition(); final Condition c3 = lock.newCondition(); new Thread(new Runnable() { public void run() { lock.lock();//加鎖 try { c1.await();//阻塞當前執行緒,只有呼叫c1.signal的時候,執行緒繼續啟用執行 Thread.sleep(1000); System.out
.println("粒粒皆幸苦"); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//釋放 } } }).start(); new Thread(new Runnable() { public void run() { lock.lock();//加鎖 try { c2.await();//阻塞當前執行緒,只有呼叫c1.signal的時候,執行緒繼續啟用執行 Thread.sleep(1000); System.out.println("誰知盤中餐"); c1.signal();//執行緒t1喚醒繼續執行 } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//釋放 } } }).start(); new Thread(new Runnable() { public void run() { lock.lock();//加鎖 try { c3.await();//阻塞當前執行緒,只有呼叫c1.signal的時候,執行緒繼續啟用執行 Thread.sleep(1000); System.out.println("汗滴禾下土"); c2.signal();//執行緒t2喚醒繼續執行 } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//釋放 } } }).start(); new Thread(new Runnable() { public void run() { lock.lock();//加鎖 try { Thread.sleep(1000); System.out.println("鋤禾日當午"); c3.signal();//t3執行緒繼續執行 } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//釋放 } } }).start(); } }
  • JUC之Callable和Future
    Callable是一個介面,它和Runnable一樣代表著任務,區別在於Callable有返回值並且可以丟擲異常
    Future也是一個藉口,它用於非同步計算的結果,提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果,接受Callable返回的方法
    案例:輸出1000以內的質數
public class FutureCallableSample {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        //建立執行緒池
        for(int i = 2;i<=10000 ;i++){
            Computer c = new Computer();
            c.setNum(i);

            //Future是對用於計算的執行緒的監聽,因為計算是在其它執行緒中進行的,所以這個返回結果是非同步的

            Future<Boolean>  fu = threadPool.submit(c);//將c物件提交給執行緒池,如果有空閒執行緒立即執行call方法
            try {
                Boolean result = fu.get();//用於獲取返回值,如果執行緒內部的call沒有計算完畢,則進入等待狀態,直到計算完成
                if(result){
                    System.out.println(c.getNum());
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }

}
  • JUC之併發容器

這裡寫圖片描述

如何保證既能執行緒安全,又可以有一定效率
執行緒安全—-併發容器
ArrayList —-CopyOnWriteArrayList —寫複製列表
HashSet —–CopyOnWriteArraySet —寫複製集合
HashMap —-ConcurrentHashMap —分段鎖對映

public class CopyOnWriteArrayListSample {
    public static void main(String[] args) {
        List list = new ArrayList();
        for(int i =0; i<1000;i++){
            list.add(i);
        }
        Iterator it = list.iterator();
        while(it.hasNext()){
            Integer i = (Integer)it.next();
            list.remove(i);
        }
        System.out.println(list);
    }
}

程式碼會報錯:

Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.util.ArrayList$Itr.next(Unknown Source)
    at com.wanghaoxin.threadpool.CopyOnWriteArrayListSample.main(CopyOnWriteArrayListSample.java:15)

對於ArrayList預設採用連續儲存,但是會有併發問題,邊讀取邊刪除產生異常

CopyOnWriteArrayList採用即可解決這個問題:

這裡寫圖片描述

public class CopyOnWriteArrayListSample {
    public static void main(String[] args) {
/*      List list = new ArrayList();
        for(int i =0; i<1000;i++){
            list.add(i);
        }
        Iterator it = list.iterator();
        while(it.hasNext()){
            Integer i = (Integer)it.next();
            list.remove(i);
        }
        System.out.println(list);*/
        //寫複製列表
        List list = new CopyOnWriteArrayList();
        for(int i =0; i<1000;i++){
            list.add(i);
        }
        Iterator it = list.iterator();
        while(it.hasNext()){
            Integer i = (Integer)it.next();
            list.remove(i);
        }
        System.out.println(list);
    }
}

寫複製列表:CopyOnWriteArrayList併發原理:
它通過副本解決併發問題
多個執行緒各執一份副本,採用副本的方式,再複製的過程中,不同執行緒是同步的
檢視原始碼得知:採用可重入鎖

    public void add(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (index > len || index < 0)
                throw new IndexOutOfBoundsException("Index: "+index+
                                                    ", Size: "+len);
            Object[] newElements;
            int numMoved = len - index;
            if (numMoved == 0)
                newElements = Arrays.copyOf(elements, len + 1);
            else {
                newElements = new Object[len + 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index, newElements, index + 1,
                                 numMoved);
            }
            newElements[index] = element;
            setArray(newElements);
        } finally {
            lock.unlock();
        }
    }

ConcurrentHashMap
Segment——分段鎖,提高了批量同步的效能
HashMap執行緒不安全的,輸出值總是小於5000,但是如果程式碼中修改為HashTable則輸出值為5000(HashTable中所有方法都是synchronized 只允許同一時間同一執行緒修改)
為了解決效率低下的問題,產生了ConcurrentHashMap,輸出值始終為5000—表示執行緒安全

public class ConcurrentHashMapSample {

    private static int user = 100;//同時模擬的併發使用者訪問數量 --為1的話看不出來效果
    //private static int user = 10;
    private static int dowloadCounts = 5000;//使用者的真實下載數
    private static HashMap count= new HashMap();//計數器

    public static void main(String[] args) {
        //排程器,jdk1.5之後引入current對於併發的支援

        ExecutorService executor = Executors.newCachedThreadPool();
        //訊號量 ,用於模擬併發使用者數
        final Semaphore semaphore = new Semaphore(user);
        for(int i =0;i<dowloadCounts ;i++){
            final int index = i;
            //通過多執行緒模擬多個使用者訪問的下載次數
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        semaphore.acquire();
                        count.put(index, index);
                        semaphore.release();
                    }catch(Exception e){
                        e.printStackTrace();
                    }

                }
            });
        }
        try {
            //延遲主執行緒結束--讓for迴圈中程式碼執行完畢
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count.size());
    }
}

ConcurrentHashMap實現原理:
採用分段鎖Segment機制
HashTable:所有操作方法都是同步的,其他執行緒必須等待
這樣效率很低

ConcurrentHashMap把區域分為一個個的很小區域,segment,不同執行緒訪問不同的資料,只要不是同一段內都可以操作,但是如果操作的資料在同一個段內,這樣需要執行緒排隊

這裡寫圖片描述

這樣效率很快,(極限)16倍,把原來的儲存空間進行分段加鎖處理,段的長度都是2的n次方

  • *JUC之Atomic餘CAS演算法(樂觀鎖)
    原子性:是指一個操作或多個操作要麼全部執行,且執行的過程不會被任何因素打斷,要麼就都不執行

Atomic包是java.util.concurrent下的另一個專門為執行緒安全設計的java包,包含多個原子操作類
Atomic包:
這裡寫圖片描述

程式碼:

public class AtomicIntegerSample {


    private static int user = 10;//同時模擬的併發使用者訪問數量
    //private static int user = 10;
    private static int dowloadCounts = 5000;//使用者的真實下載數
    private static AtomicInteger count= new AtomicInteger(0);

    public static void main(String[] args) {
        //排程器,jdk1.5之後引入current對於併發的支援

        ExecutorService executor = Executors.newCachedThreadPool();
        //訊號量 ,用於模擬併發使用者數
        final Semaphore semaphore = new Semaphore(user);
        for(int i =0;i<dowloadCounts ;i++){
            //通過多執行緒模擬多個使用者訪問的下載次數
            executor.execute(new Runnable() {

                @Override
                public void run() {
                    try{
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    }catch(Exception e){
                        e.printStackTrace();
                    }

                }
            });
        }
        try {
            //延遲主執行緒結束--讓for迴圈中程式碼執行完畢
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }
/*  private synchronized static void add(){
        count++;
    }*/
    private  static void add(){
        count.getAndIncrement();//count++
    }
    //此處並沒有用synchronized和鎖機制


}

CAS演算法:
鎖是用來做併發的最簡單的機制,當然其代價也是很高的,獨佔鎖是一種悲觀鎖,synchronized就是一種獨佔鎖,它假設最壞的情況,並且只有在確保其它執行緒不會造成干擾的情況下執行,會導致所有需要鎖的執行緒颳起,等待持有鎖的執行緒釋放鎖

所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操作,如果因為衝突失敗就重試,直到成功為止。
其中CAS(比較與交換 Compare And Swap ),是一種有名的無鎖演算法

這裡寫圖片描述

比較的是期望值與實際操作的結果

Atomic的應用場景:
雖然基於CAS的執行緒安全機制很好很搞笑,但是要說的是,並非所有執行緒安全都可以用這樣的方法來實現,這隻適合於一些鎖粒度比較小型:例如計數器這樣的需求用起來才更有效果,否則也不會有鎖的存在了

對於大量資料操作反而會損耗效能,因為做了很多次

  • 總結

這裡寫圖片描述