1. 程式人生 > >《JAVA併發程式設計實戰》基礎構建模組

《JAVA併發程式設計實戰》基礎構建模組

文章目錄

同步容器類

同步容器類包括Vector和hashtable,還包括由Collections.synchronizedXxx等工廠方法建立的封裝器類。這些類實現執行緒安全的方式是:將他們的狀態封裝起來,並對每個公有方法進行同步,使得每次只有一個執行緒能訪問容器的狀態。

同步容器類的問題

同步容器類都是執行緒安全的,但在某些情況下可能需要額外的客戶端加鎖來保護複合操作。包括:迭代、跳轉、條件運算(例如:若沒有則新增)。

在同步容器類中,這些複合操作在沒有客戶端加鎖的情況下仍然是執行緒安全的,但當其他執行緒併發的修改容器時,他們可能會表現出意料之外的行為。

例如:

public static Object getLast(Vector list) {
    int lastIndex = list.size() - 1;
    return list.get(lastIndex);
}
 
public static void deleteLast(Vector list) {
    int lastIndex = list.size() - 1;
    list.remove(lastIndex);
}

在多執行緒呼叫時就會出錯。

我們可以採用給客戶端加鎖的方式

public static Object getLast
(Vector list) { synchronized(list) { int lastIndex = list.size() - 1; return list.get(lastIndex); } } public static void deleteLast(Vector list) { synchronized(list) { int lastIndex = list.size() - 1; list.remove(lastIndex); } }

迭代器和ConcurrentModificationException

在設計同步容器類的迭代器時沒有考慮到併發修改的問題,並且他們表現出的行為是“fail-fast”的,這意味著當在迭代過程中容器被修改,就會丟擲ConcurrentModificationException.

這種及時失敗的迭代器並不是一種完備的處理機制,只能作為併發問題的錯誤指示器,它採用的實現方式是,將計數器的變化和容器關聯起來:如果在迭代期間計數器被修改,hasNext或next將丟擲ConcurrentModificationException.這是一種設計上的權衡,從而降低併發修改操作的檢測程式碼對程式效能的影響。

List<Widget> list = Collections.synchronizedList(new ArrayList<Widget>());

//可能丟擲ConcurrentModificationException
for(Widget w : list) {
    doSomething(x);
}

上面這段程式碼無法確保list在迭代時不被修改,如果需要確保list無法修改,一種做法是這樣的:

List<Widget> list = Collections.synchronizedList(new ArrayList<Widget>());

synchronized(list) {
    for(Widget w : list) {
        doSomething(x);
    }    
}

這同時會帶來一些問題,如果迭代沒有完成,其他執行緒無法訪問容器。如果容器規模很大或者doSomething很久那麼鎖的競爭會非常激烈,如果許多執行緒都在迭代鎖的釋放,那麼將極大的降低吞吐量和CPU的利用率。

如果不希望在迭代期間對容器加鎖,一種替代方法是“克隆”容器,並在副本上進行迭代。由於副本被封閉線上程內,因此其他執行緒不會再迭代期間對其進行修改,這樣就避免了ConcurrentModificationException(在克隆過程中仍然需要對容器進行加鎖)

隱藏迭代器

雖然加鎖可以防止迭代器丟擲ConcurrentModificationException,但要記住在所有對共享容器進行迭代的地方都需要加鎖。

public class HiddenIterator {
    private final Set<Integer> set = new HashSet<>();
    
    public synchronized void add(Integer i) {
        set.add(i);
    }
    
    public synchronized void remove(Integer i) {
        set.remove(i);
    }
    
    public void addTenThings() {
        Random r = new Random();
        for(int i = 0; i < 10;i++) {
            add(r.nextInt);
        }
        System.out.println("DEBUG: added ten elements to " + set);
    }
}

在這段程式碼中沒有顯式的呼叫set的迭代方法,但是在println裡隱式的呼叫了set的toString(),這個方法隱式的對set的元素進行迭代。

這裡如果要避免丟擲ConcurrentModificationException,那麼使用Collections.synchornizedSet()是一個好方法。

併發容器

同步容器將所有對容器狀態的訪問都序列化以實現他們的執行緒安全性,這種方法的代價是嚴重降低了併發性,當多個執行緒競爭容器的鎖時,吞吐量將嚴重減低。

通過併發容器來代替同步容器,極大的提高伸縮性並降低風險

ConcurrentHashMap

同步容器類在執行每個操作期間都持有一個鎖。

ConcurrentHashMap和其他併發容器一起增強了同步容器類:它們提供的迭代器不會丟擲ConcurrentModificationException,因此不需要在迭代過程對容器進行加鎖。

ConcurrentHashMap返回的迭代器具有弱一致性(weakly consistent),而並非“fast-fail".弱一致性的迭代器可以容忍併發的修改,當建立迭代器時會遍歷已有的元素,並可以在迭代器被構造後將修改操作反映給容器。

只有當應用程式需要加鎖Map以進行獨佔訪問時,才應該放棄使用ConcurrentHashMap.

額外的原子Map操作

由於ConcurrentHashMap不能被加鎖來執行獨佔訪問,因此我們無法使用客戶端加鎖來建立新的原子操作。一些常用的符合操作都已經實現為原子操作並且在ConcurrentMap介面中宣告,如“putIfAbsent","remove if equal"等。如果你需要在現有的同步Map中新增這樣的功能,那麼可以考慮使用ConcurrentMap.

介面ConcurrentMap包括以下方法:
getOrDefault
forEach
putIfAbsent
remove
replace
replace
replaceAll
computeIfAbsent
computeIfPresent
compute
merge

CopyOnWriteArrayList

CopyOnWriteArrayList用於替換同步list,在某些情況下它提供了更好的併發效能,並且在迭代期間不需要對容器進行加鎖或複製。類似的也有CopyOnWriteArraySet。

“CopyOnWrite” 寫入時複製容器的執行緒安全性在於,只要正確的釋出一個事實不可變的物件,那麼在訪問該物件時就不再需要進一步的同步。在每次修改時,都會建立並重新發佈一個新的容器副本,從而實現可變性。“寫入時複製”容器的迭代器保留一個指向底層基礎陣列的引用,這個陣列當前位於迭代器的起始位置,由於它不會被修改,因此在對其進行同步時只需要確保陣列內容的可見性。因此多個執行緒可以同時對這個容器進行迭代,而不會彼此干擾或者與修改容器的執行緒相互干擾。

"CopyOnWrite"容器返回的迭代器不會丟擲ConcurrentModificationException,並且返回的元素和迭代器建立時的元素完全一致,而不必考慮之後修改操作所帶來的影響。

顯然,每當修改容器時會複製底層陣列,這需要一定的開銷,特別是容器規模較大時。僅當迭代操作遠遠多於修改操作時,從應該使用該類容器。

阻塞佇列和生產者-消費者模式

阻塞佇列提供了可阻塞的put和take方法,以及支援定時的offer和poll方法。如果佇列滿了,put將阻塞到有空間可用;如果佇列為空,那麼take將會阻塞到有元素可用。

在構建高可靠的應用程式時,有界佇列是一種強大的資源管理工具:它們能抑制並防止產生過多的工作項,使應用程式在負荷過載的情況下變得更加健壯。

例項:桌面搜尋

// 生產者,搜尋檔案並將他們放入工作佇列。
public class FileClawer implements Runnable {
    private final BlockingQueue<File> fileQueue;
    private final FileFilter fileFilter;
    private final File root;
    
    public void run() {
        try{
            crawl(root);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void crawl(File root) {
        File[] entries = root.listFiles(fileFilter);
        if(entries != null) {
            for(File entry : entries) {
                if(entry.isDirectory()) {
                    crawl(entry);
                } else if(!alreadyIndexed(entry)){
                    fileQueue.put(entry);
                }
            }
        }
    }
}
 
 //消費者,將佇列裡的file取出並建立索引
public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;
    
    public Indexer(BlockingQueue<File> queue){
        this.queue = queue;
    }
    
    public void run(){
        try{
            while(true) {
                indexFile(queue.take());
            }
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}


//啟動類
public static void startIndexing(File[] roots) {
    BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
    FileFilter filter = new FileFilter() {
        public boolean accept(File file) {
            return true;
        }
    }
    
    for(File root : roots) {
        new Thread(new FileCrawler(queue,filter,root)).start();
    }
    
    for(int i = 0;i < N_CONSUMERS;i++) {
        new Thread(new Indexer(queue)).start();
    }
}

序列執行緒封閉

對於可變物件,生產者-消費者這種設計和阻塞佇列一起,促進了序列執行緒封閉,從而將物件所有權從生產者交付給消費者。執行緒封閉物件只能由單個執行緒擁有,但可以通過安全的釋出該物件來“轉移”所有權。在轉移所有權後,也只有另一個執行緒能獲得這個物件的訪問許可權,並且釋出物件的執行緒不會再訪問它。這種安全的釋出確保了物件狀態對於新的所有者來說是可見的,並且由於最初的所有者不會再訪問它,因此物件會被封閉在新的執行緒中,新的所有者執行緒可以對該物件做任意修改,因為它具有獨佔的訪問權。

物件池利用了序列執行緒封閉,將物件“借給”一個請求執行緒。主要物件池包含足夠的內部同步來安全的釋出池中的物件,並且只要客戶程式碼本身不會發布池中的物件,或者在將物件返回給物件後就不再使用它,那麼就可以安全的線上程之間傳遞所有權。

我們也可以使用其他釋出機制來傳遞可變物件的所有權,但必須確保只有一個執行緒能接受被轉移的物件。

雙端佇列和工作密取

正如阻塞佇列適用於生產者-消費者設計模式,雙端佇列適用於 工作密取。

每個消費者都有各自的雙端佇列,如果一個消費者完成了自己雙端佇列中的全部工作,那麼它可以從其他消費者佇列末尾祕密獲取工作。

工作密取非常適用於既是消費者又是生產者問題——當執行某個工作時可能導致出現更多的工作。例如,在網頁爬蟲程式中處理一個頁面時,通常會發現有更多頁面需要處理。

阻塞方法和中斷方法

執行緒可能會阻塞或者暫停執行,原因有多種:等待IO結束,等待獲得一個鎖,等待從Thread.sleep()中醒來,等待另一個執行緒的計算結果。阻塞操作和執行時間很長的普通操作的差別在於:被阻塞的執行緒必須等待某個不受它控制的事件發生後才能繼續執行。

Thread提供了interrupt方法,用於中斷執行緒或者查詢執行緒是否已經被中斷。

中斷是一種協作機制。一個執行緒不能強制其他執行緒停止正在執行的操作而去執行其他的操作。

當在程式碼中呼叫了一個將丟擲InterruptedException異常的方法時,你自己的方法也就成了一個阻塞方法,並且必須要處理對中斷的響應。通常由兩種做法:

  1. 傳遞InterruptedException。避開這個異常通常是最明智的策略——只需要把這個異常傳遞給方法的呼叫者。包括:根本不捕獲該異常,或者捕獲該異常然後執行某種簡單的清理工作後再次丟擲這個異常。
  2. 恢復中斷。有時候不能丟擲InterruptedException,例如當代碼是Runnable的一部分時,在這些情況下,必須捕獲該異常,並通過讀取執行緒的interrupt方法恢復中斷狀態。
public class TaskRunnable implements Runnable {
    BlockingQueue<Task> queue;
    
    public void run() {
        try{
            processTask(queue.take());
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

同步工具類

同步工具類可以是任何一個物件,只要它根據自身的狀態來協調執行緒的控制流。阻塞佇列可以作為工具類,其他型別的同步工具類還包括訊號量(Semaphore)、柵欄(Barrier)、閉鎖(Latch)。

閉鎖

閉鎖是一種同步工具類,可以延遲執行緒的進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,當到達結束狀態時,這扇門將永遠保持開啟狀態。

閉鎖可以確保某些活動直到其他活動都完成後才繼續執行,例如:

  • 確保某個計算在其需要的所有資源都被初始化之後才繼續執行。二元閉鎖(包括兩個狀態)可以用來表示“資源R已經被初始化”,而所有需要R的操作都必須先在這個閉鎖上等待。
  • 確保某個服務在其依賴的所有服務都已啟動後再啟動。
  • 等待直到某個操作的所有參與者都就緒再繼續執行。

CountDownLatch是一種閉鎖實現,它可以使一個或多個執行緒等待一組事件發生。閉鎖狀態包括一個計數器,該計數器初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示一個事件已經發生了,而await方法等待計數器達到0,這表示所有需要等待的事件都已經發生。如果計數器的值非0,那麼await會一直阻塞直到計數器為0或者等待中的執行緒中斷,或者等待超時。

public class TestHarness {
    public long timeTasks(int nThreads,final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        
        for(int i = 0;i < nThreads;i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                        
                    } catch (InterruptedException ignored) {
                        
                    }
                }
            }
            
            t.start();
        }
        
        
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

啟動門使主執行緒能夠同時釋放所有工作執行緒,而結束門則使主執行緒能夠等待最後一個執行緒執行完成,而不是順序的等待每個執行緒執行完成。

FutureTask

FutureTask也可以用作閉鎖。FutureTask表示計算是通過Callable來實現的,相當於一種可生成結果的Runnable,並且可以處於以下3中狀態:等待執行,正在執行,執行完成。執行完成表示計算的所有可能結束方式包括正常結束,由於取消而結束,由於異常而結束等。當FutureTask進入完成狀態後,它會永遠停在這個狀態上。

Future.get的行為取決於任務的狀態。如果任務已經完成get立即返回結果,否則get將阻塞直到任務進入完成狀態,然後返回結果或者丟擲異常。FutureTask將計算結果從執行計算的執行緒傳遞到獲取這個結果的執行緒,而FutureTask的規範確保了這種傳遞能實現結果的安全釋出。

FutureTask在Executor框架中表示非同步任務,此外還可以用來表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動。

public class Preloader {
    private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
        public ProductInfo call() throws DataLoadException {
            return loadProductInfo();
        }
    });
    private final Thread thread = new Thread(future);
    
    public void start() {
        thread.start();
    }
    public ProductInfo get() throws DataLoadException,InterruptedException{
        try {
            return future.get();
        } catch(ExecutionException e) {
            Throwable cause = e.getCause();
            if(cause instanceof DataLoadException) {
                throw (DataLoadException) cause;
            } else {
                throw launderThrowable(cause);
            }
        }
    }
    
    
    public static RuntimeException launderThrowable(Throwable t) {
        if(t instanceof RuntimeException) {
            return (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new IllegalStateException("Not unchecked " , t);
        }
    }
}


Preloader建立了一個FutureTask,其中包含從資料庫載入產品資訊的任務,以及一個執行運算的執行緒。由於在建構函式或靜態初始化中啟動執行緒並不是一種好方法,因此提供了一個start方法來啟動執行緒。當程式隨後需要ProductInfo時,可以呼叫get方法,如果資料已經載入,那麼將返回這些資料,否則將等待載入完成再返回。

Callable表示的任務可以丟擲受查異常或為受查異常,並且任何程式碼都可能丟擲一個Error。無論任務程式碼丟擲什麼異常都會被封裝到一個ExecutionExecution中,並在Future.get中被重新丟擲。

在Preloader中,當get方法丟擲ExecutionException時,可能是以下三種情況之一:Callable丟擲的受查異常,RuntimeExecution,以及Error。

訊號量

計數訊號量用來控制同時訪問某個特定資源的運算元量,或者同時執行某個指定操作的數量。還可以用來實現某種資源池,或者對容器施加邊界。

Semaphore中管理著一組虛擬的許可(permit),許可的初始數量有建構函式來指定,在執行操作時先獲取許可,在結束使用時釋放許可。如果沒有許可,acquire將阻塞到有許可(或者知道被中斷或者操作超時)。release方法將返回一個許可。構造一個超時值為1的Semaphore可以用作互斥體,並具備不可重入的加鎖語義。

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;
    
    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }
    
    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if(!wasAdded) {
                sem.release();
            }
        }
    }
    
    
    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if(wasRemoved) {
            sem.release();
        }
        return wasRemoved;
    }
}

柵欄

柵欄和閉鎖類似,但是有所區別,所有執行緒必須同時到達柵欄位置,才能繼續執行。閉鎖用來等待事件,而柵欄用來等待其他執行緒。柵欄可以用來實現一些協議。

CyclicBarrier可以使一定數量的參與方反覆的在柵欄位置彙集,他在並行迭代演算法中非常有用:這種演算法通常將一個問題拆分成一系列相互獨立的子問題。當執行緒到達柵欄位置時呼叫await反覆,這個方法將阻塞知道所有執行緒都到達柵欄位置。如果所有執行緒都到達了柵欄位置,你們柵欄將開啟,此時所有執行緒都被釋放,而柵欄將被重置以便下次使用。如果對await呼叫超時,或者await阻塞的執行緒被中斷,你們柵欄就被認為是打破了,所有阻塞的await呼叫都將終止並丟擲BrokenBarrierException.如果成功通過柵欄,你們await將為每個執行緒返回一個唯一的到達索引號,我們可以利用這些索引來“選舉”產生一個領導執行緒,並在下一次迭代中由該領導執行緒執行一些特殊工作。CyclicBarrier還可以使你將一個柵欄操作傳遞給建構函式,這是一個Runnable,當成功通過柵欄時會執行它,但在阻塞執行緒被釋放之前是不能執行的。

生命遊戲:

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;
    
    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime().getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,new Runnable(){
            public void run() {
                mainBoard.commitNewValues();
            }
        });
        this.workers = new Worker[count];
        for(int i = 0;i < count;i++) {
            workers[i] = new Worker(mainBoard.getSubBoard(count,i));
        }
    }
    
    private class Worker implements Runnable {
        private final Board board;
        public Worker(Board board) {
            this.board = board;
        }
        
        public void run() {
            while(!board.hasConverged()) {
                for(int x = 0; x < board.getMaxX();x++) {
                    for(int y =