1. 程式人生 > >[JCIP筆記](五)JDK並發包

[JCIP筆記](五)JDK並發包

int CP pla step sta 地方 元素 PE interrupt

這一節來講一講java.util.concurrent這個包裏的一些重要的線程安全有關類。

synchronized容器

synchronized容器就是把自己的內部狀態封裝起來,通過把每一個public方法設置成同步來控制對共享變量的訪問的容器。主要包括Vector, Hashtable,以及Collections.synchronizedxxx()方法提供的wrapper。

synchronized容器的問題-client locking

首先,synchronzied容器雖然是線程安全的,但是要訪問容器內部數據的線程只能先拿到容器的內置鎖才能訪問,實際上相當於串行訪問,CPU利用率和效率都不高。

另外還有一個值得註意的地方,就是用戶代碼使用synchronized容器時,如果需要做一些復合操作,比如put-if-absent,仍然要顯式加鎖(稱為client locking),否則會產生race condition。
比如以下操作:

1 public Object getLast(Vector list){
2   int last = list.size() - 1; //1
3   return list.get(last); //2
4 }
5 public void removeLast(Vector list){
6   int last = list.size() - 1; //
3 7   list.remove(last); //4 8 }

以上兩個方法都對Vector進行了復合操作,在不加鎖的情況下可能產生這樣一種場景:線程A調用getLast(),同時線程B調用removeLast()。線程A進行step 1拿到last同時線程B也拿到同樣的last;此時由於線程調度上的原因,線程B先執行了step 4刪除了最後一個節點,而線程A在此之後才執行step 2, 由於最後一個節點已被刪除,線程A這裏會報ArrayIndexOutOfBoundsException,而這個錯誤並不是用戶希望看到的。


  技術分享圖片

所以如果要按照類似的方法使用synchronized容器的話還是需要自己加鎖。由於這些容器內部的線程安全策略是使用自己的內置鎖,所以用戶代碼加鎖的時候需要用到的是容器本身。

 1 public Object getLast(Vector list){
 2   synchronized(list){
 3     int last = list.size() - 1; //1
 4     return list.get(last); //2
 5   }
 6 }
 7 public void removeLast(Vector list){
 8   synchronized(list){
 9     int last = list.size() - 1; //3
10     list.remove(last); //4
11   }
12 }

除了這些用戶自定義的復合操作之外,其實iteration也算復合操作,所以也應該加鎖。此處應註意兩點:

  1. 容器自帶的Iterator本身不支持並發修改,所以它提供了一個所謂的fail-fast的並發修改報錯機制,即容器自身維護一個modCount域,Iterator在創建時記錄這個modCount的值,如果在用戶遍歷容器的過程中modCount值發生了改變,則說明有另一個線程對容器做出了修改,那麽Iterator馬上會拋出ConcurrentModificationException。

    這個機制嚴格意義上並不能夠100%地探測到並發修改,因為modCount這個域並不是volatile的,在判斷    

    if(modCount == expectedModCount)

   時也並未加鎖。作者描述這個機制是在考慮性能的情況下所做的一個best-effort的努力。總之,不應該對這個機制做過多的依賴。

  2. 有一些容器自帶的方法看起來很無辜,但內部會用到iterator,所以用戶用到這些無辜方法的時候還是要加鎖。比如我們常用的toString, for-each語法,hashCode, equals, containsAll, removeAll, retainAll, 以其他容器為參數的構造器,等等。而這些方法有時候也是被隱式調用的,很難檢查到,比如:

    1 //...add some elements to the set
    2 System.out.println("DEBUG: added ten elements to " + set);

    這裏打印時set.toString()方法被隱式調用了。

client locking的問題

由於client code嘗試使用容器內部的線程安全機制,所以容易導致starvation和deadlock,這是因為任意代碼都可以使用容器的內置鎖,散落在各處的線程安全機制使得程序很難維護和debug。如果要解決這個問題,可以把容器克隆到線程內部進行使用,但每次使用的時候都要重新克隆,要考慮克隆本身帶來的代價。

Concurrent容器

相比於synchronized容器,Concurrent容器可以提供更高的並發性。
如果需要並發的Map,相比於synchronized Map,可以優先考慮ConcurrentHashMap;同理,相比於synchronized List/Set,可以優先考慮CopyOnWriteArrayList/Set;相比於synchronized SortedMap/Set,可以優先考慮ConcurrentSkipMap/Set。

ConcurrentHashMap

+ 使用了比Hashtable更細粒度的lock striping線程安全策略,支持多個(有限個)線程同時讀寫。
+ 提供的Iterator是weakly consistent的,容許並發修改。
- size/isEmpty等方法只提供估算值。
- 由於使用的鎖對象是private的,不支持client-side locking。(但是提供put-if-absent等復合操作)

CopyOnWriteArrayList

+ 每次改動時創建和發布新的collection copy。
+ 內部array是effectively immutable的,因此發布後可以不加鎖地安全訪問。
+ 適用於iteration >> modification的情況,如listeners。

BlockingQueue與生產者-消費者

BlockingQueue的最大好處是它不僅是一個簡單的容器,它還能提供flow-control,能讓程序在消息過多的情況下仍然保持健壯。

特殊的BlockingQueue: SynchronousQueue

一種很特殊的queue,實際上沒有內在的存儲,只是用於線程間的交接(rendezvous)。適用於消費者夠多的情況,比起BlockingQueue的最大好處是沒有交接成本。

 1 Thread producer = new Thread("PRODUCER") {
 2   public void run() {
 3     String event = "MY_EVENT";
 4     try {
 5       queue.put(event); // thread will block here
 6       System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event);
 7     } catch (InterruptedException e) {
 8       e.printStackTrace();
 9     }
10   }
11 };
12 producer.start(); // starting publisher thread
13 
14 
15 Thread consumer = new Thread("CONSUMER") {
16   public void run() {
17     try {
18       String event = queue.take(); // thread will block here
19       System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), event);
20     } catch (InterruptedException e) {
21       e.printStackTrace();
22     }
23   }
24 };
25 consumer.start(); // starting consumer thread
26 
27 [PRODUCER] published event : MY_EVENT
28 [CONSUMER] consumed event : MY_EVENT

Synchronizers

所謂的synchronizer,就是能夠根據其內部狀態調節線程的control flow的對象。

CountDownLatch

主要方法:
  - countDown
  - await
CountDownLatch有如一個閥門,在其達到最終狀態前閥門關閉,線程不可通過。達到最終狀態時,閥門打開,所有線程通過。打開後的閥門永遠打開,狀態不再改變。

適用情景:

  • 等待所依賴的資源全部加載完成後才繼續。 
  • 初始化順序中各個service之間的相互等待。
  • 等待所有參與的player都準備好才開始遊戲。

FutureTask

主要方法:get
task真正結束前get方法會阻塞,直到task執行結束/被取消/拋異常。

Semaphore

主要方法:
  - release
  - acquire

有有限多個permit,acquire時如果permit為0會阻塞,但release可以執行無限多次。

適合:控制可以同時訪問某資源的activity數量。可用來實現資源池或將容器設為可以存儲有限個元素的容器。

CyclicBarrier

主要方法:await

必須所有線程到達Barrier時,所有線程才能通過。

Latch用來等待事件;Barrier用來等待其它線程。

適用場景:N等N

 1 public class CellularAutomata {
 2     private final Board mainBoard;
 3     private final CyclicBarrier barrier;
 4     private final Worker[] workers;
 5 
 6     public CellularAutomata(Board board) {
 7         this.mainBoard = board;
 8         int count = Runtime.getRuntime().availableProcessors();
 9         this.barrier = new CyclicBarrier(count,
10                 new Runnable() {
11                     public void run() {
12                         mainBoard.commitNewValues();
13                     }});
14         this.workers = new Worker[count];
15         for (int i = 0; i < count; i++)
16             workers[i] = new Worker(mainBoard.getSubBoard(count, i));
17     }
18 
19     private class Worker implements Runnable {
20         private final Board board;
21 
22         public Worker(Board board) { this.board = board; }
23         public void run() {
24             while (!board.hasConverged()) {
25                 for (int x = 0; x < board.getMaxX(); x++)
26                     for (int y = 0; y < board.getMaxY(); y++)
27                         board.setNewValue(x, y, computeValue(x, y));
28                 try {
29                     barrier.await();
30                 } catch (InterruptedException ex) {
31                     return;
32                 } catch (BrokenBarrierException ex) {
33                     return;
34                 }
35             }
36         }
37 
38         private int computeValue(int x, int y) {
39             // Compute the new value that goes in (x,y)
40             return 0;
41         }
42     }
43 
44     public void start() {
45         for (int i = 0; i < workers.length; i++)
46             new Thread(workers[i]).start();
47         mainBoard.waitForConvergence();
48     }
49 }

[JCIP筆記](五)JDK並發包