1. 程式人生 > 其它 >6.java併發容器與框架

6.java併發容器與框架

一、ConcurrentHashMap的實現原理與使用

ConcurrentHashMap是執行緒安全且高效的HashMap。

1.1、為什麼要使用ConcurrentHashMap

JDK1.7(ReentrantLock+Segment+HashEntry)

JDK1.8(Synchroized+CAS+紅黑樹+HashEntry)

(1)執行緒不安全的HashMap

(2)效率低下的HashTable

(3)ConcurrentHashMap的鎖分段技術可有效提升併發訪問率

1.2、ConcurrentHashMap的結構

通過ConcurrentHashMap的類圖來分析ConcurrentHashMap的結構,如圖6-1所示。

ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裡扮演鎖的角色;HashEntry則用於儲存鍵值對資料。一個ConcurrentHashMap裡包含一個Segment陣列。Segment的結構和HashMap類似,是一種陣列和連結串列結構。一個Segment裡包含一個HashEntry陣列,每個HashEntry是一個連結串列結構的元素,每個Segment守護著一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得與它對應的Segment鎖,

如圖6-2所示。

1.3、ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor和concurrencyLevel等幾個引數來初始化segment陣列、段偏移量segmentShift、段掩碼segmentMask和每個segment裡的HashEntry陣列來實現的。

1.3.1、初始化segments陣列

讓我們來看一下初始化segments陣列的原始碼。

if (concurrencyLevel > MAX_SEGMENTS)

	concurrencyLevel = MAX_SEGMENTS;

	int sshift = 0;

	int ssize = 1;

	while (ssize < concurrencyLevel) {

		++sshift;

		ssize <<= 1;

	}

	segmentShift = 32 - sshift;

	segmentMask = ssize - 1;

	this.segments = Segment.newArray(ssize);

由上面的程式碼可知,segments陣列的長度ssize是通過concurrencyLevel計算得出的。為了能通過按位與的雜湊演算法來定位segments陣列的索引,必須保證segments陣列的長度是2的N次方(power-of-two size),所以必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來作為segments陣列的長度假如concurrencyLevel等於14、15或16,ssize都會等於16,即容器裡鎖的個數也是16。

注意 concurrencyLevel的最大值是65535,這意味著segments陣列的長度最大為65536,對應的二進位制是16位。

1.3.2、初始化segmentShift和segmentMask

這兩個全域性變數需要在定位segment時的雜湊演算法裡使用,sshift等於ssize從1向左移位的次數,在預設情況下concurrencyLevel等於16,1需要向左移位移動4次,所以sshift等於4。

segmentShift用於定位參與雜湊運算的位數,segmentShift等於32減sshift,所以等於28,這裡之所以用32是因為ConcurrentHashMap裡的hash()方法輸出的最大數是32位的,後面的測試中我們可以看到這點。segmentMask是雜湊運算的掩碼,等於ssize減1,即15,掩碼的二進位制各個位的值都是1。因為ssize的最大長度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,對應的二進位制是16位,每個位都是1。

1.3.3、初始化每個segment

輸入引數initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每個segment的負載因子,在構造方法裡需要通過這兩個引數來初始化陣列中的每個segment。

	if (initialCapacity > MAXIMUM_CAPACITY)

		initialCapacity = MAXIMUM_CAPACITY;

	int c = initialCapacity / ssize;

	if (c * ssize < initialCapacity)

		++c;

	int cap = 1;

	while (cap < c)

		cap <<= 1;

	for (int i = 0; i < this.segments.length; ++i)

		this.segments[i] = new Segment<K,V>(cap, loadFactor);

上面程式碼中的變數cap就是segment裡HashEntry陣列的長度,它等於initialCapacity除以ssize的倍數c,如果c大於1,就會取大於等於c的2的N次方值,所以cap不是1,就是2的N次方。

segment的容量threshold=(int)cap*loadFactor,預設情況下initialCapacity等於16,loadfactor等於0.75,通過運算cap等於1,threshold等於零。

1.4、定位Segment

既然ConcurrentHashMap使用分段鎖Segment來保護不同段的資料,那麼在插入和獲取元素的時候,必須先通過雜湊演算法定位到Segment。可以看到ConcurrentHashMap會首先使用Wang/Jenkins hash的變種演算法對元素的hashCode進行一次再雜湊。

private static int hash(int h) {

	h += (h << 15) ^ 0xffffcd7d;

	h ^= (h >>> 10);

	h += (h << 3);

	h ^= (h >>> 6);

	h += (h << 2) + (h << 14);

	return h ^ (h >>> 16);

}

之所以進行再雜湊,目的是減少雜湊衝突,使元素能夠均勻地分佈在不同的Segment上,從而提高容器的存取效率。假如雜湊的質量差到極點,那麼所有的元素都在一個Segment中,不僅存取元素緩慢,分段鎖也會失去意義。筆者做了一個測試,不通過再雜湊而直接執行雜湊計算。

	System.out.println(Integer.parseInt("0001111", 2) & 15);

	System.out.println(Integer.parseInt("0011111", 2) & 15);

	System.out.println(Integer.parseInt("0111111", 2) & 15);

	System.out.println(Integer.parseInt("1111111", 2) & 15);

計算後輸出的雜湊值全是15,通過這個例子可以發現,如果不進行再雜湊,雜湊衝突會非常嚴重,因為只要低位一樣,無論高位是什麼數,其雜湊值總是一樣。我們再把上面的二進位制資料進行再雜湊後結果如下(為了方便閱讀,不足32位的高位補了0,每隔4位用豎線分割下)。

	0100|0111|0110|0111|1101|1010|0100|1110

	1111|0111|0100|0011|0000|0001|1011|1000

	0111|0111|0110|1001|0100|0110|0011|1110

	1000|0011|0000|0000|1100|1000|0001|1010

可以發現,每一位的資料都雜湊開了,通過這種再雜湊能讓數字的每一位都參加到雜湊運算當中,從而減少雜湊衝突。ConcurrentHashMap通過以下雜湊演算法定位segment。

final Segment<K,V> segmentFor(int hash) {

	return segments[(hash >>> segmentShift) & segmentMask];

}

預設情況下segmentShift為28,segmentMask為15,再雜湊後的數最大是32位二進位制資料,向右無符號移動28位,意思是讓高4位參與到雜湊運算中,(hash>>>segmentShift)&segmentMask的運算結果分別是4、15、7和8,可以看到雜湊值沒有發生衝突。

1.5、ConcurrentHashMap的操作

本節介紹ConcurrentHashMap的3種操作——get操作、put操作和size操作。

1.5.1、get操作

Segment的get操作實現非常簡單和高效。先經過一次再雜湊,然後使用這個雜湊值通過雜湊運算定位到Segment,再通過雜湊演算法定位到元素,程式碼如下。

public V get(Object key) {

	int hash = hash(key.hashCode());

	return segmentFor(hash).get(key, hash);

}

get操作的高效之處在於整個get過程不需要加鎖,除非讀到的值是空才會加鎖重讀。我們知道HashTable容器的get方法是需要加鎖的,那麼ConcurrentHashMap的get操作是如何做到不加鎖的呢?原因是它的get方法裡將要使用的共享變數都定義成volatile型別,如用於統計當前Segement大小的count欄位和用於儲存值的HashEntry的value。定義成volatile的變數,能夠線上程之間保持可見性,能夠被多執行緒同時讀,並且保證不會讀到過期的值,但是隻能被單執行緒寫(有一種情況可以被多執行緒寫,就是寫入的值不依賴於原值),在get操作裡只需要讀不需要寫共享變數count和value,所以可以不用加鎖。之所以不會讀到過期的值,是因為根據Java記憶體模型的happen before原則,對volatile欄位的寫入操作先於讀操作,即使兩個執行緒同時修改和獲取volatile變數,get操作也能拿到最新的值,這是用volatile替換鎖的經典應用場景。

	transient volatile int count;

	volatile V value;

在定位元素的程式碼裡我們可以發現,定位HashEntry和定位Segment的雜湊演算法雖然一樣,都與陣列的長度減去1再相“與”,但是相“與”的值不一樣,定位Segment使用的是元素的hashcode通過再雜湊後得到的值的高位,而定位HashEntry直接使用的是再雜湊後的值。其目的是避免兩次雜湊後的值一樣,雖然元素在Segment裡雜湊開了,但是卻沒有在HashEntry裡雜湊開。

1.5.2、put操作

由於put方法裡需要對共享變數進行寫入操作,所以為了執行緒安全,在操作共享變數時必須加鎖。put方法首先定位到Segment,然後在Segment裡進行插入操作。插入操作需要經歷兩個步驟,第一步判斷是否需要對Segment裡的HashEntry陣列進行擴容,第二步定位新增元素的位置,然後將其放在HashEntry數組裡。

(1)是否需要擴容

在插入元素前會先判斷Segment裡的HashEntry陣列是否超過容量(threshold),如果超過閾值,則對陣列進行擴容。值得一提的是,Segment的擴容判斷比HashMap更恰當,因為HashMap是在插入元素後判斷元素是否已經到達容量的,如果到達了就進行擴容,但是很有可能擴容之後沒有新元素插入,這時HashMap就進行了一次無效的擴容。

(2)如何擴容

在擴容的時候,首先會建立一個容量是原來容量兩倍的陣列,然後將原數組裡的元素進行再雜湊後插入到新的數組裡。為了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。

1.5.3、size操作

如果要統計整個ConcurrentHashMap裡元素的大小,就必須統計所有Segment裡元素的大小後求和。Segment裡的全域性變數count是一個volatile變數,那麼在多執行緒場景下,是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是可能累加前使用的count發生了變化,那麼統計結果就不準了。所以,最安全的做法是在統計size的時候把所有Segment的put、remove和clean方法全部鎖住,但是這種做法顯然非常低效。

因為在累加count操作過程中,之前累加過的count發生變化的機率非常小,所以ConcurrentHashMap的做法是先嚐試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再採用加鎖的方式來統計所有Segment的大小。

那麼ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變數,在put、remove和clean方法裡操作元素前都會將變數modCount進行加1,那麼在統計size前後比較modCount是否發生變化,從而得知容器的大小是否發生變化。

二、ConcurrentLinkedQueue

在併發程式設計中,有時候需要使用執行緒安全的佇列。如果要實現一個執行緒安全的佇列有兩種方式:一種是使用阻塞演算法,另一種是使用非阻塞演算法。使用阻塞演算法的佇列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現。非阻塞的實現方式則可以使用迴圈CAS的方式來實現。本節讓我們一起來研究一下Doug Lea是如何使用非阻塞的方式來實現執行緒安全佇列ConcurrentLinkedQueue的,相信從大師身上我們能學到不少併發程式設計的技巧。

ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部;當我們獲取一個元素時,它會返回佇列頭部的元素。它採用了“wait-free”演算法(即CAS演算法)來實現,該演算法在Michael&Scott演算法上進行了一些修改。

2.1、ConcurrentLinkedQueue的結構

通過ConcurrentLinkedQueue的類圖來分析一下它的結構,如圖6-3所示。

ConcurrentLinkedQueue由head節點和tail節點組成,每個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是通過這個next關聯起來,從而組成一張連結串列結構的佇列。預設情況下head節點儲存的元素為空,tail節點等於head節點。

private transient volatile Node<E> tail = head;

2.2、入佇列

2.2.1、入佇列過程

入佇列就是將入隊節點新增到佇列的尾部。為了方便理解入隊時佇列的變化,以及head節點和tail節點的變化,這裡以一個示例來展開介紹。假設我們想在一個佇列中依次插入4個節點,為了幫助大家理解,每新增一個節點就做了一個佇列的快照圖,如圖6-4所示。

圖6-4所示的過程如下。

  • 新增元素1。佇列更新head節點的next節點為元素1節點。又因為tail節點預設情況下等於head節點,所以它們的next節點都指向元素1節點。

  • 新增元素2。佇列首先設定元素1節點的next節點為元素2節點,然後更新tail節點指向元素2節點。

  • 新增元素3,設定tail節點的next節點為元素3節點。

  • 新增元素4,設定元素3的next節點為元素4節點,然後將tail節點指向元素4節點。

通過除錯入隊過程並觀察head節點和tail節點的變化,發現入隊主要做兩件事情:第一是將入隊節點設定成當前佇列尾節點的下一個節點第二是更新tail節點,如果tail節點的next節點不為空,則將入隊節點設定成tail節點,如果tail節點的next節點為空,則將入隊節點設定成tail的next節點,所以tail節點不總是尾節點(理解這一點對於我們研究原始碼會非常有幫助)。

通過對上面的分析,我們從單執行緒入隊的角度理解了入隊過程,但是多個執行緒同時進行入隊的情況就變得更加複雜了,因為可能會出現其他執行緒插隊的情況。如果有一個執行緒正在入隊,那麼它必須先獲取尾節點,然後設定尾節點的下一個節點為入隊節點,但這時可能有另外一個執行緒插隊了,那麼佇列的尾節點就會發生變化,這時當前執行緒要暫停入隊操作,然後重新獲取尾節點。讓我們再通過原始碼來詳細分析一下它是如何使用CAS演算法來入隊的。

public boolean offer(E e) {

	if (e == null) throw new NullPointerException();

	// 入隊前,建立一個入隊節點

	Node<E> n = new Node<E>(e);

	retry:

	// 死迴圈,入隊不成功反覆入隊。

	for (;;) {

		// 建立一個指向tail節點的引用

		Node<E> t = tail;

		// p用來表示佇列的尾節點,預設情況下等於tail節點。

		Node<E> p = t;

		for (int hops = 0; ; hops++) {

			// 獲得p節點的下一個節點。

			Node<E> next = succ(p);

			// next節點不為空,說明p不是尾節點,需要更新p後在將它指向next節點

			if (next != null) {

			// 迴圈了兩次及其以上,並且當前節點還是不等於尾節點

				if (hops > HOPS && t != tail)

					continue retry;

				p = next;

			}

			// 如果p是尾節點,則設定p節點的next節點為入隊節點。

			else if (p.casNext(null, n)) {

			/*如果tail節點有大於等於1個next節點,則將入隊節點設定成tail節點,

			更新失敗了也沒關係,因為失敗了表示有其他執行緒成功更新了tail節點*/

				if (hops >= HOPS)

					casTail(t, n); // 更新tail節點,允許失敗

				return true;

			}

			// p有next節點,表示p的next節點是尾節點,則重新設定p節點

			else {

				p = succ(p);

			}

		}

	}

}

從原始碼角度來看,整個入隊過程主要做兩件事情:第一是定位出尾節點;第二是使用CAS演算法將入隊節點設定成尾節點的next節點,如不成功則重試。

2.2.2、定位尾結點

tail節點並不總是尾節點,所以每次入隊都必須先通過tail節點來找到尾節點。尾節點可能是tail節點,也可能是tail節點的next節點。程式碼中迴圈體中的第一個if就是判斷tail是否有next節點,有則表示next節點可能是尾節點。獲取tail節點的next節點需要注意的是p節點等於p的next節點的情況,只有一種可能就是p節點和p的next節點都等於空,表示這個佇列剛初始化,正準備新增節點,所以需要返回head節點。獲取p節點的next節點程式碼如下。

final Node<E> succ(Node<E> p) {

	Node<E> next = p.getNext();

	return (p == next) head : next;

}

2.2.3、設定入隊節點為尾節點(CAS)

p.casNext(null,n)方法用於將入隊節點設定為當前佇列尾節點的next節點,如果p是null,表示p是當前佇列的尾節點,如果不為null,表示有其他執行緒更新了尾節點,則需要重新獲取當前佇列的尾節點。

2.2.4、HOPS的設計意圖

上面分析過對於先進先出的佇列入隊所要做的事情是將入隊節點設定成尾節點,doug lea寫的程式碼和邏輯還是稍微有點複雜。那麼,我用以下方式來實現是否可行?

public boolean offer(E e) {

	if (e == null)

		throw new NullPointerException();

	Node<E> n = new Node<E>(e);

	for (;;) {

		Node<E> t = tail;
		
		//更新tail下一個節點是n,並且設定尾結點。
		if (t.casNext(null, n) && casTail(t, n)) {

			return true;

		}

	}

}

讓tail節點永遠作為佇列的尾節點,這樣實現程式碼量非常少,而且邏輯清晰和易懂。但是,這麼做有個缺點,每次都需要使用迴圈CAS更新tail節點。如果能減少CAS更新tail節點的次數,就能提高入隊的效率,所以doug lea使用hops變數來控制並減少tail節點的更新頻率,並不是每次節點入隊後都將tail節點更新成尾節點,而是當tail節點和尾節點的距離大於等於常量HOPS的值(預設等於1)時才更新tail節點,tail和尾節點的距離越長,使用CAS更新tail節點的次數就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,因為迴圈體需要多迴圈一次來定位出尾節點,但是這樣仍然能提高入隊的效率,因為從本質上來看它通過增加對volatile變數的讀操作來減少對volatile變數的寫操作,而對volatile變數的寫操作開銷要遠遠大於讀操作,所以入隊效率會有所提升

private static final int HOPS = 1;

注意 入隊方法永遠返回true,所以不要通過返回值判斷入隊是否成功。

2.3、出佇列

出佇列的就是從佇列裡返回一個節點元素,並清空該節點對元素的引用。讓我們通過每個節點出隊的快照來觀察一下head節點的變化,如圖6-5所示。

從圖中可知,並不是每次出隊時都更新head節點,當head節點裡有元素時,直接彈出head節點裡的元素,而不會更新head節點。只有當head節點裡沒有元素時,出隊操作才會更新head節點。這種做法也是通過hops變數來減少使用CAS更新head節點的消耗,從而提高出隊效率。

讓我們再通過原始碼來深入分析下出隊過程。

public E poll() {

	Node<E> h = head;

		// p表示頭節點,需要出隊的節點Node<E> p = h;

	for (int hops = 0;; hops++) {

		// 獲取p節點的元素

		E item = p.getItem();

		// 如果p節點的元素不為空,使用CAS設定p節點引用的元素為null,

		// 如果成功則返回p節點的元素。

		if (item != null && p.casItem(item, null)) {

			if (hops >= HOPS) {

				// 將p節點下一個節點設定成head節點

				Node<E> q = p.getNext();

				updateHead(h, (q != null) q : p);

			}

		return item;

		}

		// 如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外

		// 一個執行緒修改了。那麼獲取p節點的下一個節點

		Node<E> next = succ(p);

		// 如果p的下一個節點也為空,說明這個佇列已經空了

		if (next == null) {

		// 更新頭節點。

			updateHead(h, p);

			break;

		}

		// 如果下一個元素不為空,則將頭節點的下一個節點設定成頭節點

		p = next;
	
	}

		return null;

}

首先獲取頭節點的元素,然後判斷頭節點元素是否為空,如果為空,表示另外一個執行緒已經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引用設定成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個執行緒已經進行了一次出隊操作更新了head節點,導致元素髮生了變化,需要重新獲取頭節點。

三、Java中的阻塞佇列

本節將介紹什麼是阻塞佇列,以及Java中阻塞佇列的4種處理方式,並介紹Java 7中提供的7種阻塞佇列,最後分析阻塞佇列的一種實現方式。

3.1、什麼是阻塞佇列

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。

  • 1)支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。

  • 2)支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。

  • 1、丟擲異常:當佇列滿時,如果再往佇列裡插入元素,會丟擲IllegalStateException("Queuefull")異常。當佇列空時,從佇列裡獲取元素會丟擲NoSuchElementException異常。

  • 2、返回特殊值:當往佇列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從佇列裡取出一個元素,如果沒有則返回null。

  • 3、一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到佇列可用或者響應中斷退出。當佇列空時,如果消費者執行緒從佇列裡take元素,佇列會阻塞住消費者執行緒,直到佇列不為空。

  • 4、超時退出:當阻塞佇列滿時,如果生產者執行緒往佇列裡插入元素,佇列會阻塞生產者執行緒一段時間,如果超過了指定的時間,生產者執行緒就會退出。

這兩個附加操作的4種處理方式不方便記憶,所以我找了一下這幾個方法的規律。put和take分別尾首含有字母t,offer和poll都含有字母o。

注意 如果是無界阻塞佇列,佇列不可能會出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer方法時,該方法永遠返回true。

3.2、Java裡的阻塞佇列

JDK 7提供了7個阻塞佇列,如下。

  • ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。

  • LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。

  • PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。

  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。

  • SynchronousQueue:一個不儲存元素的阻塞佇列。

  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。

  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

3.2.1、ArrayBlockingQueue

ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。

預設情況下不保證執行緒公平的訪問佇列,所謂公平訪問佇列是指阻塞的執行緒,可以按照阻塞的先後順序訪問佇列,即先阻塞執行緒先訪問佇列。非公平性是對先等待的執行緒是非公平的,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格,有可能先阻塞的執行緒最後才訪問佇列。為了保證公平性,通常會降低吞吐量。我們可以使用以下程式碼建立一個公平的阻塞佇列。

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,程式碼如下。

public ArrayBlockingQueue(int capacity, boolean fair) {

	if (capacity <= 0)

		throw new IllegalArgumentException();

	this.items = new Object[capacity];

	lock = new ReentrantLock(fair);

	notEmpty = lock.newCondition();

	notFull = lock.newCondition();

}

3.2.2、LinkedBlockingQueue

LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為Integer.MAX_VALUE。

此佇列按照先進先出的原則對元素進行排序

3.2.3、略

3.3、阻塞佇列的實現原理

如果佇列是空的,消費者會一直等待,當生產者新增元素時,消費者是如何知道當前佇列有元素的呢?如果讓你來設計阻塞佇列你會如何設計,如何讓生產者和消費者進行高效率的通訊呢?讓我們先來看看JDK是如何實現的。

使用通知模式實現。所謂通知模式,就是當生產者往滿的佇列裡新增元素時會阻塞住生產者,當消費者消費了一個佇列中的元素後,會通知生產者當前佇列可用。通過檢視JDK原始碼發現ArrayBlockingQueue使用了Condition來實現,程式碼如下。

private final Condition notFull;

private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {

	// 省略其他程式碼

	notEmpty = lock.newCondition();

	notFull = lock.newCondition();

}

public void put(E e) throws InterruptedException {

	checkNotNull(e);

	final ReentrantLock lock = this.lock;

	lock.lockInterruptibly();

	try {

	while (count == items.length)

		notFull.await();

		insert(e);

	} finally {

		lock.unlock();

	}

}

public E take() throws InterruptedException {

	final ReentrantLock lock = this.lock;

	lock.lockInterruptibly();

	try {

	while (count == 0)

		notEmpty.await();

		return extract();

	} finally {

		lock.unlock();

	}

}

private void insert(E x) {

	items[putIndex] = x;

	putIndex = inc(putIndex);

	++count;

	notEmpty.signal();

}

當往佇列裡插入一個元素時,如果佇列不可用,那麼阻塞生產者主要通過LockSupport.park(this)來實現。

public final void await() throws InterruptedException {

	if (Thread.interrupted())

		throw new InterruptedException();

	Node node = addConditionWaiter();

	int savedState = fullyRelease(node);

	int interruptMode = 0;

	while (!isOnSyncQueue(node)) {

		LockSupport.park(this);
	
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

			break;

	}

	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

		interruptMode = REINTERRUPT;

	if (node.nextWaiter != null) // clean up if cancelled

		unlinkCancelledWaiters();

	if (interruptMode != 0)

		reportInterruptAfterWait(interruptMode);

}

繼續進入原始碼,發現呼叫setBlocker先儲存一下將要阻塞的執行緒,然後呼叫unsafe.park阻塞當前執行緒。

public static void park(Object blocker) {

	Thread t = Thread.currentThread();

	setBlocker(t, blocker);

	unsafe.park(false, 0L);

	setBlocker(t, null);

}

unsafe.park是個native方法,程式碼如下。

public native void park(boolean isAbsolute, long time);

park這個方法會阻塞當前執行緒,只有以下4種情況中的一種發生時,該方法才會返回。

  • 與park對應的unpark執行或已經執行時。“已經執行”是指unpark先執行,然後再執行park的情況。

  • 執行緒被中斷時。

  • 等待完time引數指定的毫秒數時。

  • 異常現象發生時,這個異常現象沒有任何原因。

繼續看一下JVM是如何實現park方法:park在不同的作業系統中使用不同的方式實現,在Linux下使用的是系統方法pthread_cond_wait實現。實現程式碼在JVM原始碼路徑src/os/linux/vm/os_linux.cpp裡的os::PlatformEvent::park方法,程式碼如下。

後面看不懂了 略

四、Fork/Join框架(分治思想)

4.1、什麼是Fork/Join框架

Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。

我們再通過Fork和Join這兩個單詞來理解一下Fork/Join框架。Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+…+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。Fork/Join的執行流程如圖6-6所示

4.2、工作竊取演算法

工作竊取(work-stealing)演算法是指某個執行緒從其他佇列裡竊取任務來執行。那麼,為什麼需要使用工作竊取演算法呢?假如我們需要做一個比較大的任務,可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應。比如A執行緒負責處理A佇列裡的任務。但是,有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被竊取任務執行緒之間的競爭,通常會使用雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行

工作竊取的執行流程如圖6-7所示。

工作竊取演算法的優點:充分利用執行緒進行平行計算,減少了執行緒間的競爭。

工作竊取演算法的缺點:在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。並且該演算法會消耗了更多的系統資源,比如建立多個執行緒和多個雙端佇列。

4.3、Fork/Join框架的設計

我們已經很清楚Fork/Join框架的需求了,那麼可以思考一下,如果讓我們來設計一個Fork/Join框架,該如何設計?這個思考有助於你理解Fork/Join框架的設計。

  • 1、分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停地分割,直到分割出的子任務足夠小。

  • 2、執行任務併合並結果。分割的子任務分別放在雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執行完的結果都統一放在一個佇列裡,啟動一個執行緒從佇列裡拿資料,然後合併這些資料。

Fork/Join使用兩個類來完成以上兩件事情。

  • ForkJoinTask:我們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類,Fork/Join框架提供了以下兩個子類。

1、RecursiveAction:用於沒有返回結果的任務。(遞迴行動)

2、RecursiveTask:用於有返回結果的任務。(遞迴任務)

  • ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。

任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列裡暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。

4.4、使用Fork/Join框架

讓我們通過一個簡單的需求來使用Fork/Join框架,需求是:計算1+2+3+4的結果。使用Fork/Join框架首先要考慮到的是如何分割任務,如果希望每個子任務最多執行兩個數的相加,那麼我們設定分割的閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務的結果。因為是有結果的任務,所以必須繼承RecursiveTask,實現程式碼如下。

package com.book;  
  
import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.Future;  
import java.util.concurrent.RecursiveTask;  
  
/**  
 * @author shkstart  
 * @create 2021-08-03 21:08  
 */
 public class CountTask extends RecursiveTask<Integer> {  
    // 閾值  
 	private static final int THRESHOLD = 2;   
    private int start;  
    private int end;  
  
    public CountTask(int start, int end) {  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Integer compute() {  
        int sum = 0;  
        // 如果任務足夠小就計算任務  
		 boolean canCompute = (end - start) <= THRESHOLD;  
        if (canCompute) {  
            for (int i = start; i <= end; i++) {  
                sum += i;  
            }  
        } else {  
            // 如果任務大於閾值,就分裂成兩個子任務計算  
 			int middle = (start + end) / THRESHOLD;  
            CountTask leftTask = new CountTask(start, middle);  
            CountTask rightTask = new CountTask(middle + 1, end); // 執行子任務  
 			leftTask.fork();  
            rightTask.fork(); // 等待子任務執行完,並得到其結果  
 			int leftResult = leftTask.join();  
            int rightResult = rightTask.join(); // 合併子任務  
			 sum = leftResult + rightResult;  
        }  
        return sum;  
    }  
  
    public static void main(String[] args) throws Exception{  
        ForkJoinPool forkJoinPool = new ForkJoinPool();  
        // 生成一個計算任務,負責計算1+2+3+4  
 		CountTask task = new CountTask(1, 4);  
        // 執行一個任務  
		 Future<Integer> result = forkJoinPool.submit(task);  
  
        System.out.println(result.get());  
  
    }  
  
}

通過這個例子,我們進一步瞭解ForkJoinTask,ForkJoinTask與一般任務的主要區別在於它需要實現compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在呼叫fork方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。

4.5、Fork/Join框架的異常處理

ForkJoinTask在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。使用如下程式碼:

if(task.isCompletedAbnormally()){

	System.out.println(task.getException());

}

getException方法返回Throwable物件,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有丟擲異常則返回null。

4.6、Fork/Join框架的實現原理

ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成ForkJoinTask陣列負責將存放程式提交給ForkJoinPool的任務,而ForkJoinWorkerThread陣列負責執行這些任務。

(1)ForkJoinTask的fork方法實現原理

當我們呼叫ForkJoinTask的fork方法時,程式會呼叫ForkJoinWorkerThread的pushTask方法非同步地執行這個任務,然後立即返回結果。程式碼如下。

public final ForkJoinTask<V> fork() {

	((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);

	return this;

}

pushTask方法把當前任務存放在ForkJoinTask陣列佇列裡。然後再呼叫ForkJoinPool的signalWork()方法喚醒或建立一個工作執行緒來執行任務。程式碼如下。

final void pushTask(ForkJoinTask<> t) {

	ForkJoinTask<>[] q; int s, m;

	if ((q = queue) != null) { // ignore if queue removed

		long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;

		UNSAFE.putOrderedObject(q, u, t);

		queueTop = s + 1; // or use putOrderedInt

		if ((s -= queueBase) <= 2)

			pool.signalWork();
	
		else if (s == m)

			growQueue();

	}

}

(2)ForkJoinTask的join方法實現原理

Join方法的主要作用是阻塞當前執行緒並等待獲取結果。讓我們一起看看ForkJoinTask的join方法的實現,程式碼如下:

public final V join() {

	if (doJoin() != NORMAL)

		return reportResult();

	else

		return getRawResult();

}

private V reportResult() {

	int s; Throwable ex;

	if ((s = status) == CANCELLED)

		throw new CancellationException();

	if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)

		UNSAFE.throwException(ex);

	return getRawResult();

}

首先,它呼叫了doJoin()方法,通過doJoin()方法得到當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、訊號(SIGNAL)和出現異常(EXCEPTIONAL)。

  • 1、如果任務狀態是已完成,則直接返回任務結果。

  • 2、如果任務狀態是被取消,則直接丟擲CancellationException。

  • 3、如果任務狀態是丟擲異常,則直接丟擲對應的異常。

讓我們再來分析一下doJoin()方法的實現程式碼。

private int doJoin() {

	Thread t; ForkJoinWorkerThread w; int s; boolean completed;

	if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {

		if ((s = status) < 0)

			return s;

		if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {

			try {

				completed = exec();

			} catch (Throwable rex) {

				return setExceptionalCompletion(rex);

			}

			if (completed)

				return setCompletion(NORMAL);

		}

		return w.joinTask(this);

	}

	else

		return externalAwaitDone();

}

在doJoin()方法裡,首先通過檢視任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;如果沒有執行完,則從任務數組裡取出任務並執行。如果任務順利執行完成,則設定任務狀態為NORMAL,如果出現異常,則記錄異常,並將任務狀態設定為EXCEPTIONAL。

五、本章小結

本章介紹了Java中提供的各種併發容器和框架,並分析了該容器和框架的實現原理,從中我們能夠領略到大師級的設計思路,希望讀者能夠充分理解這種設計思想,並在以後開發的併發程式時,運用上這些併發程式設計的技巧。