1. 程式人生 > 實用技巧 >ArrayBlockingQueue原理分析(二)-迭代器

ArrayBlockingQueue原理分析(二)-迭代器

概述

  在上篇文章的結構圖中可以看出,所有的佇列最後都實現了Queue介面,而Queue繼承了Collection介面,而Collection介面繼承了Iterable,由於不同的集合會根據自己集合的特性實現自己的迭代器,那本文就分析一下ArrayBlockingQueue集合迭代器的實現方式,因為之前都是一直使用這玩意,從來不清楚內部如何工作的,所以就拿這個集合的迭代器分析一下。

Iterable介面

public interface Iterable<T> {
    //順序迭代器
    Iterator<T> iterator();
    //遍歷集合,裡面傳入一個Consumer
    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }
    //可分割迭代器
    default Spliterator<T> spliterator() {
        return Spliterators.spliteratorUnknownSize(iterator(), 0);
     }
}

  這個方法會返回兩個迭代器,後面都會分析,中間那個是遍歷集合中的元素,傳入一個Consumer,意思是說對集合中的每個元素都執行一下accept方法。

Iterator介面

public interface Iterator<E> {

public interface Iterator<E> {
    //是否有下一個元素
    boolean hasNext();
    //返回當前的下一個
    E next();
    //移除,一般呼叫集合的remove方法進行移除
    default void remove() {
        throw new UnsupportedOperationException("remove");
    }
     //對剩餘元素遍歷
     default void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        while (hasNext())
            action.accept(next());
    }

}

Spliterator介面

public interface Spliterator<T> {
    //單個對元素執行給定的動作,如果有剩下元素未處理返回true,否則返回false
    boolean tryAdvance(Consumer<? super T> action);
    //對每個剩餘元素執行給定的動作,依次處理,直到所有元素已被處理或被異常終止。預設方法呼叫tryAdvance方法
    default void forEachRemaining(Consumer<? super T> action) {
        do { } while
(tryAdvance(action)); } //對任務分割,返回一個新的Spliterator迭代器 Spliterator<T> trySplit(); }

從這三個介面可以看出,第一個介面只是把下面的兩個介面封裝了一下,其本身基本沒有定義迭代相關的方法,就只有一個遍歷。Iterator和Spliterator的區別在於一個是順序遍歷,另一個是可分割的遍歷,因為Java8為了加快大集合的遍歷,採用了分散式的方式,先把大集合拆分成小集合,之後多執行緒並行遍歷。本文不會介紹ArrayBlockingQueue的Spliterator迭代器,只介紹Iterator迭代器。

Itrs類

該類封裝了Itr類,而Itr實現了Iterator介面。

class Itrs {

        /**
         * Node in a linked list of weak iterator references.
         * 定義節點,節點中包裝了Itr迭代器,當Itr沒有強引用存在的時候
         * 可以直接回收Itr,然後doSomeSweeping會回收無用的節點
         */
        private class Node extends WeakReference<Itr> {
            Node next;

            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }

        /** Incremented whenever takeIndex wraps around to 0 */
        int cycles = 0;

        /** Linked list of weak iterator references,首節點 */
        private Node head;

        /** Used to expunge stale iterators ,要被清除的node*/
        private Node sweeper = null;

        private static final int SHORT_SWEEP_PROBES = 4;
        private static final int LONG_SWEEP_PROBES = 16;
        //構造方法,將迭代器加入連結串列
        Itrs(Itr initial) {
            register(initial);
        }
        //清除迭代器已經被回收的Node節點
        void doSomeSweeping(boolean tryHarder) {
            // assert lock.getHoldCount() == 1;
            // assert head != null;
            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
            Node o, p;
            final Node sweeper = this.sweeper;
            boolean passedGo;   // to limit search to one full sweep
            //如果為null,從頭開始清理
            if (sweeper == null) {
                o = null;
                p = head;
                passedGo = true;
            } else {
                o = sweeper;
                p = o.next;
                passedGo = false;
            }

            for (; probes > 0; probes--) {
                if (p == null) {
                    if (passedGo)
                        break;
                    o = null;
                    p = head;
                    passedGo = true;
                }
                final Itr it = p.get();
                final Node next = p.next;
                if (it == null || it.isDetached()) {
                    // found a discarded/exhausted iterator
                    //這裡只要發現了需要清理的node,就重置probes,讓迴圈一直進行下去
                    probes = LONG_SWEEP_PROBES; // "try harder"
                    // unlink p
                    //移除節點
                    p.clear();
                    p.next = null;
                    if (o == null) {
                        head = next;
                        if (next == null) {
                            // We've run out of iterators to track; retire
                            itrs = null;
                            return;
                        }
                    }
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }

            this.sweeper = (p == null) ? null : o;
        }

        /**
         * Adds a new iterator to the linked list of tracked iterators.
         */
        void register(Itr itr) {
            // assert lock.getHoldCount() == 1;
            //將加入的節點放在連結串列的頭部
            head = new Node(itr, head);
        }

        //移除節點
        void removedAt(int removedIndex) {
            for (Node o = null, p = head; p != null;) {
                final Itr it = p.get();
                final Node next = p.next;
                if (it == null || it.removedAt(removedIndex)) {
                    // unlink p
                    // assert it == null || it.isDetached();
                    p.clear();
                    p.next = null;
                    if (o == null)
                        head = next;
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }
            if (head == null)   // no more iterators to track
                itrs = null;
        }

        /**
         * Called whenever the queue becomes empty.
         *
         * Notifies all active iterators that the queue is empty,
         * clears all weak refs, and unlinks the itrs datastructure.
         */
        void queueIsEmpty() {
            // assert lock.getHoldCount() == 1;
            for (Node p = head; p != null; p = p.next) {
                Itr it = p.get();
                if (it != null) {
                    p.clear();
                    it.shutdown();
                }
            }
            head = null;
            itrs = null;
        }

        /**
         * Called whenever an element has been dequeued (at takeIndex).
         */
        void elementDequeued() {
            // assert lock.getHoldCount() == 1;
            if (count == 0)
                queueIsEmpty();
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }        

Itr內部類

從上面的分析可知,Itrs用來管理Itr,將Itr包裝到一個Node節點中,然後節點中有一個指標,最終構成一個連結串列,其中Itr使用了弱引用包裝,方便垃圾回收。Itr類是ArrayBlockingQueue內部的一個類,實現了Iterator介面,下面看一下這個類。

屬性

private class Itr implements Iterator<E> {

        /** Index to look for new nextItem; NONE at end,迭代器的迭代位置*/
        private int cursor;

        /** Element to be returned by next call to next(); null if none,下一個元素的值,這個值是佇列中的值 */
        private E nextItem;

        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere,下一個元素位置 */
        private int nextIndex;

        /** Last element returned; null if none or not detached. */
        private E lastItem;

        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
        private int lastRet;
}

構造方法

Itr() {
            // assert lock.getHoldCount() == 0;
            lastRet = NONE;
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                //如果佇列中沒有元素
                if (count == 0) {
                    // assert itrs == null;
                    cursor = NONE;
                    nextIndex = NONE;
                    prevTakeIndex = DETACHED;
                } else {
                    //初始化賦值,takeIndex為消費者消費的位置
                    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                    prevTakeIndex = takeIndex;
                    //這裡就是下一個待消費資料
                    nextItem = itemAt(nextIndex = takeIndex);
                    //初始化遊標,初始化為takeIndex的下一個位置
                    cursor = incCursor(takeIndex);
                    if (itrs == null) {
                       //itrs為連結串列的首節點,如果首節點為null,說明當前佇列沒有迭代器
                        itrs = new Itrs(this);
                    } else {
                        //如果有的,就把當前迭代器節點放到連結串列的最前面
                        itrs.register(this); // in this order
                        //清除連結串列中無效的迭代器節點
                        itrs.doSomeSweeping(false);
                    }
                    prevCycles = itrs.cycles;
                    // assert takeIndex >= 0;
                    // assert prevTakeIndex == takeIndex;
                    // assert nextIndex >= 0;
                    // assert nextItem != null;
                }
            } finally {
                lock.unlock();
            }
        }

從上面可以看出,構造方法,就是為裡面的一些屬性進行賦值,這些屬性都是為了控制迭代過程的。

常用方法

        public boolean hasNext() {
            // assert lock.getHoldCount() == 0;
            //直接看屬性的變數中有沒有,有就返回true
            if (nextItem != null)
                return true;
            noNext();
            return false;
        }

        public E next() {
            // assert lock.getHoldCount() == 0;
            //直接從變數中拿,不從佇列中獲取
            final E x = nextItem;
            if (x == null)
                throw new NoSuchElementException();
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (!isDetached())
                    incorporateDequeues();
                // assert nextIndex != NONE;
                // assert lastItem == null;
                lastRet = nextIndex;
                final int cursor = this.cursor;
                if (cursor >= 0) {
                    //重新更新nextItem,如果先執行next方法,之後執行了兩次take方法
                    //第二次即便take已經把第二個元素取出了,這個時候再呼叫next,依然為
                    //執行take方法之前的第二個元素
                    nextItem = itemAt(nextIndex = cursor);
                    // assert nextItem != null;
                    //更新遊標
                    this.cursor = incCursor(cursor);
                } else {
                    nextIndex = NONE;
                    nextItem = null;
                }
            } finally {
                lock.unlock();
            }
            return x;
        }

總結

整個來說,迭代器原理就是使用一些變數控制到迭代的位置,當佇列中的元素髮生變更的時候,迭代器的控制變數跟著變動就可以,由於ArrayBlockingQueue採用了加鎖的方式,所以迭代的過程中佇列中刪除元素是沒有影響的,因為會自動修復迭代器的控制變數,而且只有一個變數修改,所以不會出現執行緒不安全的問題。一個佇列可以有多個迭代器,迭代器被封裝在Node中,最後構成一個連結串列,當佇列中的元素更新的時候,方便統一更新迭代器的控制變數。