併發佇列之ConcurrentLinkedQueue
本來想著直接說執行緒池的,不過在說執行緒池之前,我們必須要知道併發安全佇列;因為一般情況下執行緒池中的執行緒數量是一定的,肯定不會超過某個閾值,那麼當任務太多了的時候,我們必須把多餘的任務儲存到併發安全佇列中,當執行緒池中的執行緒空閒下來了,就會到併發安全佇列中拿任務;
那麼什麼是併發安全佇列呢?其實可以簡單看作是一個連結串列,然後我們先辦法去存取節點;總的來說,併發安全佇列分為兩種,一種是阻塞的,一種是非阻塞的,前者是用鎖來實現的,後者用CAS實現的;
一.簡單介紹ConcurrentLinkedQueue
這個佇列用法沒什麼好說的,就類似LinkedList的用法,怎麼對一個連結串列繼續增刪改查,不多說,我們就說一下其中幾個關鍵的方法;
首先,這個佇列是一個執行緒安全的無界非阻塞佇列,其實就是一個單向連結串列,無界的意思就是沒有限制最大長度,非阻塞表示用CAS實現入隊和出隊操作,我們開啟這個類就可以知道,有一個內部類Node,其中重要的屬性如下所示:
//用於存放節點的值 volatile E item; //指向下一個節點 volatile Node<E> next; //這裡也是用的是UNSAFE類,前面說過了,這個類直接提供CAS操作 private static final sun.misc.Unsafe UNSAFE; //item欄位的偏移量 private static final long itemOffset; //next的偏移量 private static final long nextOffset;
然後ConcurrentLinkedQueue中幾個重要的屬性,好像也沒什麼重要的,就儲存了頭節點和尾節點,注意,預設情況下頭節點和尾節點都是哨兵節點,也就是一個存null的Node節點
//存放連結串列的頭節點 private transient volatile Node<E> head; //存放連結串列的尾節點 private transient volatile Node<E> tail; //UNSAFE物件 private static final sun.misc.Unsafe UNSAFE; //head欄位的偏移量 private static final long headOffset; //tail欄位偏移量 private static final long tailOffset;
下面我們直接看一些重要方法吧!慢慢分析其中的演算法才是關鍵的
二.offer方法
這個方法的作用就是在佇列末端新增一個節點,如果傳遞的引數是null,就丟擲空指標異常,否則由於該佇列是無界佇列,該方法會一直返回true,而且該方法使用CAS演算法實現的,所以不會阻塞執行緒;
//佇列末端新增一個節點 public boolean offer(E e) { //如果e為空,那麼丟擲空指標異常 checkNotNull(e); //將傳進來的元素封裝成一個節點,Node的構造器中呼叫UNSAFE.putObject(this, itemOffset, item)把e賦值給節點中的item final Node<E> newNode = new Node<E>(e); //[1] //這裡的for迴圈從最後的節點開始 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //[2]如果q為null,說明p就是最後的節點了 if (q == null) { //[3]CAS更新:如果p節點的下一個節點是null,就把寫個節點更新為newNode if (p.casNext(null, newNode)) { //[4]CAS成功,但是這時p==t,所以不會進入到這裡的if裡面,直接返回true //那麼什麼時候會走到這裡面來呢?其實是要有另外一個執行緒也在呼叫offer方法的時候,會進入到這裡面來 if (p != t) casTail(t, newNode); return true; } } else if (p == q) //[5] p = (t != (t = tail)) ? t : head; else //[6] p = (p != t && t != (t = tail)) ? t : q; } }
上面執行到[3]的時候,由於頭節點和尾節點預設都是指向哨兵節點的,由於這個時候p的下一個節點為null,所以當前執行緒A執行CAS會成功,下圖所示;
如果此時還有一個執行緒B也來嘗試[3]中CAS,由於此時p節點的下一個節點不是null了,於是執行緒B會跳到[1]出進行第二次迴圈,然後會到[6]中,由於p和t此時是相等的,所以這裡是false,即p=q,下圖所示:
然後執行緒B又會跳到[1]處進行第三次迴圈,由於執行了Node<E> q = p.next,所以此時q指向最後的null,就到了[3]處進行CAS,這次是可以成功的,成功之後如下圖所示:
這個時候因為p!=t,所以可以進入到[4],這裡又會進行一個CAS:如果tail和t指向的節點一樣,那麼就將tail指向新新增的節點,如圖所示,這個時候執行緒B也就執行完了;
其實還有[5]沒有走到,這個是在poll操作之後才執行的,我們先跳過,等說完poll方法之後再回頭看看;另外說一下,add方法其實就是呼叫的是offer方法,就不多說了;
三.poll方法
這個方法是獲取頭部的這個節點,如果佇列為空則返回null;
public E poll() { //這裡其實就是一個goto的標記,用於跳出for迴圈 restartFromHead: //[1] for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; //[2]如果當前節點中存的值不為空,則CAS設定為null if (item != null && p.casItem(item, null)) { //[3]CAS成功就更新頭節點的位置 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } //[4]當前佇列為空,就返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } //[5]當前節點和下一個節點一樣,說明節點自引用,則重新找頭節點 else if (p == q) continue restartFromHead; //[6] else p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
分為幾種情況,第一種情況是執行緒A呼叫poll方法的時候,發現佇列是空的,即頭節點和尾節點都指向哨兵節點,就會直接到[4],返回null
第二種情況,執行緒A執行到了[4],此時有一個執行緒卻呼叫offer方法添加了一個節點,下圖所示,那麼此時執行緒A就不會走[4]了,[5]也不滿足,於是會到[6]這裡來,然後執行緒A又會跳到[1]處進行迴圈,此時p指向的節點中item不為null,所以會到[2]中;
到了[2]中將p指向的節點中item用CAS設定為null,然後就到了[3],下面第一個圖,由於p!=h,q=null,所以最後呼叫的是updateHead(h,p),這方法:如果頭節點和h指向的是一樣的,就將頭節點指向p,我們還能看到updateHead方法中h.lazySetNext(h)表示h的下一個節點指向自己,下面圖二
到了這裡還沒完,還記不記得offer方法中有一個地方的程式碼沒有執行的啊!就是這種情況,尾節點自己引用自己,我們再呼叫offer會怎麼樣呢?
回到offer方法,先會到[1],然後q指向自己這個哨兵節點(注意,此時雖然p指向的節點中存的是null,但是p!=null},於是再到[5],此時的圖如下左圖所示;此時由於t==tail,所以p=head;
再在offer方法迴圈一次,此時q指向null,下面左圖所示,然後就可以進入[2]中進行CAS,CAS成功,因為此時p!=t,所以還要進行CAS將tail指向新節點,下面右圖所示,可以讓GC回收那個垃圾!
媽耶,這裡比較繞!哈哈哈哈哈哈哈哈哈哈哈
四.peek方法
這個方法的作用就是獲取佇列頭部的元素,只獲取不移除,注意這個方法和上面的poll方法的區別啊!
public E peek() { //[1]goto標誌 restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { //[2] E item = p.item; //[3] if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } //[4] else if (p == q) continue restartFromHead; else//[5] p = q; } } }
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
如果佇列中為空的時候,走到[3]的時候,就會如下圖所示,由於h==p,所以updateHead方法啥也不做,然後返回就返回item為null
如果佇列不為空,那麼如下左圖所示,此時進入迴圈內不滿足條件,會到[5]這裡,將p指向q,然後再進行一次迴圈到[3],將q指向p的後一個節點,下面右圖所示;
然後呼叫updateHead方法,用CAS將頭節點指向p這裡,然後將h自己指向自己,下圖所示,最後返回item
五.總結
其實還有幾個方法沒說,但是感覺比較容易就不浪費篇幅了,有興趣的可以看看:size方法用於計算佇列中節點的數量,可是由於沒有加鎖,在併發的條件下不準確;remove方法刪除某個節點,其實就是遍歷然後用equals方法比較item是不是一樣,只不過如果存在多個符合條件的節點只刪除第一個,然後返回true,否則返回false;contains方法判斷佇列中是否包含指定item的節點,也就是遍歷,很容易;
最麻煩的就是offer方法和poll方法,offer方法是在佇列的最後面新增節點,而poll是獲取頭節點,並且刪除第一個真正的佇列節點(注意,節點分為兩種,一種是哨兵節點,一種是真正的存了資料的節點啊),還簡單的說了一下poll方法和peek方法的區別,後者只是獲取,而不刪除啊!用下面這個圖幫助記憶一下;
&n