1. 程式人生 > >集合及concurrent併發包總結

集合及concurrent併發包總結

1.集合包

    集合包最常用的有Collection和Map兩個介面的實現類,Colleciton用於存放多個單物件,Map用於存放Key-Value形式的鍵值對。

  Collection中最常用的又分為兩種型別的介面:List和Set,兩者最明顯的差別為List支援放入重複的元素,而Set不支援。

List最常用的實現類有:ArrayList、LinkedList、Vector及Stack;Set介面常用的實現類有:HashSet、TreeSet。

1.1 ArrayList

  ArrayList基於陣列方式實現,預設構造器通過呼叫ArrayList(int)來完成建立,傳入的值為10,例項化了一個Object陣列,並將此陣列賦給了當前例項的elementData屬性,此Object陣列的大小即為傳入的initialCapacity,因此呼叫空構造器的情況下會建立一個大小為10的Object陣列。

插入物件:add(E)

    基於已有元素數量加1作為名叫minCapacity的變數,比較此值和Object陣列的大小,若大於陣列值,那麼先將當前Object陣列值賦給一個數組物件,接著產生一個鑫的陣列容量值。此值的計算方法為當前陣列值*1.5+1,如得出的容量值仍然小於minCapacity,那麼就以minCapacity作為新的容量值,呼叫Arrays.copyOf來生成新的陣列物件。

    還提供了add(int,E)這樣的方法將元素直接插入指定的int位置上,將目前index及其後的資料都往後挪一位,然後才能將指定的index位置的賦值為傳入的物件,這種方式要多付出一次複製陣列的代價。還提供了addAll

 刪除物件:remove(E)

   這裡呼叫了faseRemove方法將index後的物件往前複製一位,並將陣列中的最後一個元素的值設定為null,即釋放了對此物件的引用。 還提供了remove(int)方法來刪除指定位置的物件,remove(int)的實現比remove(E)多了一個數組範圍的檢測,但少了物件位置的查詢,因此效能會更好。

獲取單個物件:get(int)

遍歷物件:iterator()

判斷物件是否存在:contains(E)

 總結:

    1,ArrayList基於陣列方式實現,無容量的限制;

    2,ArrayList在執行插入元素時可能要擴容,在刪除元素時並不會減小陣列的容量(如希望相應的縮小陣列容量,可以呼叫ArrayList的trimToSize()),在查詢元素時要遍歷陣列,對於非null的元素採取equals的方式尋找;

    3,ArrayList是非執行緒安全的。

1.2 LinkedList

    LinkedList基於雙向連結串列機制,所謂雙向連結串列機制,就是集合中的每個元素都知道其前一個元素及其後一個元素的位置。在LinkedList中,以一個內部的Entry類來代表集合中的元素,元素的值賦給element屬性,Entry中的next屬性指向元素的後一個元素,Entry中的previous屬性指向元素的前一個元素,基於這樣的機制可以快速實現集合中元素的移動。

總結:

    1,LinkedList基於雙向連結串列機制實現;

    2,LinkedList在插入元素時,須建立一個新的Entry物件,並切換相應元素的前後元素的引用;在查詢元素時,須遍歷連結串列;在刪除元素時,要遍歷連結串列,找到要刪除的元素,然後從連結串列上將此元素刪除即可,此時原有的前後元素改變引用連在一起;

    3,LinkedList是非執行緒安全的。

1.3 Vector

    其add、remove、get(int)方法都加了synchronized關鍵字,預設建立一個大小為10的Object陣列,並將capacityIncrement設定為0。容量擴充策略:如果capacityIncrement大於0,則將Object陣列的大小擴大為現有size加上capacityIncrement的值;如果capacity等於或小於0,則將Object陣列的大小擴大為現有size的兩倍,這種容量的控制策略比ArrayList更為可控。

    Vector是基於Synchronized實現的執行緒安全的ArrayList,但在插入元素時容量擴充的機制和ArrayList稍有不同,並可通過傳入capacityIncrement來控制容量的擴充。

1.4 Stack

    Stack繼承於Vector,在其基礎上實現了Stack所要求的後進先出(LIFO)的彈出與壓入操作,其提供了push、pop、peek三個主要的方法:

    push操作通過呼叫Vector中的addElement來完成;

    pop操作通過呼叫peek來獲取元素,並同時刪除陣列中的最後一個元素;

    peek操作通過獲取當前Object陣列的大小,並獲取陣列上的最後一個元素。

1.5 HashSet

    預設構造建立一個HashMap物件

add(E):呼叫HashMap的put方法來完成此操作,將需要增加的元素作為Map中的key,value則傳入一個之前已建立的Object物件。

remove(E):呼叫HashMap的remove(E)方法完成此操作。

contains(E):HashMap的containsKey

iterator():呼叫HashMap的keySet的iterator方法。

HashSet不支援通過get(int)獲取指定位置的元素,只能自行通過iterator方法來獲取。

總結:

    1,HashSet基於HashMap實現,無容量限制;

    2,HashSet是非執行緒安全的。

1.6 TreeSet

    TreeSet和HashSet的主要不同在於TreeSet對於排序的支援,TreeSet基於TreeMap實現。

1.7 HashMap

    HashMap空構造,將loadFactor設為預設的0.75,threshold設定為12,並建立一個大小為16的Entry物件陣列。

    基於陣列+連結串列的結合體(連結串列雜湊)實現,將key-value看成一個整體,存放於Entity[]陣列,put的時候根據key hash後的hashcode和陣列length-1按位與的結果值判斷放在陣列的哪個位置,如果該陣列位置上若已經存放其他元素,則在這個位置上的元素以連結串列的形式存放。如果該位置上沒有元素則直接存放。

當系統決定儲存HashMap中的key-value對時,完全沒有考慮Entry中的value,僅僅只是根據key來計算並決定每個Entry的儲存位置。我們完全可以把Map集合中的value當成key的附屬,當系統決定了key的儲存位置之後,value隨之儲存在那裡即可。get取值也是根據key的hashCode確定在陣列的位置,在根據key的equals確定在連結串列處的位置。

while (capacity < initialCapacity)
     capacity <<= 1;

以上程式碼保證了初始化時HashMap的容量總是2的n次方,即底層陣列的長度總是為2的n次方。它通過h & (table.length -1) 來得到該物件的儲存位,若length為奇數值,則與運算產生相同結果,便會形成連結串列,儘可能的少出現連結串列才能提升hashMap的效率,所以這是hashMap速度上的優化。

擴容resize():

當HashMap中的元素越來越多的時候,hash衝突的機率也就越來越高,因為陣列的長度是固定的。所以為了提高查詢的效率,就要對HashMap的陣列進行擴容,而在HashMap陣列擴容之後,最消耗效能的點就出現了:原陣列中的資料必須重新計算其在新陣列中的位置,並放進去,這就是resize。那麼HashMap什麼時候進行擴容呢?當HashMap中的元素個數超過陣列大小*loadFactor時,就會進行陣列擴容,loadFactor的預設值為0.75,這是一個折中的取值。

負載因子衡量的是一個散列表的空間的使用程度,負載因子越大表示散列表的裝填程度越高,反之愈小。負載因子越大,對空間的利用更充分,然而後果是查詢效率的降低;如果負載因子太小,那麼散列表的資料將過於稀疏,對空間造成嚴重浪費。

HashMap的實現中,通過threshold欄位來判斷HashMap的最大容量。threshold就是在此loadFactor和capacity對應下允許的最大元素數目,超過這個數目就重新resize,以降低實際的負載因子。預設的的負載因子0.75是對空間和時間效率的一個平衡選擇。

initialCapacity*2,成倍擴大容量,HashMap(int initialCapacity, float loadFactor):以指定初始容量、指定的負載因子建立一個 HashMap。不設定引數,則初始容量值為16,預設的負載因子為0.75,不宜過大也不宜過小,過大影響效率,過小浪費空間。擴容後需要重新計算每個元素在陣列中的位置,是一個非常消耗效能的操作,所以如果我們已經預知HashMap中元素的個數,那麼預設元素的個數能夠有效的提高HashMap的效能。

     HashTable資料結構的原理大致一樣,區別在於put、get時加了同步關鍵字,而且HashTable不可存放null值。

在高併發時可以使用ConcurrentHashMap,其內部使用鎖分段技術,維持這鎖Segment的陣列,在陣列中又存放著Entity[]陣列,內部hash演算法將資料較均勻分佈在不同鎖中。

總結:

    1,HashMap採用陣列方式儲存key、value構成的Entry物件,無容量限制;

    2,HashMap基於key hash尋找Entry物件存放到陣列的位置,對於hash衝突採用連結串列的方式解決;

    3,HashMap在插入元素時可能會擴大陣列的容量,在擴大容量時須要重新計算hash,並複製物件到新的陣列中;

    4,HashMap是非執行緒安全的。

1.8 TreeMap

    TreeMap基於紅黑樹的實現,因此它要求一定要有key比較的方法,要麼傳入Comparator實現,要麼key物件實現Comparable藉口。在put操作時,基於紅黑樹的方式遍歷,基於comparator來比較key應放在樹的左邊還是右邊,如找到相等的key,則直接替換掉value。

2.併發包

 jdk5.0一很重要的特性就是增加了併發包java.util.concurrent.*,在說具體的實現類或介面之前,這裡先簡要說下Java記憶體模型、volatile變數及AbstractQueuedSynchronizer(以下簡稱AQS同步器),這些都是併發包眾多實現的基礎。

Java記憶體模型

    描述了執行緒記憶體與主存見的通訊關係。定義了執行緒內的記憶體改變將怎樣傳遞到其他執行緒的規則,同樣也定義了執行緒記憶體與主存進行同步的細節,也描述了哪些操作屬於原子操作及操作間的順序。

程式碼順序規則:

    一個執行緒內的每個動作happens-before同一個執行緒內在程式碼順序上在其後的所有動作.

volatile變數規則:

    對一個volatile變數的讀,總是能看到(任意執行緒)對這個volatile變數最後的寫入.

傳遞性:

    如果A happens-before B, B happens-before C, 那麼A happens-before C.    

volatile

當我們宣告共享變數為volatile後,對這個變數的讀/寫將會很特別。理解volatile特性的一個好方法是:把對volatile變數的單個讀/寫,看成是使用同一個監視器鎖對這些單個讀/寫操作做了同步。

監視器鎖的happens-before規則保證釋放監視器和獲取監視器的兩個執行緒之間的記憶體可見性,這意味著對一個volatile變數的讀,總是能看到(任意執行緒)對這個volatile變數最後的寫入。

簡而言之,volatile變數自身具有下列特性:

  • 可見性。對一個volatile變數的讀,總是能看到(任意執行緒)對這個volatile變數最後的寫入。

  • 原子性:對任意單個volatile變數的讀/寫具有原子性,但類似於volatile++這種複合操作不具有原子性。

volatile寫的記憶體語義如下:

  • 當寫一個volatile變數時,JMM會把該執行緒對應的本地記憶體中的共享變數重新整理到主記憶體。

volatile讀的記憶體語義如下:

  • 當讀一個volatile變數時,JMM會把該執行緒對應的本地記憶體置為無效。執行緒接下來將從主記憶體中讀取共享變數。

下面對volatile寫和volatile讀的記憶體語義做個總結:

  • 執行緒A寫一個volatile變數,實質上是執行緒A向接下來將要讀這個volatile變數的某個執行緒發出了(其對共享變數所在修改的)訊息。

  • 執行緒B讀一個volatile變數,實質上是執行緒B接收了之前某個執行緒發出的(在寫這個volatile變數之前對共享變數所做修改的)訊息。

  • 執行緒A寫一個volatile變數,隨後執行緒B讀這個volatile變數,這個過程實質上是執行緒A通過主記憶體向執行緒B傳送訊息。

鎖釋放-獲取與volatile的讀寫具有相同的記憶體語義,

鎖釋放的記憶體語義如下:

    當執行緒釋放鎖時,JMM會把該執行緒對應的本地記憶體中的共享變數重新整理到主記憶體。

鎖獲取的記憶體語義如下:

    當執行緒獲取鎖時,JMM會把該執行緒對應的本地記憶體置為無效,從而使得被監視器保護的臨界區程式碼必須要從主記憶體中讀取共享變數。

下面對鎖釋放和鎖獲取的記憶體語義做個總結:

  • 執行緒A釋放一個鎖,實質上是執行緒A向接下來將要獲取這個鎖的某個執行緒發出了(執行緒A對共享變數所做修改的)訊息。

  • 執行緒B獲取一個鎖,實質上是執行緒B接收了之前某個執行緒發出的(在釋放這個鎖之前對共享變數所做修改的)訊息。

  • 執行緒A釋放鎖,隨後執行緒B獲取這個鎖,這個過程實質上是執行緒A通過主記憶體向執行緒B傳送訊息。

示例:

class VolatileExample {
    int x = 0;
    volatile int b = 0;

    private void write() {
        x = 5;
        b = 1;
    }

    private void read() {
        int dummy = b;
        while (x != 5) {
        }
    }

    public static void main(String[] args) throws Exception {
        final VolatileExample example = new VolatileExample();
        Thread thread1 = new Thread(new Runnable() {
            public void run() {
                example.write();
            }
        });
        Thread thread2 = new Thread(new Runnable() {
            public void run() {
                example.read();
            }
        });
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
    }
}

若thread1先於thread2執行,則程式執行流程分析如上圖所示,thread2讀的結果是dummy=1,x=5所以不會進入死迴圈。

但並不能保證兩執行緒的執行順序,若thread2先於thread1執行,則程式在兩執行緒join中斷之前的結果為:因為b變數的型別是volatile,故thread1寫之後,thread2即可讀到b變數的值發生變化,

而x是普通變數,故最後情況是dummy=1,但thread2的讀操作因為x=0而進入死迴圈中。

    在JSR-133之前的舊Java記憶體模型中,雖然不允許volatile變數之間重排序,但舊的Java記憶體模型仍然會允許volatile變數與普通變數之間重排序。JSR-133則增強了volatile的記憶體語義:嚴格限制編譯器(在編譯期)和處理器(在執行期)對volatile變數與普通變數的重排序,確保volatile的寫-讀和監視器的釋放-獲取一樣,具有相同的記憶體語義。限制重排序是通過記憶體屏障實現的,具體可見JMM的描述。

    由於volatile僅僅保證對單個volatile變數的讀/寫具有原子性,而監視器鎖的互斥執行的特性可以確保對整個臨界區程式碼的執行具有原子性。在功能上,監視器鎖比volatile更強大;在可伸縮性和執行效能上,volatile更有優勢。如果讀者想在程式中用volatile代替監視器鎖,請一定謹慎。

AbstractQueuedSynchronizer (AQS)

    AQS使用一個整型的volatile變數(命名為state)來維護同步狀態,這是接下來實現大部分同步需求的基礎。提供了一個基於FIFO佇列,可以用於構建鎖或者其他相關同步裝置的基礎框架。使用的方法是繼承,子類通過繼承同步器並需要實現它的方法來管理其狀態,管理的方式就是通過類似acquire和release的方式來操縱狀態。然而多執行緒環境中對狀態的操縱必須確保原子性,因此子類對於狀態的把握,需要使用這個同步器提供的以下三個方法對狀態進行操作:

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類推薦被定義為自定義同步裝置的內部類,同步器自身沒有實現任何同步介面,它僅僅是定義了若干acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當它被定義為一個排他模式時,其他執行緒對其的獲取就被阻止,而共享模式對於多個執行緒獲取都可以成功。

    同步器是實現鎖的關鍵,利用同步器將鎖的語義實現,然後在鎖的實現中聚合同步器。可以這樣理解:鎖的API是面向使用者的,它定義了與鎖互動的公共行為,而每個鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個執行緒進行加鎖,排除兩個以上的執行緒),但是實現是依託給同步器來完成;同步器面向的是執行緒訪問和資源控制,它定義了執行緒對資源是否能夠獲取以及執行緒的排隊等操作。鎖和同步器很好的隔離了二者所需要關注的領域,嚴格意義上講,同步器可以適用於除了鎖以外的其他同步設施上(包括鎖)。 同步器的開始提到了其實現依賴於一個FIFO佇列,那麼佇列中的元素Node就是儲存著執行緒引用和執行緒狀態的容器,每個執行緒對同步器的訪問,都可以看做是佇列中的一個節點。

對於一個獨佔鎖的獲取和釋放有如下偽碼可以表示:

獲取一個排他鎖

while(獲取鎖) {
	if (獲取到) {
		退出while迴圈
	} else {
		if(當前執行緒沒有入佇列) {
			那麼入佇列
		}
		阻塞當前執行緒
	}
}

釋放一個排他鎖

if (釋放成功) {
	刪除頭結點
	啟用原頭結點的後繼節點
}

示例:

下面通過一個排它鎖的例子來深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能更加深入瞭解其他的併發元件。

排他鎖的實現,一次只能一個執行緒獲取到鎖:

public class Mutex implements Lock, java.io.Serializable {
    // 內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
      // 是否處於佔用狀態
      protected boolean isHeldExclusively() {
        return getState() == 1;
      }
      // 當狀態為0的時候獲取鎖
      public boolean tryAcquire(int acquires) {
        assert acquires == 1; // Otherwise unused
        if (compareAndSetState(0, 1)) {
          setExclusiveOwnerThread(Thread.currentThread());
          return true;
        }
        return false;
      }
      // 釋放鎖,將狀態設定為0
      protected boolean tryRelease(int releases) {
        assert releases == 1; // Otherwise unused
        if (getState() == 0) throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
      }
      // 返回一個Condition,每個condition都包含了一個condition佇列
      Condition newCondition() { return new ConditionObject(); }
    }
    // 僅需要將操作代理到Sync上即可
    private final Sync sync = new Sync();
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  }

可以看到Mutex將Lock介面均代理給了同步器的實現。使用方將Mutex構造出來後,呼叫lock獲取鎖,呼叫unlock將鎖釋放。

獲取鎖,acquire(int arg)的主要邏輯包括:

1. 嘗試獲取(呼叫tryAcquire更改狀態,需要保證原子性);

    在tryAcquire方法中適用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個執行緒能夠對狀態進行成功修改,而沒有成功修改的執行緒將進入sync佇列排隊。

2. 如果獲取不到,將當前執行緒構造成節點Node並加入sync佇列;

    進入佇列的每個執行緒都是一個節點Node,從而形成了一個雙向佇列,類似CLH佇列,這樣做的目的是執行緒間的通訊會被限制在較小規模(也就是兩個節點左右)。

3. 再次嘗試獲取,如果沒有獲取到那麼將當前執行緒從執行緒排程器上摘下,進入等待狀態。

釋放鎖,release(int arg)的主要邏輯包括:

1. 嘗試釋放狀態;

    tryRelease能夠保證原子化的將狀態設定回去,當然需要使用compareAndSet來保證。如果釋放狀態成功之後,就會進入後繼節點的喚醒過程。

2. 喚醒當前節點的後繼節點所包含的執行緒。

    通過LockSupport的unpark方法將休眠中的執行緒喚醒,讓其繼續acquire狀態。

回顧整個資源的獲取和釋放過程:

在獲取時,維護了一個sync佇列,每個節點都是一個執行緒在進行自旋,而依據就是自己是否是首節點的後繼並且能夠獲取資源;

在釋放時,僅僅需要將資源還回去,然後通知一下後繼節點並將其喚醒。

這裡需要注意,佇列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在滿足了自旋退出的條件時的一刻,這個節點就會被設定成為首節點。

佇列裡的節點執行緒的禁用和喚醒是通過LockSupport的park()及unpark(),呼叫的unsafe、底層也是native的實現。

共享模式和以上的獨佔模式有所區別,分別呼叫acquireShared(int arg)和releaseShared(int arg)獲取共享模式的狀態。

以檔案的檢視為例,如果一個程式在對其進行讀取操作,那麼這一時刻,對這個檔案的寫操作就被阻塞,相反,這一時刻另一個程式對其進行同樣的讀操作是可以進行的。如果一個程式在對其進行寫操作,

那麼所有的讀與寫操作在這一時刻就被阻塞,直到這個程式完成寫操作。

以讀寫場景為例,描述共享和獨佔的訪問模式,如下圖所示:

上圖中,紅色代表被阻塞,綠色代表可以通過。

在上述對同步器AbstractQueuedSynchronizer進行了實現層面的分析之後,我們通過一個例子來加深對同步器的理解:

設計一個同步工具,該工具在同一時刻,只能有兩個執行緒能夠並行訪問,超過限制的其他執行緒進入阻塞狀態。

對於這個需求,可以利用同步器完成一個這樣的設定,定義一個初始狀態,為2,一個執行緒進行獲取那麼減1,一個執行緒釋放那麼加1,狀態正確的範圍在[0,1,2]三個之間,當在0時,代表再有新的執行緒對資源進行獲取時只能進入阻塞狀態(注意在任何時候進行狀態變更的時候均需要以CAS作為原子性保障)。由於資源的數量多於1個,同時可以有兩個執行緒佔有資源,因此需要實現tryAcquireShared和tryReleaseShared方法。

public class TwinsLock implements Lock {
    private final Sync  sync    = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long   serialVersionUID    = -7889272986162341211L;

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        return null;
    }
}

這裡我們編寫一個測試來驗證TwinsLock是否能夠正常工作並達到預期。

public class TwinsLockTest {

    @Test
    public void test() {
        final Lock lock = new TwinsLock();

        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();

                    try {
                        Thread.sleep(1000L);
                System.out.println(Thread.currentThread());
                        Thread.sleep(1000L);
                    } catch (Exception ex) {

                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.start();
        }

        new Thread() {
            public void run() {
                while (true) {

                    try {
                        Thread.sleep(200L);
                        System.out.println();
                    } catch (Exception ex) {

                    }
                }
            }
        }.start();

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述測試用例的邏輯主要包括:

1. 列印執行緒

Worker在兩次睡眠之間列印自身執行緒,如果一個時刻只能有兩個執行緒同時訪問,那麼打印出來的內容將是成對出現。

2. 分隔執行緒

不停的列印換行,能讓Worker的輸出看起來更加直觀。

該測試的結果是在一個時刻,僅有兩個執行緒能夠獲得到鎖,並完成列印,而表象就是列印的內容成對出現。

利用CAS(compare and set)是不會進行阻塞的,只會一個返回成功,一個返回失敗,保證了一致性。

CAS操作同時具有volatile讀和volatile寫的記憶體語義。

2.1 ConcurrentHashMap

    ConcurrentHashMap是執行緒安全的HashMap的實現,預設構造同樣有initialCapacity和loadFactor屬性,不過還多了一個concurrencyLevel屬性,三屬性預設值分別為16、0.75及16。其內部使用鎖分段技術,維持這鎖Segment的陣列,在Segment陣列中又存放著Entity[]陣列,內部hash演算法將資料較均勻分佈在不同鎖中。

put操作:並沒有在此方法上加上synchronized,首先對key.hashcode進行hash操作,得到key的hash值。hash操作的演算法和map也不同,根據此hash值計算並獲取其對應的陣列中的Segment物件(繼承自ReentrantLock),接著呼叫此Segment物件的put方法來完成當前操作。

ConcurrentHashMap基於concurrencyLevel劃分出了多個Segment來對key-value進行儲存,從而避免每次put操作都得鎖住整個陣列。在預設的情況下,最佳情況下可允許16個執行緒併發無阻塞的操作集合物件,儘可能地減少併發時的阻塞現象。

get(key)

    首先對key.hashCode進行hash操作,基於其值找到對應的Segment物件,呼叫其get方法完成當前操作。而Segment的get操作首先通過hash值和物件陣列大小減1的值進行按位與操作來獲取陣列上對應位置的HashEntry。在這個步驟中,可能會因為物件陣列大小的改變,以及陣列上對應位置的HashEntry產生不一致性,那麼ConcurrentHashMap是如何保證的?

    物件陣列大小的改變只有在put操作時有可能發生,由於HashEntry物件陣列對應的變數是volatile型別的,因此可以保證如HashEntry物件陣列大小發生改變,讀操作可看到最新的物件陣列大小。

    在獲取到了HashEntry物件後,怎麼能保證它及其next屬性構成的連結串列上的物件不會改變呢?這點ConcurrentHashMap採用了一個簡單的方式,即HashEntry物件中的hash、key、next屬性都是final的,這也就意味著沒辦法插入一個HashEntry物件到基於next屬性構成的連結串列中間或末尾。這樣就可以保證當獲取到HashEntry物件後,其基於next屬性構建的連結串列是不會發生變化的。

    ConcurrentHashMap預設情況下采用將資料分為16個段進行儲存,並且16個段分別持有各自不同的鎖Segment,鎖僅用於put和remove等改變集合物件的操作,基於volatile及HashEntry連結串列的不變性實現了讀取的不加鎖。這些方式使得ConcurrentHashMap能夠保持極好的併發支援,尤其是對於讀遠比插入和刪除頻繁的Map而言,而它採用的這些方法也可謂是對於Java記憶體模型、併發機制深刻掌握的體現。

2.2 ReentrantLock

    在併發包的開始部分介紹了volatile特性及AQS同步器,而這兩部分正是ReentrantLock實現的基礎。通過上面AQS的介紹及原理分析,可知道是以volatile維持的int型別的state值,來判斷執行緒是執行還是在syn佇列中等待。

ReentrantLock的實現不僅可以替代隱式的synchronized關鍵字,而且能夠提供超過關鍵字本身的多種功能。

    這裡提到一個鎖獲取的公平性問題,如果在絕對時間上,先對鎖進行獲取的請求一定被先滿足,那麼這個鎖是公平的,反之,是不公平的,也就是說等待時間最長的執行緒最有機會獲取鎖,也可以說鎖的獲取是有序的。ReentrantLock這個鎖提供了一個建構函式,能夠控制這個鎖是否是公平的。

    對於公平和非公平的定義是通過對同步器AbstractQueuedSynchronizer的擴充套件加以實現的,也就是tryAcquire的實現上做了語義的控制。

2.3 Condition

    Condition是併發包中提供的一個介面,典型的實現有ReentrantLock,ReentrantLock提供了一個mewCondition的方法,以便使用者在同一個鎖的情況下可以根據不同的情況執行等待或喚醒動作。典型的用法可參考ArrayBlockingQueue的實現,下面來看ReentrantLock中

newCondition的實現。

ReentrantLock.newCondition()

    建立一個AbstractQueuedSynchronizer的內部類ConditionObject的物件例項。

ReentrantLock.newCondition().await()

    將當前執行緒加入此condition的等待佇列中,並將執行緒置為等待狀態。

ReentrantLock.newCondition().signal()

    從此condition的等待佇列中獲取一個等待節點,並將節點上的執行緒喚醒,如果要喚醒全部等待節點的執行緒,則呼叫signalAll方法。

2.4 CopyOnWriteArrayList

    CopyOnWriteArrayList是一個執行緒安全、並且在讀操作時無鎖的ArrayList,其具體實現方法如下。

CopyOnWriteArrayList()

    和ArrayList不同,此步的做法為建立一個大小為0的陣列。

add(E)

    add方法並沒有加上synchronized關鍵字,它通過使用ReentrantLock來保證執行緒安全。此處和ArrayList的不同是每次都會建立一個新的Object陣列,此陣列的大小為當前陣列大小加1,將之前陣列中的內容複製到新的陣列中,並將

新增加的物件放入陣列末尾,最後做引用切換將新建立的陣列物件賦值給全域性的陣列物件。

remove(E)

    和add方法一樣,此方法也通過ReentrantLock來保證其執行緒安全,但它和ArrayList刪除元素採用的方式並不一樣。

    首先建立一個比當前陣列小1的陣列,遍歷新陣列,如找到equals或均為null的元素,則將之後的元素全部賦值給新的陣列物件,並做引用切換,返回true;如未找到,則將當前的元素賦值給新的陣列物件,最後特殊處理陣列中的最後

一個元素,如最後一個元素等於要刪除的元素,即將當前陣列物件賦值為新建立的陣列物件,完成刪除操作,如最後一個元素也不等於要刪除的元素,那麼返回false。

    此方法和ArrayList除了鎖不同外,最大的不同在於其複製過程並沒有呼叫System的arrayCopy來完成,理論上來說會導致效能有一定下降。

get(int)    

    此方法非常簡單,直接獲取當前陣列對應位置的元素,這種方法是沒有加鎖保護的,因此可能會出現讀到髒資料的現象。但相對而言,效能會非常高,對於寫少讀多且髒資料影響不大的場景而言是不錯的選擇。

iterator()

    呼叫iterator方法後建立一個新的COWIterator物件例項,並儲存了一個當前陣列的快照,在呼叫next遍歷時則僅對此快照陣列進行遍歷,因此遍歷此list時不會丟擲ConcurrentModificatiedException。

    與ArrayList的效能對比,在讀多寫少的併發場景中,較之ArrayList是更好的選擇,單執行緒以及多執行緒下增加元素及刪除元素的效能不比ArrayList好

2.5 CopyOnWriteArraySet

    CopyOnWriteArraySet基於CopyOnWriteArrayList實現,其唯一的不同是在add時呼叫的是CopyOnWriteArrayList的addIfAbsent方法。保證了無重複元素,但在add時每次都要進行陣列的遍歷,因此效能會略低於上個。

2.6 ArrayBlockingQueue

2.7 ThreadPoolExecutor

與每次需要時都建立執行緒相比,執行緒池可以降低建立執行緒的開銷,線上程執行結束後進行的是回收操作,提高對執行緒的複用。Java中主要使用的執行緒池是ThreadPoolExecutor,此外還有定時的執行緒池ScheduledThreadPoolExecutor。

Java裡面執行緒池的頂級介面是Executor,但是嚴格意義上講Executor並不是一個執行緒池,而只是一個執行執行緒的工具。真正的執行緒池介面是ExecutorService。

比較重要的幾個類:

ExecutorService 真正的執行緒池介面
ScheduledExecutorService 和Time/TimeTask類似,解決需要任務重複執行的問題
ThreadPoolExecutor ExecutorService的預設實現
SchedulesThreadPoolExecutor 繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,週期性任務排程的類實現

要配置一個執行緒池是比較複雜的,尤其是對於執行緒池的原理不是很清楚的情況下,很有可能配置的執行緒池不是較優的,因此在Executors類裡面提供了一些靜態工廠,生成一些常用的執行緒池。

1. newSingleThreadExecutor

建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

2.newFixedThreadPool

建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

3. newCachedThreadPool

建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,

那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

4.newScheduledThreadPool

建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

PS:但需要注意使用,newSingleThreadExecutor和newFixedThreadPool將超過處理的執行緒放在佇列中,但工作執行緒較多時,會引起過多記憶體被佔用,而後兩者返回的執行緒池是沒有執行緒上線的,所以在使用時需要當心,建立過多的執行緒容易引起伺服器的宕機。

使用ThreadPoolExecutor自定義執行緒池,具體使用時需根據系統及JVM的配置設定適當的引數,下面是一示例:

int corePoolSize = Runtime.getRuntime().availableProcessors();
threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
               new LinkedBlockingQueue<Runnable>(2000));

2.8 Future和FutureTask

Future是一個介面,FutureTask是一個具體實現類。這裡先通過兩個場景看看其處理方式及優點。

場景1,

現在通過呼叫一個方法從遠端獲取一些計算結果,假設有這樣一個方法:

HashMap data = getDataFromRemote();

如果是最傳統的同步方式的使用,我們將一直等待getDataFromRemote()的返回,然後才能繼續後面的工作。這個函式是從遠端獲取資料的計算結果的,如果需要的時間很長,並且後面的那部分程式碼與這些資料沒有關係的話,阻塞在這裡等待結果就會比較浪費時間。如何改進呢?

能夠想到的辦法就是呼叫函式後馬上返回,然後繼續向下執行,等需要用資料時再來用或者再來等待這個資料。具體實現有兩種方式:一個是用Future,另一個使用回撥。

Future的用法

Future<HashMap> future = getDataFromRemote2();
//do something
HashMap data = future.get();

可以看到,我們呼叫的方法返回一個Future物件,然後接著進行自己的處理,後面通過future.get()來獲取真正的返回值。也即,在呼叫了getDataFromRemote2後,就已經啟動了對遠端計算結果的獲取,同時自己的執行緒還在繼續處理,直到需要時再獲取資料。來看一下getDataFromRemote2的實現:

privete Future<HashMap> getDataFromRemote2(){
    return threadPool.submit(new Callable<HashMap>(){
        public HashMap call() throws Exception{
            return getDataFromRemote();
        }
    });
}

可以看到,在getDataFromRemote2中還是使用了getDataFromRemote來完成具體操作,並且用到了執行緒池:把任務加入到執行緒池中,把Future物件返回出去。我們呼叫了getDataFromRemote2的執行緒,然後返回來繼續下面的執行,而背後是另外的執行緒在進行遠端呼叫及等待的工作。get方法也可設定超時時間引數,而不是一直等下去。

場景2,

key-value的形式儲存連線,若key存在則獲取,若不存在這個key,則建立新連線並存儲。

傳統的方式會使用HashMap來儲存並判斷key是否存在而實現連線的管理。而這在高併發的時候會出現多次建立連線的現象。那麼新的處理方式又是怎樣呢?

通過ConcurrentHashMap及FutureTask實現高併發情況的正確性,ConcurrentHashMap的分段鎖儲存滿足資料的安全性又不影響效能,FutureTask的run方法呼叫Sync.innerRun方法只會執行Runnable的run方法一次(即使是高併發情況)。

2.9 併發容器

在JDK中,有一些執行緒不安全的容器,也有一些執行緒安全的容器。併發容器是執行緒安全容器的一種,但是併發容器強調的是容器的併發性,也就是說不僅追求執行緒安全,還要考慮併發性,提升在容器併發環境下的效能。

加鎖互斥的方式確實能夠方便地完成執行緒安全,不過代價是降低了併發性,或者說是串行了。而併發容器的思路是儘量不用鎖,比較有代表性的是以CopyOnWrite和Concurrent開頭的幾個容器。CopyOnWrite容器的思路是在更改容器的時候,把容器寫一份進行修改,保證正在讀的執行緒不受影響,這種方式用在讀多寫少的場景中會非常好,因為實質上是在寫的時候重建了一次容器。而以Concurrent開頭的容器的具體實現方式則不完全相同,總體來說是儘量保證讀不加鎖,並且修改時不影響讀,所以達到比使用讀寫鎖更高的併發效能。比如上面所說的ConcurrentHashMap,其他的併發容器的具體實現,可直接分析JDK中的原始碼。