ArrayBlockingQueue原理分析(二)-迭代器
概述
在上篇文章的結構圖中可以看出,所有的佇列最後都實現了Queue介面,而Queue繼承了Collection介面,而Collection介面繼承了Iterable,由於不同的集合會根據自己集合的特性實現自己的迭代器,那本文就分析一下ArrayBlockingQueue集合迭代器的實現方式,因為之前都是一直使用這玩意,從來不清楚內部如何工作的,所以就拿這個集合的迭代器分析一下。
Iterable介面
public interface Iterable<T> { //順序迭代器 Iterator<T> iterator(); //遍歷集合,裡面傳入一個Consumer default void forEach(Consumer<? super T> action) { Objects.requireNonNull(action); for (T t : this) { action.accept(t); } } //可分割迭代器 default Spliterator<T> spliterator() { return Spliterators.spliteratorUnknownSize(iterator(), 0); } }
這個方法會返回兩個迭代器,後面都會分析,中間那個是遍歷集合中的元素,傳入一個Consumer,意思是說對集合中的每個元素都執行一下accept方法。
Iterator介面
public interface Iterator<E> { public interface Iterator<E> { //是否有下一個元素 boolean hasNext(); //返回當前的下一個 E next(); //移除,一般呼叫集合的remove方法進行移除 default void remove() { throw new UnsupportedOperationException("remove"); } //對剩餘元素遍歷 default void forEachRemaining(Consumer<? super E> action) { Objects.requireNonNull(action); while (hasNext()) action.accept(next()); } }
Spliterator介面
public interface Spliterator<T> { //單個對元素執行給定的動作,如果有剩下元素未處理返回true,否則返回false boolean tryAdvance(Consumer<? super T> action); //對每個剩餘元素執行給定的動作,依次處理,直到所有元素已被處理或被異常終止。預設方法呼叫tryAdvance方法 default void forEachRemaining(Consumer<? super T> action) { do { } while(tryAdvance(action)); } //對任務分割,返回一個新的Spliterator迭代器 Spliterator<T> trySplit(); }
從這三個介面可以看出,第一個介面只是把下面的兩個介面封裝了一下,其本身基本沒有定義迭代相關的方法,就只有一個遍歷。Iterator和Spliterator的區別在於一個是順序遍歷,另一個是可分割的遍歷,因為Java8為了加快大集合的遍歷,採用了分散式的方式,先把大集合拆分成小集合,之後多執行緒並行遍歷。本文不會介紹ArrayBlockingQueue的Spliterator迭代器,只介紹Iterator迭代器。
Itrs類
該類封裝了Itr類,而Itr實現了Iterator介面。
class Itrs { /** * Node in a linked list of weak iterator references. * 定義節點,節點中包裝了Itr迭代器,當Itr沒有強引用存在的時候 * 可以直接回收Itr,然後doSomeSweeping會回收無用的節點 */ private class Node extends WeakReference<Itr> { Node next; Node(Itr iterator, Node next) { super(iterator); this.next = next; } } /** Incremented whenever takeIndex wraps around to 0 */ int cycles = 0; /** Linked list of weak iterator references,首節點 */ private Node head; /** Used to expunge stale iterators ,要被清除的node*/ private Node sweeper = null; private static final int SHORT_SWEEP_PROBES = 4; private static final int LONG_SWEEP_PROBES = 16; //構造方法,將迭代器加入連結串列 Itrs(Itr initial) { register(initial); } //清除迭代器已經被回收的Node節點 void doSomeSweeping(boolean tryHarder) { // assert lock.getHoldCount() == 1; // assert head != null; int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; Node o, p; final Node sweeper = this.sweeper; boolean passedGo; // to limit search to one full sweep //如果為null,從頭開始清理 if (sweeper == null) { o = null; p = head; passedGo = true; } else { o = sweeper; p = o.next; passedGo = false; } for (; probes > 0; probes--) { if (p == null) { if (passedGo) break; o = null; p = head; passedGo = true; } final Itr it = p.get(); final Node next = p.next; if (it == null || it.isDetached()) { // found a discarded/exhausted iterator //這裡只要發現了需要清理的node,就重置probes,讓迴圈一直進行下去 probes = LONG_SWEEP_PROBES; // "try harder" // unlink p //移除節點 p.clear(); p.next = null; if (o == null) { head = next; if (next == null) { // We've run out of iterators to track; retire itrs = null; return; } } else o.next = next; } else { o = p; } p = next; } this.sweeper = (p == null) ? null : o; } /** * Adds a new iterator to the linked list of tracked iterators. */ void register(Itr itr) { // assert lock.getHoldCount() == 1; //將加入的節點放在連結串列的頭部 head = new Node(itr, head); } //移除節點 void removedAt(int removedIndex) { for (Node o = null, p = head; p != null;) { final Itr it = p.get(); final Node next = p.next; if (it == null || it.removedAt(removedIndex)) { // unlink p // assert it == null || it.isDetached(); p.clear(); p.next = null; if (o == null) head = next; else o.next = next; } else { o = p; } p = next; } if (head == null) // no more iterators to track itrs = null; } /** * Called whenever the queue becomes empty. * * Notifies all active iterators that the queue is empty, * clears all weak refs, and unlinks the itrs datastructure. */ void queueIsEmpty() { // assert lock.getHoldCount() == 1; for (Node p = head; p != null; p = p.next) { Itr it = p.get(); if (it != null) { p.clear(); it.shutdown(); } } head = null; itrs = null; } /** * Called whenever an element has been dequeued (at takeIndex). */ void elementDequeued() { // assert lock.getHoldCount() == 1; if (count == 0) queueIsEmpty(); else if (takeIndex == 0) takeIndexWrapped(); } }
Itr內部類
從上面的分析可知,Itrs用來管理Itr,將Itr包裝到一個Node節點中,然後節點中有一個指標,最終構成一個連結串列,其中Itr使用了弱引用包裝,方便垃圾回收。Itr類是ArrayBlockingQueue內部的一個類,實現了Iterator介面,下面看一下這個類。
屬性
private class Itr implements Iterator<E> { /** Index to look for new nextItem; NONE at end,迭代器的迭代位置*/ private int cursor; /** Element to be returned by next call to next(); null if none,下一個元素的值,這個值是佇列中的值 */ private E nextItem; /** Index of nextItem; NONE if none, REMOVED if removed elsewhere,下一個元素位置 */ private int nextIndex; /** Last element returned; null if none or not detached. */ private E lastItem; /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ private int lastRet; }
構造方法
Itr() { // assert lock.getHoldCount() == 0; lastRet = NONE; final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { //如果佇列中沒有元素 if (count == 0) { // assert itrs == null; cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { //初始化賦值,takeIndex為消費者消費的位置 final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; //這裡就是下一個待消費資料 nextItem = itemAt(nextIndex = takeIndex); //初始化遊標,初始化為takeIndex的下一個位置 cursor = incCursor(takeIndex); if (itrs == null) { //itrs為連結串列的首節點,如果首節點為null,說明當前佇列沒有迭代器 itrs = new Itrs(this); } else { //如果有的,就把當前迭代器節點放到連結串列的最前面 itrs.register(this); // in this order //清除連結串列中無效的迭代器節點 itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; // assert takeIndex >= 0; // assert prevTakeIndex == takeIndex; // assert nextIndex >= 0; // assert nextItem != null; } } finally { lock.unlock(); } }
從上面可以看出,構造方法,就是為裡面的一些屬性進行賦值,這些屬性都是為了控制迭代過程的。
常用方法
public boolean hasNext() { // assert lock.getHoldCount() == 0; //直接看屬性的變數中有沒有,有就返回true if (nextItem != null) return true; noNext(); return false; } public E next() { // assert lock.getHoldCount() == 0; //直接從變數中拿,不從佇列中獲取 final E x = nextItem; if (x == null) throw new NoSuchElementException(); final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { if (!isDetached()) incorporateDequeues(); // assert nextIndex != NONE; // assert lastItem == null; lastRet = nextIndex; final int cursor = this.cursor; if (cursor >= 0) { //重新更新nextItem,如果先執行next方法,之後執行了兩次take方法 //第二次即便take已經把第二個元素取出了,這個時候再呼叫next,依然為 //執行take方法之前的第二個元素 nextItem = itemAt(nextIndex = cursor); // assert nextItem != null; //更新遊標 this.cursor = incCursor(cursor); } else { nextIndex = NONE; nextItem = null; } } finally { lock.unlock(); } return x; }
總結
整個來說,迭代器原理就是使用一些變數控制到迭代的位置,當佇列中的元素髮生變更的時候,迭代器的控制變數跟著變動就可以,由於ArrayBlockingQueue採用了加鎖的方式,所以迭代的過程中佇列中刪除元素是沒有影響的,因為會自動修復迭代器的控制變數,而且只有一個變數修改,所以不會出現執行緒不安全的問題。一個佇列可以有多個迭代器,迭代器被封裝在Node中,最後構成一個連結串列,當佇列中的元素更新的時候,方便統一更新迭代器的控制變數。