ConcurrentLinkedQueue原碼解析
描述
ConcurrentLinkedQueue是一個基於單鏈表的無界執行緒安全佇列,該佇列是FIFO的。ConcurrentLinkedQueue/ConcurrentLinkedDeue和LinkedBlockingQueue/LinkedBlockingDeue
相比,不同點在於它們是non-blocking的,並且是lock-free的,而後者則是利用ReentrantLock實現的,所以他們具有更高的吞吐量。
原始碼解析(基於jdk1.8.0_40)
資料結構
類圖:
head,tail結點定義和描述:
/** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ private transient volatile Node<E> tail;
連結串列結構
建構函式,初始化時讓head和tail同時指向一個dummy結點(否則入隊時需要同時修改head,tail 2個指標,需要加鎖。而有了dummy結點,入隊/出隊只需各自競爭一個資源即可--入隊CAS競爭修改頭結點item屬性:p.casItem(item, null),出隊CAS競爭修改尾結點next指標:p.casNext(null, newNode)):
/** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
ConcurrentLinkedQueue中單鏈表結構示意圖:
* * head tail * | | * v v * * +------+ +------+ +------+ * |dummy |------>| a | ... ----->| n |----->null * +------+ +------+ +------+ *
注意head和tail指標不是一定要每次更新的,事實上它們的更新是遲鈍的,有滯後的(這樣有助於減少CAS操作,jdk7中,head/tail指標在滯後實際的頭/尾結點2步以上時才會更新,甚至會發生tail指標滯後於head指標)。真正的尾結點可以通過tail指標向後找,直到node.next==null。同樣真正的佇列頭結點可以通過head指標向後找,直到node.item!=null。下面的描述來自ConcurrentLinkedQueue Node類的doc文件:
* Both head and tail are permitted to lag. In fact, failing to
* update them every time one could is a significant optimization
* (fewer CASes). As with LinkedTransferQueue (see the internal
* documentation for that class), we use a slack threshold of two;
* that is, we update head/tail when the current pointer appears
* to be two or more steps away from the first/last node.
*
* Since head and tail are updated concurrently and independently,
* it is possible for tail to lag behind head (why not)?
入隊offer方法
從隊尾入隊,找到隊尾結點p後,cas競爭更新p.next指標,選擇性的更新tail指標(前面提到的滯後更新)。
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
出隊poll方法
從佇列頭部出隊,找到頭部結點p後,cas競爭把p.item值置null,選擇性的更新head指標(前面提到的滯後更新)。注意head,tail指標始終不會為null,即使出隊後佇列空了,head和tail也會指向dummy結點。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
updateHead方法,cas更新head指標,並通過h.lazySetNext(h),把出隊的結點h的next指標指向h自己,標示出已經出隊(雖然已經通過設定node.item為null標示,但這裡修改next指標是為了幫助GC):
/**
* Tries to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
size方法
size(), contains(), toArray()方法都需要遍歷整個連結串列。
/**
* Returns the number of elements in this queue. If this queue
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
* Additionally, if elements are added or removed during execution
* of this method, the returned result may be inaccurate. Thus,
* this method is typically not very useful in concurrent
* applications.
*
* @return the number of elements in this queue
*/
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
succ方法,尋找後繼結點(通過判斷p==p.next來跳過已經出隊的結點):
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}