1. 程式人生 > >Java並發包--LinkedBlockQueue

Java並發包--LinkedBlockQueue

HA sky 調用 spa sse jdk1.7 gpo nod new

轉載請註明出處:http://www.cnblogs.com/skywang12345/p/3503458.html

LinkedBlockingQueue介紹

LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列。該隊列按 FIFO(先進先出)排序元素,新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。

此外,LinkedBlockingQueue還是可選容量的(防止過度膨脹),即可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。

LinkedBlockingQueue原理和數據結構

LinkedBlockingQueue的數據結構,如下圖所示:

技術分享圖片

說明
1. LinkedBlockingQueue繼承於AbstractQueue,它本質上是一個FIFO(先進先出)的隊列。
2. LinkedBlockingQueue實現了BlockingQueue接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之後,其它線程需要阻塞等待。
3. LinkedBlockingQueue是通過單鏈表實現的。
(01) head是鏈表的表頭。取出數據時,都是從表頭head處插入。
(02) last是鏈表的表尾。新增數據時,都是從表尾last處插入。
(03) count是鏈表的實際大小,即當前鏈表中包含的節點個數。
(04) capacity是列表的容量,它是在創建鏈表時指定的。
(05) putLock是插入鎖,takeLock是取出鎖;notEmpty是“非空條件”,notFull是“未滿條件”。通過它們對鏈表進行並發控制。
LinkedBlockingQueue在實現“多線程對競爭資源的互斥訪問”時,對於“插入”和“取出(刪除)”操作分別使用了不同的鎖。對於插入操作,通過“插入鎖putLock”進行同步;對於取出操作,通過“取出鎖takeLock”進行同步。
此外,插入鎖putLock和“非滿條件notFull”相關聯,取出鎖takeLock和“非空條件notEmpty”相關聯。通過notFull和notEmpty更細膩的控制鎖。

     -- 若某線程(線程A)要取出數據時,隊列正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向隊列中插入了數據之後,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。 此外,線程A在執行取操作前,會獲取takeLock,在取操作執行完畢再釋放takeLock。
     -- 若某線程(線程H)要插入數據時,隊列已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之後,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。 此外,線程H在執行插入操作前,會獲取putLock,在插入操作執行完畢才釋放putLock。

關於ReentrantLock 和 Condition等更多的內容,可以參考:
(01) Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock
(02) Java多線程系列--“JUC鎖”03之 公平鎖(一)
(03) Java多線程系列--“JUC鎖”04之 公平鎖(二)
(04) Java多線程系列--“JUC鎖”05之 非公平鎖
(05) Java多線程系列--“JUC鎖”06之 Condition條件

LinkedBlockingQueue函數列表

技術分享圖片
// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingQueue。
LinkedBlockingQueue()
// 創建一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含給定 collection 的元素,元素按該 collection 叠代器的遍歷順序添加。
LinkedBlockingQueue(Collection<? extends E> c)
// 創建一個具有給定(固定)容量的 LinkedBlockingQueue。
LinkedBlockingQueue(int capacity)

// 從隊列徹底移除所有元素。
void clear()
// 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在隊列中的元素上按適當順序進行叠代的叠代器。
Iterator<E> iterator()
// 將指定元素插入到此隊列的尾部(如果立即可行且不會超出此隊列的容量),在成功時返回 true,如果此隊列已滿,則返回 false。
boolean offer(E e)
// 將指定元素插入到此隊列的尾部,如有必要,則等待指定的時間以使空間變得可用。
boolean offer(E e, long timeout, TimeUnit unit)
// 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。
E peek()
// 獲取並移除此隊列的頭,如果此隊列為空,則返回 null。
E poll()
// 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 將指定元素插入到此隊列的尾部,如有必要,則等待空間變得可用。
void put(E e)
// 返回理想情況下(沒有內存和資源約束)此隊列可接受並且不會被阻塞的附加元素數量。
int remainingCapacity()
// 從此隊列移除指定元素的單個實例(如果存在)。
boolean remove(Object o)
// 返回隊列中的元素個數。
int size()
// 獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。
E take()
// 返回按適當順序包含此隊列中所有元素的數組。
Object[] toArray()
// 返回按適當順序包含此隊列中所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()
技術分享圖片

LinkedBlockingQueue源碼分析(JDK1.7.0_40版本)

LinkedBlockingQueue.java的完整源碼如下:

技術分享圖片 View Code


下面從LinkedBlockingQueue的創建,添加,刪除,遍歷這幾個方面對它進行分析。

1. 創建

下面以LinkedBlockingQueue(int capacity)來進行說明。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

說明
(01) capacity是“鏈式阻塞隊列”的容量。
(02) head和last是“鏈式阻塞隊列”的首節點和尾節點。它們在LinkedBlockingQueue中的聲明如下:

技術分享圖片
// 容量
private final int capacity;
// 當前數量
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head; // 鏈表的表頭
private transient Node<E> last; // 鏈表的表尾
// 用於控制“刪除元素”的互斥鎖takeLock 和 鎖對應的“非空條件”notEmpty
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// 用於控制“添加元素”的互斥鎖putLock 和 鎖對應的“非滿條件”notFull
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
技術分享圖片

鏈表的節點定義如下:

static class Node<E> {
    E item;         // 數據
    Node<E> next;   // 下一個節點的指針

    Node(E x) { item = x; }
}


2. 添加

下面以offer(E e)為例,對LinkedBlockingQueue的添加方法進行說明。

技術分享圖片
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // 如果“隊列已滿”,則返回false,表示插入失敗。
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    // 新建“節點e”
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    // 獲取“插入鎖putLock”
    putLock.lock();
    try {
        // 再次對“隊列是不是滿”的進行判斷。
        // 若“隊列未滿”,則插入節點。
        if (count.get() < capacity) {
            // 插入節點
            enqueue(node);
            // 將“當前節點數量”+1,並返回“原始的數量”
            c = count.getAndIncrement();
            // 如果在插入元素之後,隊列仍然未滿,則喚醒notFull上的等待線程。
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        // 釋放“插入鎖putLock”
        putLock.unlock();
    }
    // 如果在插入節點前,隊列為空;則插入節點後,喚醒notEmpty上的等待線程
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
技術分享圖片

說明:offer()的作用很簡單,就是將元素E添加到隊列的末尾。

enqueue()的源碼如下:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

enqueue()的作用是將node添加到隊列末尾,並設置node為新的尾節點!

signalNotEmpty()的源碼如下:

技術分享圖片
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
技術分享圖片

signalNotEmpty()的作用是喚醒notEmpty上的等待線程。

3. 取出

下面以take()為例,對LinkedBlockingQueue的取出方法進行說明。

技術分享圖片
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 獲取“取出鎖”,若當前線程是中斷狀態,則拋出InterruptedException異常
    takeLock.lockInterruptibly();
    try {
        // 若“隊列為空”,則一直等待。
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 取出元素
        x = dequeue();
        // 取出元素之後,將“節點數量”-1;並返回“原始的節點數量”。
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 釋放“取出鎖”
        takeLock.unlock();
    }
    // 如果在“取出元素之前”,隊列是滿的;則在取出元素之後,喚醒notFull上的等待線程。
    if (c == capacity)
        signalNotFull();
    return x;
}
技術分享圖片

說明:take()的作用是取出並返回隊列的頭。若隊列為空,則一直等待。

dequeue()的源碼如下:

技術分享圖片
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
技術分享圖片

dequeue()的作用就是刪除隊列的頭節點,並將表頭指向“原頭節點的下一個節點”。

signalNotFull()的源碼如下:

技術分享圖片
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
技術分享圖片

signalNotFull()的作用就是喚醒notFull上的等待線程。

4. 遍歷

下面對LinkedBlockingQueue的遍歷方法進行說明。

public Iterator<E> iterator() {
  return new Itr();
}

iterator()實際上是返回一個Iter對象。

Itr類的定義如下:

技術分享圖片
private class Itr implements Iterator<E> {
    // 當前節點
    private Node<E> current;
    // 上一次返回的節點
    private Node<E> lastRet;
    // 當前節點對應的值
    private E currentElement;

    Itr() {
        // 同時獲取“插入鎖putLock” 和 “取出鎖takeLock”
        fullyLock();
        try {
            // 設置“當前元素”為“隊列表頭的下一節點”,即為隊列的第一個有效節點
            current = head.next;
            if (current != null)
                currentElement = current.item;
        } finally {
            // 釋放“插入鎖putLock” 和 “取出鎖takeLock”
            fullyUnlock();
        }
    }

    // 返回“下一個節點是否為null”
    public boolean hasNext() {
        return current != null;
    }

    private Node<E> nextNode(Node<E> p) {
        for (;;) {
            Node<E> s = p.next;
            if (s == p)
                return head.next;
            if (s == null || s.item != null)
                return s;
            p = s;
        }
    }

    // 返回下一個節點
    public E next() {
        fullyLock();
        try {
            if (current == null)
                throw new NoSuchElementException();
            E x = currentElement;
            lastRet = current;
            current = nextNode(current);
            currentElement = (current == null) ? null : current.item;
            return x;
        } finally {
            fullyUnlock();
        }
    }

    // 刪除下一個節點
    public void remove() {
        if (lastRet == null)
            throw new IllegalStateException();
        fullyLock();
        try {
            Node<E> node = lastRet;
            lastRet = null;
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (p == node) {
                    unlink(p, trail);
                    break;
                }
            }
        } finally {
            fullyUnlock();
        }
    }
}
技術分享圖片

LinkedBlockingQueue示例

技術分享圖片
 1 import java.util.*;
 2 import java.util.concurrent.*;
 3 
 4 /*
 5  *   LinkedBlockingQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
 6  *
 7  *   下面是“多個線程同時操作並且遍歷queue”的示例
 8  *   (01) 當queue是LinkedBlockingQueue對象時,程序能正常運行。
 9  *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
10  *
11  * @author skywang
12  */
13 public class LinkedBlockingQueueDemo1 {
14 
15     // TODO: queue是LinkedList對象時,程序會出錯。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new LinkedBlockingQueue<String>();
18     public static void main(String[] args) {
19     
20         // 同時啟動兩個線程對queue進行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24 
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34 
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “線程名” + "-" + "序號"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通過“Iterator”遍歷queue。
47                 printAll();
48             }
49         }
50     }
51 }
技術分享圖片

(某一次)運行結果

技術分享圖片
tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, ta3, 
tb1, ta1, ta2, ta3, ta4, 
tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, 
ta4, tb1, ta5, ta1, ta6, 
ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, 
ta5, ta6, tb2, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,
技術分享圖片

結果說明
示例程序中,啟動兩個線程(線程ta和線程tb)分別對LinkedBlockingQueue進行操作。以線程ta而言,它會先獲取“線程名”+“序號”,然後將該字符串添加到LinkedBlockingQueue中;接著,遍歷並輸出LinkedBlockingQueue中的全部元素。 線程tb的操作和線程ta一樣,只不過線程tb的名字和線程ta的名字不同。
當queue是LinkedBlockingQueue對象時,程序能正常運行。如果將queue改為LinkedList時,程序會產生ConcurrentModificationException異常。

Java並發包--LinkedBlockQueue