1. 程式人生 > >Java並發容器--ConcurrentLinkedQueue

Java並發容器--ConcurrentLinkedQueue

跳轉指令 cnblogs 什麽 success 出隊 表示 otn error 鏈表

概述

  ConcurrentLinkedQueue是一種基於鏈表實現的無界非阻塞線程安全隊列,遵循先入先出規則。

  線程安全隊列有兩種實現方式:

    阻塞方式:對入隊和出隊操作加鎖。阻塞隊列。

    非阻塞方式:通過自旋CAS實現。例如:ConcurrentLinkedQueue

  下面從源代碼中分析ConcurrentLinkedQueue的實現方法。

類關系圖

      技術分享

    從類圖可以看出,ConcurrentLinkedQueue有head和tail兩個volatile域,節點是用靜態內部類Node表示,每個Node含有元素item和指向下一個節點的指針next,都是volatile變量。

源碼分析

  Node源碼

    Node的item和next兩個域都是volatile變量,保證可見性。casItem和casNext方法使用了UNSAFE提供的CAS方法保證操作的原子性。

技術分享
 1         //Node代碼中使用了UNSAFE提供的CAS方法保證操作的原子性,
 2         //UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
 3         //第一個參數表示要更新的對象,第二個參數nextOffset是Field的偏移量,第三個參數表示期望值,最後一個參數更新後的值。若next域的值等於cmp,則把next域更新為val並返回true;否則不更新並返回false。
4 private static class Node<E> { 5 volatile E item; //Node值,volatile保證可見性 6 volatile Node<E> next; //Node的下一個元素,volatile保證可見性 7 8 /** 9 * Constructs a new node. Uses relaxed write because item can 10 * only be seen after publication via casNext.
11 */ 12 Node(E item) { 13 UNSAFE.putObject(this, itemOffset, item); 14 } 15 16 boolean casItem(E cmp, E val) { 17 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 18 } 19 20 void lazySetNext(Node<E> val) { 21 UNSAFE.putOrderedObject(this, nextOffset, val); 22 } 23 24 boolean casNext(Node<E> cmp, Node<E> val) { 25 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 26 } 27 28 // Unsafe mechanics 29 30 private static final sun.misc.Unsafe UNSAFE; 31 private static final long itemOffset; 32 private static final long nextOffset; 33 34 static { 35 //初始化UNSAFE和各個域在類中的偏移量 36 try { 37 UNSAFE = sun.misc.Unsafe.getUnsafe();//初始化UNSAFE 38 Class k = Node.class; 39 //itemOffset是指類中item字段在Node類中的偏移量,先通過反射獲取類的item域,然後通過UNSAFE獲取item域在內存中相對於Node類首地址的偏移量。 40 itemOffset = UNSAFE.objectFieldOffset 41 (k.getDeclaredField("item")); 42 //nextOffset是指類中next字段在Node類中的偏移量 43 nextOffset = UNSAFE.objectFieldOffset 44 (k.getDeclaredField("next")); 45 } catch (Exception e) { 46 throw new Error(e); 47 } 48 } 49 }
View Code

  初始化

    創建一個空的Queue,head節點為null且tail節點等於head節點。

技術分享
1             //創建一個空的Queue,head節點為null且tail節點等於head節點
2             public ConcurrentLinkedQueue() {
3                 head = tail = new Node<E>(null);
4         
5             }
View Code

  入隊

    入隊的方法為offer,向隊列的尾部插入指定的元素,由於ConcurrentLinkedQueue是無界的,所以offer永遠返回true,不能通過返回值來判斷是否入隊成功。

    入隊大致有以下幾個步驟:

      1)根據tail節點定位出尾節點(last node);

      2)將新節點置為尾節點的下一個節點;

      3)更新尾節點casTail。

技術分享
 1         //向隊列的尾部插入指定的元素
 2         public boolean offer(E e) {
 3             checkNotNull(e);
 4             final Node<E> newNode = new Node<E>(e);//構造新Node
 5             //循環CAS直到入隊成功。1、根據tail節點定位出尾節點(last node);2、將新節點置為尾節點的下一個節點,3、更新尾節點casTail。
 6             for (Node<E> t = tail, p = t;;) {
 7                 Node<E> q = p.next;
 8                 if (q == null) {    //判斷p是不是尾節點,tail節點不一定是尾節點,判斷是不是尾節點的依據是該節點的next是不是null
 9                     // p is last node
10                     if (p.casNext(null, newNode)) {    
11                     //設置P節點的下一個節點為新節點,如果p的next為null,說明p是尾節點,casNext返回true;如果p的next不為null,說明有其他線程更新過隊列的尾節點,casNext返回false。
12                         // Successful CAS is the linearization point
13                         // for e to become an element of this queue,
14                         // and for newNode to become "live".
15                         if (p != t) // hop two nodes at a time
16                             casTail(t, newNode);  // Failure is OK.
17                         return true;
18                     }
19                     // Lost CAS race to another thread; re-read next
20                 }
21                 else if (p == q)
22                     //p節點是null的head節點剛好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向自己
23                     // We have fallen off list.  If tail is unchanged, it
24                     // will also be off-list, in which case we need to
25                     // jump to head, from which all live nodes are always
26                     // reachable.  Else the new tail is a better bet.
27                     p = (t != (t = tail)) ? t : head;
28                 else
29                     // Check for tail updates after two hops.
30                     p = (p != t && t != (t = tail)) ? t : q;
31                     //判斷tail節點有沒有被更新,如果沒被更新,1)p=q:p指向p.next繼續尋找尾節點;
32                     //如果被更新了,2)p=t:P賦值為新的tail節點
33                     //p != t && t != (t = tail)是怎麽執行的?見隨筆附錄《通過字節碼指令分析 p != t && t != (t = tail) 語句的執行》
34                     //什麽情況下p!=t.只有本分支和else if (p == q)分支含有更新變量p和t的語句,所以在p!=t出現之前已經循環過這兩個分支至少一次。
35                     
36             }
37         }
38         
39         private boolean casTail(Node<E> cmp, Node<E> val) {
40             return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
41         }
View Code

    需要註意的是:tail不總是尾節點(last node)。DougLea大師為什麽這麽設計呢?把tail節點永遠作為Queue的尾節點實現起來不是更簡單嗎?

    下面是tail節點永遠作為Queue的尾節點的入隊方法代碼:

技術分享
 1         public boolean offer(E e) {
 2             if (e == null)
 3                 throw new NullPointerException();
 4             Node<E> n = new Node<E>(e);
 5             for (;;) {
 6                 Node<E> t = tail;
 7                 //此處如果casNext成功,那麽casTail可能會成功。因為在這兩個原子操作期間,其他線程的casNext操作都會失敗,之後的casTail不會被執行,即tail節點不變。
 8                 if (t.casNext(null, n) && casTail(t, n)) {
 9                     return true;
10                 }
11             }
12         }
View Code

    這麽做的缺點是每次入隊都會自旋CAS更新tail節點,入隊效率會降低,而DougLea的設計通過hops變量來減少入隊時減少更新tail節點的次數,默認情況下hops為1。當tail節點與尾節點的距離大於等於hops值時才更新Queue的tail節點。這樣帶來的壞處是入隊時需要根據tail定位尾節點,hops的值越大,定位時間就越長。DougLea的設計思想是通過增加對volatile變量的讀來減少對volatile變量的寫,而寫操作的開銷遠遠大於讀操作。所以從總體上來說入隊效率是提升的。

  出隊

    和入隊相似,出隊時也不是每次都會更新head節點,當head節點的item不為null時,直接彈出item;否則會更新head節點。更新head節點成功時,會把舊的head節點指向自己。

技術分享
 1             public E poll() {
 2                 restartFromHead:
 3                 //兩層循環
 4                 for (;;) {
 5                     for (Node<E> h = head, p = h, q;;) {
 6                         E item = p.item;
 7 
 8                         if (item != null && p.casItem(item, null)) {
 9                             // Successful CAS is the linearization point
10                             // for item to be removed from this queue.
11                             if (p != h) // hop two nodes at a time
12                                 updateHead(h, ((q = p.next) != null) ? q : p);
13                             return item;
14                         }
15                         //隊列為空,更新head節點
16                         else if ((q = p.next) == null) {
17                             updateHead(h, p);
18                             return null;
19                         }
20                         else if (p == q)
21                             //p節點是null的head節點剛好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向自己。
22                             //重新從head節點開始
23                             continue restartFromHead;
24                         else
25                             p = q;    //將p執行p的下一個節點
26                     }
27                 }
28             }
29             
30             //更新head節點
31             final void updateHead(Node<E> h, Node<E> p) {
32                 //通過CAS將head更新為P
33                 if (h != p && casHead(h, p))
34                     h.lazySetNext(h);//把舊的head節點指向自己
35             }
36             
37             void lazySetNext(Node<E> val) {
38                 UNSAFE.putOrderedObject(this, nextOffset, val);
39             }
View Code

附錄:通過字節碼指令分析 p != t && t != (t = tail) 語句的執行

  在讀ConcurrentLinkedQueue源代碼時,在入隊方法的定位尾節點中讀到 p = (p != t && t != (t = tail)) ? t : q; 語句,不太理解 p != t && t != (t = tail) 的執行順序,遂通過反匯編語句仔細研究一下。

  我們都知道 A && B 運算,在A不滿足條件的情況下,B將不會執行。那在字節碼指令中是怎麽實現的呢?

  通過以下代碼模擬:

技術分享
1             public class Test {
2                 public static void main(String[] args) {
3                     int t = 8;
4                     int p = t;
5                     int tail = 9;
6                     boolean result = (p != t && t != (t = tail));
7                     System.out.println("p=" + p + ", t=" + t + ", result=" + result);
8                 }
9             }
View Code

  不出所料,運行結果為p=8, t=8, result=false。t=8說明沒有執行t != (t = tail)語句。

  看反匯編後的字節碼指令:

技術分享
 1         public class Test {
 2           public static void main(java.lang.String[] args);
 3              0  bipush 8                //將單字節常量(-128~127)壓入棧頂
 4              2  istore_1 [t]            //將棧頂int型數值存入第二個本地變量,即賦值給變量t,同時常量8出棧
 5              3  iload_1 [t]                //將第二個int型本地變量(t)壓入棧頂 
 6              4  istore_2 [p]            //將棧頂int型數值存入第三個本地變量,即賦值給變量P,同時t出棧
 7              5  bipush 9                
 8              7  istore_3 [tail]
 9              8  iload_2 [p]
10              9  iload_1 [t]
11             10  if_icmpeq 24            //比較棧頂兩int型數值大小,當結果等於0時跳轉。即比較p!=t,結果為false(0),跳轉到24行,同時p和t出棧
12             13  iload_1 [t]
13             14  iload_3 [tail]
14             15  dup
15             16  istore_1 [t]
16             17  if_icmpeq 24
17             20  iconst_1
18             21  goto 25
19             24  iconst_0                //將int型0壓入棧頂。
20             25  istore 4 [result]        //將棧頂int型數值存入指定本地變量。即將result賦值為0(false)
21             27  return
22         }
View Code

  接下來再看一下第一個條件成立時的情況。代碼將p != t改為p == t:

技術分享
1             public class Test {
2                 public static void main(String[] args) {
3                     int t = 8;
4                     int p = t;
5                     int tail = 9;
6                     boolean result = (p == t && t != (t = tail));
7                     System.out.println("p=" + p + ", t=" + t + ", result=" + result);
8                 }
9             }
View Code

  先來看運行結果p=8, t=9, result=true。說明執行了t != (t = tail)語句。

  看反匯編後的字節碼指令:

技術分享
 1         public class Test {
 2           public static void main(java.lang.String[] args);
 3              0  bipush 8
 4              2  istore_1 [t]
 5              3  iload_1 [t]
 6              4  istore_2 [p]
 7              5  bipush 9
 8              7  istore_3 [tail]
 9              8  iload_2 [p]
10              9  iload_1 [t]
11             10  if_icmpne 24            //比較棧頂兩int型數值大小,當結果不等於0時跳轉。即比較p == t,結果為true(1)。所以不會跳轉到24行,繼續執行下一行。
12             13  iload_1 [t]                //將變量t壓入棧頂,此時t=8
13             14  iload_3 [tail]            //將變量tail壓入棧頂,tail=9
14             15  dup                        //復制棧頂數值並將復制值壓入棧頂。即復制tail變量值並壓入棧頂,tail=9
15             16  istore_1 [t]            //將棧頂數值存入t變量,同時出棧
16             17  if_icmpeq 24            //比較棧頂兩int型數值大小,當結果等於0時跳轉。此時棧頂有9、8。比較9!=8,結果為true(1)。所以不會跳轉到24行,繼續執行下一行。
17             20  iconst_1                //將int型1壓入棧頂
18             21  goto 25                    //無條件跳轉到25行
19             24  iconst_0
20             25  istore 4 [result]        //將棧頂1存入result,同時出棧。即result返回true
21             27  return
22         }
View Code

  通過字節碼指令分析可知,編譯器是通過if_icmpeq和if_icmpne比較並條件跳轉指令實現&&短路與運算的。在第二種情況中,還分析了t != (t = tail)語句的執行過程,理解會更加深入。

參考資料:

  《Java並發編程的藝術》

  ConcurrentLinkedQueue源碼分析(http://www.jianshu.com/p/7816c1361439)

Java並發容器--ConcurrentLinkedQueue