Java並發容器--ConcurrentLinkedQueue
概述
ConcurrentLinkedQueue是一種基於鏈表實現的無界非阻塞線程安全隊列,遵循先入先出規則。
線程安全隊列有兩種實現方式:
阻塞方式:對入隊和出隊操作加鎖。阻塞隊列。
非阻塞方式:通過自旋CAS實現。例如:ConcurrentLinkedQueue
下面從源代碼中分析ConcurrentLinkedQueue的實現方法。
類關系圖
從類圖可以看出,ConcurrentLinkedQueue有head和tail兩個volatile域,節點是用靜態內部類Node表示,每個Node含有元素item和指向下一個節點的指針next,都是volatile變量。
源碼分析
Node源碼
Node的item和next兩個域都是volatile變量,保證可見性。casItem和casNext方法使用了UNSAFE提供的CAS方法保證操作的原子性。
1 //Node代碼中使用了UNSAFE提供的CAS方法保證操作的原子性, 2 //UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 3 //第一個參數表示要更新的對象,第二個參數nextOffset是Field的偏移量,第三個參數表示期望值,最後一個參數更新後的值。若next域的值等於cmp,則把next域更新為val並返回true;否則不更新並返回false。View Code4 private static class Node<E> { 5 volatile E item; //Node值,volatile保證可見性 6 volatile Node<E> next; //Node的下一個元素,volatile保證可見性 7 8 /** 9 * Constructs a new node. Uses relaxed write because item can 10 * only be seen after publication via casNext.11 */ 12 Node(E item) { 13 UNSAFE.putObject(this, itemOffset, item); 14 } 15 16 boolean casItem(E cmp, E val) { 17 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 18 } 19 20 void lazySetNext(Node<E> val) { 21 UNSAFE.putOrderedObject(this, nextOffset, val); 22 } 23 24 boolean casNext(Node<E> cmp, Node<E> val) { 25 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 26 } 27 28 // Unsafe mechanics 29 30 private static final sun.misc.Unsafe UNSAFE; 31 private static final long itemOffset; 32 private static final long nextOffset; 33 34 static { 35 //初始化UNSAFE和各個域在類中的偏移量 36 try { 37 UNSAFE = sun.misc.Unsafe.getUnsafe();//初始化UNSAFE 38 Class k = Node.class; 39 //itemOffset是指類中item字段在Node類中的偏移量,先通過反射獲取類的item域,然後通過UNSAFE獲取item域在內存中相對於Node類首地址的偏移量。 40 itemOffset = UNSAFE.objectFieldOffset 41 (k.getDeclaredField("item")); 42 //nextOffset是指類中next字段在Node類中的偏移量 43 nextOffset = UNSAFE.objectFieldOffset 44 (k.getDeclaredField("next")); 45 } catch (Exception e) { 46 throw new Error(e); 47 } 48 } 49 }
初始化
創建一個空的Queue,head節點為null且tail節點等於head節點。
1 //創建一個空的Queue,head節點為null且tail節點等於head節點 2 public ConcurrentLinkedQueue() { 3 head = tail = new Node<E>(null); 4 5 }View Code
入隊
入隊的方法為offer,向隊列的尾部插入指定的元素,由於ConcurrentLinkedQueue是無界的,所以offer永遠返回true,不能通過返回值來判斷是否入隊成功。
入隊大致有以下幾個步驟:
1)根據tail節點定位出尾節點(last node);
2)將新節點置為尾節點的下一個節點;
3)更新尾節點casTail。
1 //向隊列的尾部插入指定的元素 2 public boolean offer(E e) { 3 checkNotNull(e); 4 final Node<E> newNode = new Node<E>(e);//構造新Node 5 //循環CAS直到入隊成功。1、根據tail節點定位出尾節點(last node);2、將新節點置為尾節點的下一個節點,3、更新尾節點casTail。 6 for (Node<E> t = tail, p = t;;) { 7 Node<E> q = p.next; 8 if (q == null) { //判斷p是不是尾節點,tail節點不一定是尾節點,判斷是不是尾節點的依據是該節點的next是不是null 9 // p is last node 10 if (p.casNext(null, newNode)) { 11 //設置P節點的下一個節點為新節點,如果p的next為null,說明p是尾節點,casNext返回true;如果p的next不為null,說明有其他線程更新過隊列的尾節點,casNext返回false。 12 // Successful CAS is the linearization point 13 // for e to become an element of this queue, 14 // and for newNode to become "live". 15 if (p != t) // hop two nodes at a time 16 casTail(t, newNode); // Failure is OK. 17 return true; 18 } 19 // Lost CAS race to another thread; re-read next 20 } 21 else if (p == q) 22 //p節點是null的head節點剛好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向自己 23 // We have fallen off list. If tail is unchanged, it 24 // will also be off-list, in which case we need to 25 // jump to head, from which all live nodes are always 26 // reachable. Else the new tail is a better bet. 27 p = (t != (t = tail)) ? t : head; 28 else 29 // Check for tail updates after two hops. 30 p = (p != t && t != (t = tail)) ? t : q; 31 //判斷tail節點有沒有被更新,如果沒被更新,1)p=q:p指向p.next繼續尋找尾節點; 32 //如果被更新了,2)p=t:P賦值為新的tail節點 33 //p != t && t != (t = tail)是怎麽執行的?見隨筆附錄《通過字節碼指令分析 p != t && t != (t = tail) 語句的執行》 34 //什麽情況下p!=t.只有本分支和else if (p == q)分支含有更新變量p和t的語句,所以在p!=t出現之前已經循環過這兩個分支至少一次。 35 36 } 37 } 38 39 private boolean casTail(Node<E> cmp, Node<E> val) { 40 return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); 41 }View Code
需要註意的是:tail不總是尾節點(last node)。DougLea大師為什麽這麽設計呢?把tail節點永遠作為Queue的尾節點實現起來不是更簡單嗎?
下面是tail節點永遠作為Queue的尾節點的入隊方法代碼:
1 public boolean offer(E e) { 2 if (e == null) 3 throw new NullPointerException(); 4 Node<E> n = new Node<E>(e); 5 for (;;) { 6 Node<E> t = tail; 7 //此處如果casNext成功,那麽casTail可能會成功。因為在這兩個原子操作期間,其他線程的casNext操作都會失敗,之後的casTail不會被執行,即tail節點不變。 8 if (t.casNext(null, n) && casTail(t, n)) { 9 return true; 10 } 11 } 12 }View Code
這麽做的缺點是每次入隊都會自旋CAS更新tail節點,入隊效率會降低,而DougLea的設計通過hops變量來減少入隊時減少更新tail節點的次數,默認情況下hops為1。當tail節點與尾節點的距離大於等於hops值時才更新Queue的tail節點。這樣帶來的壞處是入隊時需要根據tail定位尾節點,hops的值越大,定位時間就越長。DougLea的設計思想是通過增加對volatile變量的讀來減少對volatile變量的寫,而寫操作的開銷遠遠大於讀操作。所以從總體上來說入隊效率是提升的。
出隊
和入隊相似,出隊時也不是每次都會更新head節點,當head節點的item不為null時,直接彈出item;否則會更新head節點。更新head節點成功時,會把舊的head節點指向自己。
1 public E poll() { 2 restartFromHead: 3 //兩層循環 4 for (;;) { 5 for (Node<E> h = head, p = h, q;;) { 6 E item = p.item; 7 8 if (item != null && p.casItem(item, null)) { 9 // Successful CAS is the linearization point 10 // for item to be removed from this queue. 11 if (p != h) // hop two nodes at a time 12 updateHead(h, ((q = p.next) != null) ? q : p); 13 return item; 14 } 15 //隊列為空,更新head節點 16 else if ((q = p.next) == null) { 17 updateHead(h, p); 18 return null; 19 } 20 else if (p == q) 21 //p節點是null的head節點剛好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向自己。 22 //重新從head節點開始 23 continue restartFromHead; 24 else 25 p = q; //將p執行p的下一個節點 26 } 27 } 28 } 29 30 //更新head節點 31 final void updateHead(Node<E> h, Node<E> p) { 32 //通過CAS將head更新為P 33 if (h != p && casHead(h, p)) 34 h.lazySetNext(h);//把舊的head節點指向自己 35 } 36 37 void lazySetNext(Node<E> val) { 38 UNSAFE.putOrderedObject(this, nextOffset, val); 39 }View Code
附錄:通過字節碼指令分析 p != t && t != (t = tail) 語句的執行
在讀ConcurrentLinkedQueue源代碼時,在入隊方法的定位尾節點中讀到 p = (p != t && t != (t = tail)) ? t : q; 語句,不太理解 p != t && t != (t = tail) 的執行順序,遂通過反匯編語句仔細研究一下。
我們都知道 A && B 運算,在A不滿足條件的情況下,B將不會執行。那在字節碼指令中是怎麽實現的呢?
通過以下代碼模擬:
1 public class Test { 2 public static void main(String[] args) { 3 int t = 8; 4 int p = t; 5 int tail = 9; 6 boolean result = (p != t && t != (t = tail)); 7 System.out.println("p=" + p + ", t=" + t + ", result=" + result); 8 } 9 }View Code
不出所料,運行結果為p=8, t=8, result=false。t=8說明沒有執行t != (t = tail)語句。
看反匯編後的字節碼指令:
1 public class Test { 2 public static void main(java.lang.String[] args); 3 0 bipush 8 //將單字節常量(-128~127)壓入棧頂 4 2 istore_1 [t] //將棧頂int型數值存入第二個本地變量,即賦值給變量t,同時常量8出棧 5 3 iload_1 [t] //將第二個int型本地變量(t)壓入棧頂 6 4 istore_2 [p] //將棧頂int型數值存入第三個本地變量,即賦值給變量P,同時t出棧 7 5 bipush 9 8 7 istore_3 [tail] 9 8 iload_2 [p] 10 9 iload_1 [t] 11 10 if_icmpeq 24 //比較棧頂兩int型數值大小,當結果等於0時跳轉。即比較p!=t,結果為false(0),跳轉到24行,同時p和t出棧 12 13 iload_1 [t] 13 14 iload_3 [tail] 14 15 dup 15 16 istore_1 [t] 16 17 if_icmpeq 24 17 20 iconst_1 18 21 goto 25 19 24 iconst_0 //將int型0壓入棧頂。 20 25 istore 4 [result] //將棧頂int型數值存入指定本地變量。即將result賦值為0(false) 21 27 return 22 }View Code
接下來再看一下第一個條件成立時的情況。代碼將p != t改為p == t:
1 public class Test { 2 public static void main(String[] args) { 3 int t = 8; 4 int p = t; 5 int tail = 9; 6 boolean result = (p == t && t != (t = tail)); 7 System.out.println("p=" + p + ", t=" + t + ", result=" + result); 8 } 9 }View Code
先來看運行結果p=8, t=9, result=true。說明執行了t != (t = tail)語句。
看反匯編後的字節碼指令:
1 public class Test { 2 public static void main(java.lang.String[] args); 3 0 bipush 8 4 2 istore_1 [t] 5 3 iload_1 [t] 6 4 istore_2 [p] 7 5 bipush 9 8 7 istore_3 [tail] 9 8 iload_2 [p] 10 9 iload_1 [t] 11 10 if_icmpne 24 //比較棧頂兩int型數值大小,當結果不等於0時跳轉。即比較p == t,結果為true(1)。所以不會跳轉到24行,繼續執行下一行。 12 13 iload_1 [t] //將變量t壓入棧頂,此時t=8 13 14 iload_3 [tail] //將變量tail壓入棧頂,tail=9 14 15 dup //復制棧頂數值並將復制值壓入棧頂。即復制tail變量值並壓入棧頂,tail=9 15 16 istore_1 [t] //將棧頂數值存入t變量,同時出棧 16 17 if_icmpeq 24 //比較棧頂兩int型數值大小,當結果等於0時跳轉。此時棧頂有9、8。比較9!=8,結果為true(1)。所以不會跳轉到24行,繼續執行下一行。 17 20 iconst_1 //將int型1壓入棧頂 18 21 goto 25 //無條件跳轉到25行 19 24 iconst_0 20 25 istore 4 [result] //將棧頂1存入result,同時出棧。即result返回true 21 27 return 22 }View Code
通過字節碼指令分析可知,編譯器是通過if_icmpeq和if_icmpne比較並條件跳轉指令實現&&短路與運算的。在第二種情況中,還分析了t != (t = tail)語句的執行過程,理解會更加深入。
參考資料:
《Java並發編程的藝術》
ConcurrentLinkedQueue源碼分析(http://www.jianshu.com/p/7816c1361439)
Java並發容器--ConcurrentLinkedQueue