Java併發程式設計的藝術之六----併發程式設計容器和框架
1.ConcurrentHashMap的實現原理與使用
ConcurrentHashMap是執行緒安全且高效的HashMap。
1.1為什麼要使用ConcurrentHashMap
併發程式設計中使用HashMap可能導致程式死迴圈(1.8解決了擴容和put成環),可能使的對HashMap的資料操作出現未知的結果,而使用執行緒安全的Hashtable效率又非常低下,基於以上原因,才有了ConcurrentHashMap。
①執行緒不安全的HashMap,在併發執行put操作的時候,會引起資料出錯,例如++size的過程中,其他執行緒也進行插入,++size,就有可能導致插入兩個,只加一次
②效率低下Hashtable
Hashtable使用synchronized保證執行緒安全,但是線上程競爭激烈的情況下,一旦某個執行緒使用put進行新增元素,其他執行緒不能使用put方法,也不能使用get方法獲取元素。
③ConcurrentHashMap鎖分段技術可有效提高併發訪問率
假如容器裡有多把鎖,每一把鎖用於鎖容器裡的一部分資料,那麼多執行緒訪問容器裡的不同資料段的資料時執行緒間就不會存在鎖競爭,這就是鎖分段。將資料分成一段一段儲存,然後給每一段資料配一把鎖,當一個執行緒佔用鎖,訪問其中一個數據段的時候,其他段的資料能被其他執行緒訪問。
1.2ConcurrentHashMap的結構1.7
ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裡扮演鎖的角色;HashEntry則用於儲存鍵值對資料。一個ConcurrentHashMap裡包含一個Segment陣列。Segment的結構和HashMap類似,是一種陣列和連結串列結構。一個Segment裡包含一個HashEntry陣列,每個HashEntry是一個連結串列結構的元素,每個Segment守護著一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得與它對應的Segment鎖
1.3ConcurrenthashMap初始化
ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor和concurrencyLevel等幾個引數來初始化segment陣列、段偏移量segmentShift、段掩碼segmentMask和每個segment裡的HashEntry陣列來實現的。
①初始化segments陣列
if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); |
和concurrencyLevel相關,由於為了與hashmap一樣計算index,因此每個segments陣列的長度也要是2^n,所以ssize必定是等於或者大於concurrencyLevel的2的n次冪的數。容器裡的鎖的個數也為16。
注意:concurrencyLevel的最大值是65535,這意味著segments陣列的長度最大為65536,對應的二進位制是16位。
②初始化segmentShift和segmentMask
這兩個全域性變數需要在定位segment時的雜湊演算法裡使用,sshift等於ssize從1向左移位的次數,在預設情況下concurrencyLevel等於16,1需要向左移位移動4次,所以sshift等於4。segmentShift用於定位參與雜湊運算的位數,segmentShift等於32減sshift,所以等於28,這裡之所以用32是因為ConcurrentHashMap裡的hash()方法輸出的最大數是32位的。segmentMask是雜湊運算的掩碼,等於ssize減1,即15,掩碼的二進位制各個位的值都是1。因為ssize的最大長度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,對應的二進位制是16位,每個位都是1。
③初始化每個segment
輸入引數initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每個segment的負載因子。
if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); |
上面程式碼中的變數cap就是segment裡HashEntry陣列的長度,它等於initialCapacity除以ssize的倍數c,如果c大於1,就會取大於等於c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,預設情況下initialCapacity等於16,loadfactor等於0.75,通過運算cap等於1,threshold等於零
1.4 定位segment
元素的hashCode進行一次再雜湊
private static int hash(int h) { h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } |
減少hash雜湊衝突,使元素能夠均勻的分佈在不同的segment上,提高儲存效率。
通過下面的雜湊演算法定位segment
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; } |
1.5 ConcurrentHashMap的操作
①get操作
先經過再雜湊,然後使用這個雜湊值通過雜湊運算定位到segment,通過雜湊演算法定位到元素
get操作的高效之處在於整個get過程不需要加鎖,除非讀到的值是空才會加鎖重讀。原因是它的get方法裡將要使用的共享變數都定義成volatile型別,如用於統計當前Segement大小的count欄位和用於儲存值的HashEntry的value。在get操作裡只需要讀不需要寫共享變數count和value,所以可以不用加鎖。之所以不會讀到過期的值,是因為根據Java記憶體模型的happen before原則,對volatile欄位的寫入操作先於讀操作,即使兩個執行緒同時修改和獲取volatile變數,get操作也能拿到最新的值,這是用volatile替換鎖的經典應用場景。
雖然兩次都是用長度值減1,但是定位segment使用了高位,而hashentry直接使用
②put操作
Put方法首先定位到segment,然後在segment裡進行插入操作。兩個步驟:判斷是否需要segment裡的HashEntry陣列進行擴容,第二部定位新增元素的位置,然後放到hashentry裡面
1.是否需要擴容
首先判斷hashentry陣列長度是否超過threshold,如果超過閾值,就對陣列進行擴容。
2.如何擴容
擴容的時候,首先會建立一個容量是原來兩倍的陣列,然後將原陣列的元素再hash之後存到新的數組裡,不會對整個容器進行擴容,而只是對某個segment進行擴容。
③size操作
統計所有segment裡的陣列元素個數求和。Segment裡面的全域性變數是一個volatile變數,那麼在多執行緒場景下,並不能簡單進行count的求和。ConcurrentHashMap先嚐試兩次通過不鎖住segment的方式來統計各個segment的大小,如果統計過程中,count發生了變化,則採用加鎖方式統計所有segment大小。使用modCount變數,在put、remove、clean方法操作元素前都會對modcount進行加1,那麼只要統計size前後比較modcount是否發生變化,就可以知道容器大小是否發生變化。
2.ConcurrentLinkedQueue
實現執行緒安全的佇列,一種是使用阻塞演算法,另一種是使用非阻塞演算法。
阻塞演算法可以用一個鎖或者兩個鎖等方式實現。非阻塞實心方式,則可以使用迴圈CAS的方式實現。
ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則進行排序,當我們新增一個元素的時候,他會新增到佇列的尾部;當我們獲取一個元素的時候,他會返回佇列頭部的元素。它會返回佇列頭部的元素。它採用了“wait-free”演算法(即CAS演算法)來實現,該演算法在Michael&Scott演算法上進行了一些修改。
2.1 ConcurrentLinkedQueue的結構
ConcurrentLinkedQueue由head節點和tail節點組成,每個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是通過這個next關聯起來,從而組成一張連結串列結構的佇列。預設情況下head節點儲存的元素為空,tail節點等於head節點。
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
2.2 入佇列
①入佇列就是將入隊節點新增到佇列的尾部。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); retry: for (;;) { Node<E> t = tail; Node<E> p = t; for (int hops = 0; ; hops++) { Node<E> next = succ(p); // 1.獲取p的後繼節點。(如果p的next指向自身,返回head節點) if (next != null) { // 2.如果next不為null if (hops > HOPS && t != tail) continue retry; // 3.如果自旋次數大於HOPS,且t不是尾節點,跳出2層迴圈重試。 p = next; // 4.如果自旋字數小於HOPS或者t是尾節點,將p指向next。 } else if (p.casNext(null, n)) { // 5.如果next為null,嘗試將p的next節點設定為n,然後自旋。 if (hops >= HOPS) casTail(t, n); // 6.如果設定成功且自旋次數大於HOPS,嘗試將n設定為尾節點,失敗也沒關係。 return true; // 7.新增成功。 } else { p = succ(p); // 8。如果第5步嘗試將p的next節點設定為n失敗,那麼將p指向p的後繼節點,然後自旋。 } } } } final Node<E> succ(Node<E> p) { Node<E> next = p.getNext(); //如果p節點的next節點指向自身,那麼返回head節點;否則返回p的next節點。 return (p == next) ? head : next; } /** * 允許頭尾節點的指標滯後,所以當頭尾節點離"實際位置"的距離 * (按節點)小於HOPS時,不會去更新頭尾指標。這裡是假設volatile寫代價比volatile讀高。 */ private static final int HOPS = 1; |
1定位出尾節點2使用cas演算法將入隊節點設定成尾節點的next節點,不成功就重試
②定位尾節點
tail節點並不總是尾節點,所以每次入隊都必須先通過tail節點來找到尾節點。尾節點可能是tail節點,也可能是tail節點的next節點。程式碼中迴圈體中的第一個if就是判斷tail是否有next節點,有則表示next節點可能是尾節點。獲取tail節點的next節點需要注意的是p節點等於p的next節點的情況,只有一種可能就是p節點和p的next節點都等於空,表示這個佇列剛初始化,正準備新增節點,所以需要返回head節點。獲取p節點的next節點程式碼如下。
③設定入隊節點為尾節點
p.casNext(null,n)方法用於將入隊節點設定為當前佇列尾節點的next節點,如果p的next是null,表示p是當前佇列的尾節點,如果不為null,表示有其他執行緒更新了尾節點,則需要重新獲取當前佇列的尾節點。
④HOPS的設計意圖
使用hops變數來控制並減少tail節點的更新頻率,並不是每次節點入隊後都將tail節點更新成尾節點,而是當tail節點和尾節點的距離大於等於常量HOPS的值(預設等於1)時才更新tail節點,tail和尾節點的距離越長,使用CAS更新tail節點的次數就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,因為迴圈體需要多迴圈一次來定位出尾節點,但是這樣仍然能提高入隊的效率,因為從本質上來看它通過增加對volatile變數的讀操作來減少對volatile變數的寫操作。
2.3出佇列
public E poll() { Node<E> h = head; Node<E> p = h; for (int hops = 0; ; hops++) { E item = p.getItem(); // 1.獲取p節點上的元素item。 if (item != null && p.casItem(item, null)) { // 2.如果item不為null,嘗試將p的item設定為null。 if (hops >= HOPS) { Node<E> q = p.getNext(); updateHead(h, (q != null) ? q : p); // 3.如果自旋次數大於HOPS,嘗試更新頭節點。 } return item; // 4.獲取元素成功。 } // 如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外 // 一個執行緒修改了。那麼獲取p節點的下一個節點 Node<E> next = succ(p); // 5.獲取p的後繼節點。(如果p的next指向自身,那麼返回head節點) if (next == null) { updateHead(h, p); // 6.如果p的後繼節點為null,嘗試將p設定為頭節點,然後跳出迴圈。 break; } p = next; } return null; // 7.從第6步過來,沒有成功獲取元素,返回null。 }
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); //將h的next指向自身,幫助GC。 } |
首先獲取頭節點的元素,然後判斷頭節點元素是否為空,如果為空,表示另外一個執行緒已經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引用設定成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個執行緒已經進行了一次出隊操作更新了head節點,導致元素髮生了變化,需要重新獲取頭節點。
3.java中的阻塞佇列
3.1 什麼是阻塞佇列
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是阻塞的插入和移除方法。
1)支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。
2)支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。
阻塞佇列不可用時,這兩個附加操作提供了四種處理方式:
拋異常:add remove element;佇列滿時,再插入,與 佇列空時再取資料,都會拋異常
返回特殊值:offer,poll,插入成功返回true,移除成功返回元素值。
一直阻塞:put,take佇列滿,插入或者佇列空刪除,都會阻塞。知道不滿或者非空
超時退出:offer(time,unit),poll(time,unit)佇列滿插入或者佇列空刪除,阻塞指定時間,然後退出。
3.2 Java裡的阻塞佇列
①ArrayBlockingQueue
用陣列實現的有界阻塞佇列。此佇列按照先進先出的原則對元素進行排序。預設不保證公平性;當然也可以用可重入鎖實現公平性
②LinkedBlockingQueue
連結串列實現的有界阻塞佇列,此佇列的預設和最大長度為Integer.MAX_value.按照先進先出的規則排序
③PriorityBlockingQueue
支援優先順序的無界阻塞佇列,預設情況下采用自然排序升序,使用堆實現,不能保證同優先順序的順序。
④DelayQueue
支援延時獲取元素的無界阻塞佇列,佇列使用PriorityQueue來實現,佇列中元素實現Delayed介面,在建立元素時,可以指定多久才能從佇列中獲取當前元素。只有在延時到的時候才能取出佇列資料
運用場景:
快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從中獲取元素時,表示快取有效期到了。
定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。
1.如何實現Delayed介面
第一步:建立物件的時候,初始化基本資料,使用time記錄當前物件延遲到什麼時候可使用,使用sequenceNumber來標識元素在佇列中的先後順序。
第二步,實現getDelay方法,該方法返回當前元素還需要延時多少時間,單位納秒,
第三步,實現compareTo方法來指定元素的順序,讓延時時間最長的放在對列末尾
package cn.huangwei.sixth;
import java.util.Date; import java.util.concurrent.DelayQueue;
public class DelayTask implements Runnable { private int id; private DelayQueue<DelayEvent> queue;
public DelayTask(int id, DelayQueue<DelayEvent> queue) { super(); this.id = id; this.queue = queue; }
@Override public void run() { // TODO Auto-generated method stub Date now = new Date(); Date delay = new Date(); delay.setTime(now.getTime() + id * 5000); System.out.println("Thread " + id + " " + delay); DelayEvent delayEvent = new DelayEvent(id + "", delay); queue.add(delayEvent); }
}
package cn.huangwei.sixth;
import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;
public class DelayEvent implements Delayed{ private String name; private Date removeTime;
public DelayEvent(String name, Date removeTime){ super(); this.name = name; this.removeTime = removeTime; }
@Override /* * DelayQueue內部使用PriorityQueue,即用了堆結構,此比較方法的result表明,this與o物件剩餘的時間大小 * 剩餘時間小的放到堆頂,在DelayQueue中poll方法只有在getDelay小於等於0的時候才會呼叫poll方法, * 從而實現快取的效果。物件只能在快取中存活一定時間,時間到了就要被清除出去 */ public int compareTo(Delayed o) { long result = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); if(result < 0){ return -1; }else if(result > 0){ return 1; }else{ return 0; } }
@Override public long getDelay(TimeUnit unit) { Date now = new Date(); long diff = removeTime.getTime() - now.getTime(); return unit.convert(diff, TimeUnit.MILLISECONDS); }
public String getName() { return name; }
public void setName(String name) { this.name = name; } }
package cn.huangwei.sixth;
import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit;
public class DelayedQueueCache { public static void main(String[] args) throws Exception { DelayQueue<DelayEvent> queue = new DelayQueue<DelayEvent>(); Thread[] ts = new Thread[5]; for(int i = 0; i < ts.length; i++){ DelayTask task = new DelayTask(i + 1, queue); ts[i] = new Thread(task); } for(int i = 0; i < ts.length; i++){ ts[i].start(); }
for (int i = 0; i < ts.length; i++) { try { ts[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } }
do { DelayEvent delayEvent; do { delayEvent = queue.poll(); if (delayEvent != null) { System.out.println("At " + new Date() + " you have read " + delayEvent.getName()+ " event"); }else{ System.out.println("no data"); } } while (delayEvent != null); TimeUnit.MILLISECONDS.sleep(500); } while (queue.size() > 0); } }
|
⑤SynchronizedQueue
SynchronizedQueue是一個不儲存元素的阻塞佇列,每個put操作必須等待一個take 操作,否則不能繼續新增元素。
它支援公平訪問佇列。預設情況下執行緒採用非公平性策略訪問佇列。使用以下構造方法可以建立公平性訪問的SynchronousQueue,如果設定為true,則等待的執行緒會採用先進先出的
順序訪問佇列。
public SynchronousQueue(boolean fair) {
transferer = fair ?new TransferQueue() : new TransferStack();
}
佇列本身並不儲存任何元素,非常適合傳遞性場景.
⑥LinkedTransferQueue
由連結串列組成的無界阻塞TransferQueue佇列。多了tryTransfer和transfer方法
1.transfer方法
如果有消費者等待接收元素(消費者使用take方法)時,transfer可以把生產者傳入的元素立刻transfer給消費者。如果沒有消費者等待接受元素,transfer方法會將元素存放在佇列的tail節點,等到消費者接收的時候在返回。
第一行程式碼是試圖把存放當前元素的s節點作為tail節點。第二行程式碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數後使用Thread.yield()方法來暫停當前正在執行的執行緒,並執行其他執行緒。
2.tryTransfer方法
是用來試探生產者傳入的元素能否直接傳給消費者,如果消費者等待接收元素,返回false,和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法是必須等到消費者消費了才返回。
⑦LinkedBlockingDeque
由連結串列結構組成的雙向阻塞佇列。可以從佇列的兩端插入和移除元素,含有first與last相關的方法。
3.3 阻塞佇列的實現原理
①使用通知模式實現
生產者往滿的佇列裡新增元素時會阻塞住生產者,當消費者消費了一個佇列中的元素後,會通知生產者當前佇列可用。
Await方法當往佇列裡插入一個元素時,如果佇列不可用,那麼阻塞生產者主要通過LockSupport.park(this)來實現。park(this)發現呼叫setBlocker先儲存一下將要阻塞的執行緒,然後呼叫unsafe.park阻塞當前執行緒。
Park這個方法返回的情況:
①與park對應的unpark執行或已經執行時。“已經執行”是指unpark先執行,然後再執行park的情況。
②執行緒被中斷時。
③等待完time引數指定的毫秒數時。
④異常現象發生時,這個異常現象沒有任何原因。
4.fork/join框架
4.1什麼是fork/join框架
一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。
Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。
4.2工作竊取演算法
某個執行緒從其他佇列裡竊取任務來執行;一個任務被分為若干個互不依賴的子任務,把這些子任務放到不同佇列中,為每個佇列建立一個單獨的執行緒執行任務,執行緒和佇列一一對應。如果某個執行緒的任務已做完,就會去執行其他執行緒的任務,採用雙端佇列,一個從頭取,一個從尾取。
4.3 fork/join框架的設計
步驟1 分割任務,fork類來把大人物分割成子任務,有可能子任務還是很大,還需要不停分割,直到分割出的子任務足夠小
步驟2 執行任務併合並結果,分割的任務都放在雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行,子任務執行完的結果都統一放在一個佇列中,啟動一個執行緒,從佇列裡拿資料,併合並
Fork/join使用兩個類完成以上事情。
①ForkJoinTask:建立一個ForkJoin任務,它提供fork和join操作機制,通常情況下,不需要直接繼承ForkJoinTask,只需要繼承其子類,RecursiveAction用於沒有返回結果的任務,recursiveTask用於有返回結果的任務。
②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行
任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列裡暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。
4.4使用Fork/Join框架
使用Fork/Join框架,需求是:計算1+2+3+4的結果
首先要考慮到的是如何分割任務,如果希望每個子任務最多執行兩個數的相加,那麼我們設定分割的閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務的結果。因為是有結果的任務,所以必須繼承RecursiveTask,實現程式碼如下。
package cn.huangwei.sixth;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 2; //閾值 private int start; private int end; public CountTask(int start, int end){ this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任務足夠小就計算任務 boolean canCompute = (end - start) <= THRESHOLD; if(canCompute){ for(int i = start; i <= end; i++){ sum += i; } }else{ //如果大於閾值,就分裂成兩個子任務計算 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); //執行子任務 leftTask.fork(); rightTask.fork(); //等待子任務執行完,並得到其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join();
//合併子任務 sum = leftResult + rightResult; } return sum; }
public static void main(String[] args){ ForkJoinPool forkJoinPool = new ForkJoinPool(); //生成一個計算任務,負責計算1+2+3+4 CountTask task = new CountTask(1, 4); //執行一個任務 Future<Integer> result = forkJoinPool.submit(task); try{ System.out.println(result.get()); }catch(Exception e){ e.printStackTrace(); } } }
|