11.並發包阻塞隊列之LinkedBlockingQueue
在上文《10.並發包阻塞隊列之ArrayBlockingQueue》中簡要解析了ArrayBlockingQueue部分源碼,在本文中同樣要介紹的是Java並發包中的阻塞隊列LinkedBlockingQueue。ArrayBlockingQueue隊列是由數組實現,而LinkedBlockingQueue隊列的實現則是鏈表(單向鏈表)實現,所以在LinkedBlockingQueue有一個Node內部類來表示鏈表的節點。
static final class Node<E> { E item;//入隊元素 Node<E> next;//指向後繼節點Node(E x) { item = x; } }
同樣它也有3個構造方法,與ArrayBlockingQueue略有不同。
1 public LinkedBlockingQueue() { 2 this(Integer.MAX_VALUE)//默認構造容量為int型的最大值隊列 3 } 4 public LinkedBlockingQueue(int capacity) { 5 if (capacity <= o) throw new IllegalArgumentException(); 6 this.capacity = capacity; 7 last = head = new Node<E>(null);//頭指針和尾指針指向頭節點(null) 8 } 9 public LinkedBlockingQueue(Collection<? extends E> c ) { 10 this(Integer.MAX_VALUE); 11 final ReentrantLock putLock = this.putLock; 12 putLock.lock();//這裏和ArrayBlockingQueue也會獲取鎖,但它同樣不是為了互斥操作,同樣也是為了保證其可見性。13 try { 14 int n = 0; 15 for (E e : c) { 16 if (e == null) 17 throw new NullPointerException(); 18 if (n == capacity) 19 throw new IllegalStateException("Queue full"); 20 enqueue(new Node<E>(e));//入隊 21 ++n; 22 } 23 count.set(n); 24 } finally { 25 putLock.unlock(); 26 } 27 }
在第12行中獲取鎖是為了保證可見性,這個的原因我認為是,線程T1是實例化LinkedBlockingQueue對象,T2是對實例化的LinkedBlockingQueue對象做入隊操作(當然要保證T1和T2的執行順序),如果不對它進行加鎖操作(加鎖會保證其可見性,也就是寫回主存),T1的集合c有可能只存在T1線程維護的緩存中,並沒有寫回主存,T2中實例化的LinkedBlockingQueue維護的緩存以及主存中並沒有集合c,此時就因為可見性造成數據不一致的情況,引發線程安全問題。?
在了解完LinkedBlockingQueue的構造方法後,我們回過頭來看LinkedBlockingQueue的兩個成員變量:
private final ReentrantLock takeLock = new ReentrantLock(); private final ReentrantLock putLock = new ReentrantLock();
可見LinkedBlockingQueue中有兩個鎖,一個是為了鎖住入隊操作,一個是為了鎖住出隊操作。而在ArrayBlockingQueue中則只有一個鎖,同時鎖住隊列的入隊、出隊操作。
private final Condition notEmpty = takeLock.newCondition(); private final Condition notFull = putLock.newCondition();
這兩個成員變量則是線程等待隊列,一個是出隊鎖上的等待隊列,一個是入隊鎖上的等待隊列。在ArrayBlockingQueue也有兩個等待隊列,一個是非空等待隊列,另一個則是非滿等待隊列,在這一點上兩者一致。
隊列元素的插入
? |
拋出異常 |
返回值(非阻塞) |
一定時間內返回值 |
返回值(阻塞) |
插入 |
add(e)//隊列未滿時,返回true;隊列滿則拋出IllegalStateException(“Queue full”)異常——AbstractQueue |
offer(e)//隊列未滿時,返回true;隊列滿時返回false。非阻塞立即返回。 |
offer(e, time, unit)//設定等待的時間,如果在指定時間內還不能往隊列中插入數據則返回false,插入成功返回true。 |
put(e)//隊列未滿時,直接插入沒有返回值;隊列滿時會阻塞等待,一直等到隊列未滿時再插入。 |
LinkedBlockingQueue中並沒有像ArrayBlockingQueue那樣重寫了AbstractQueue的add方法而直接調用父類的add方法,所以LinkedBlockingQueue#add方法與ArrayBlockingQueue#add一樣,都是直接調用其AbstractQueue。
//AbstractQueue#add,這是一個模板方法,只定義add入隊算法骨架,成功時返回true,失敗時拋出IllegalStateException異常,具體offer實現交給子類實現。 public boolean add(E e) { if (offer(e))//offer方法由Queue接口定義 return true; else throw new IllegalStateException(); }
1 //LinkedBlockingQueue#offer 2 public boolean offer(E e) { 3 if (e == null) throw new NullPointerException(); 4 final AtomicInteger count = this.count;//原子型int變量,線程安全,指向隊列數據量引用 5 if (count.get() == capacity) //當數據量等於隊列容量時,無法入隊,返回false 6 return false; 7 int c = -1; 8 Node<E> node = new Node(e); 9 final ReentrantLock putLock = this.putLock;//插入鎖 10 putLock.lock();//獲得插入鎖 11 try { 12 if (count.get() < capacity) { 13 enqueuer(node);//入隊 14 c = count.getAndIncrement();//隊列數據總數自增+1後返回 15 if (c + 1 < capacity) 16 notFull.signal();//喚醒非滿等待隊列上的線程 17 } 18 } finally { 19 putLock.unlock(); 20 } 21 if (c == 0) 22 signalNotEmpty();//隊列中剛好有一個數據,喚醒非空等待隊列 23 return c >= 0 24 }
在第10行是獲取插入鎖,和ArrayBlockingQueue只有一個鎖不同的是,LinkedBlockingQueue分為入隊鎖和出隊鎖,也就是說對於ArrayBlockingQueue同時只能有一個線程對它進行入隊或者出隊操作,而對於LinkedBlockingQueue來說同時能有兩個線程對隊列進行入隊或者出隊操作。
前兩個add和offer方法都是非阻塞的,對於put方法則是阻塞的,線程會一直阻塞直到線程非空或者非滿,但是它在阻塞時能被線程中斷返回。
//LinkedBlockingQueue#put public void put(E e) throws InterruptedException { if (e == null) throws new NullPointerException(); int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterrupted();//能被線程中斷地獲取鎖 try { while (count.get() == capacity) {//隊列數據量等於隊列容量 notFull.await();//休眠非滿等待隊列上的線程 } enqueuer(node);//入隊 c = count.getAndIncrement();//隊列數據總數自增+1後返回 if (c + 1 < capacity)//還沒有達到隊列容量 notFull.signal();//喚醒非滿等待隊列上的線程 } finally { putLock.unlock(); } if (c == 0) signalNotEmpty();//喚醒非空等待隊列上的線程 }
隊列插入的最後一個方法來看上面出現的enqueue入隊方法。
private void enqueuer(Node<E> node) { last = last.next = node;//將LinkedBlockingQueue中指向隊尾的last.next指向新加入的node節點 }
隊列元素的刪除
拋出異常? |
返回值(非阻塞)? |
一定時間內返回值? |
返回值(阻塞)? |
remove()//隊列不為空時,返回隊首值並移除;隊列為空時拋出NoSuchElementException()異常——AbstractQueue? |
poll()//隊列不為空時返回隊首值並移除;隊列為空時返回null。非阻塞立即返回。? |
poll(time, unit)//設定等待的時間,如果在指定時間內隊列還未孔則返回null,不為空則返回隊首值? |
take(e)//隊列不為空返回隊首值並移除;當隊列為空時會阻塞等待,一直等到隊列不為空時再返回隊首值。? |
//AbstractQueue#remove,同樣這也是一個模板方法,定義刪除隊列元素的算法骨架,具體實現由子類來實現poll方法 public E remove() { E x = poll();//poll方法由Queue接口定義 if (x != null) return x; else throw new NoSuchElementException(); }
//LinkedBlockingQueue#poll public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock();//獲取出隊鎖 try { if (count.get() > 0) {//隊列不為空 x = dequeuer();//出隊
c = count.getAndDecrement();//隊列數據自減-1返回 if ( c > 1) notEmpty.signal();//喚醒非空等待隊列上的線程 } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull();//喚醒非滿等待隊列上的線程 return x; }
前兩個remove和poll方法都是非阻塞的,對於take方法則是阻塞的,線程會一直阻塞直到線程非空或者非滿,但是它在阻塞時能被線程中斷返回。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; take.lockInterruptibly();//可被線程中斷返回地獲取鎖 try { while (count.get() == 0) {//隊列數據為空 notEmpty.await();//休眠非空等待隊列上的線程 } x = dequeuer();//此時非空等待隊列上的線程被喚醒,隊列數據不為空,出隊 c = count.getAndDecrement(); if (c > 1) notEmpty.signal();//喚醒非空等待隊列上的線程 } finally { takeLock.unlock(); } if (c == capacity) signalNotFull();//喚醒非滿等待隊列 return x; }
隊列出隊的最後一個方法來看上面出現的dequeue入隊方法。
private E dequeue() { Node<E> h = head;//頭節點,為空 Node<E> first = h.next; h.next = h;//此時沒有節點指向頭節點,便於GC head = first; E x = first.item; first.item = null; return x; }
最後一個方法size。
public int size() { return count.get();//和ArrayBlockingQueue類似,與ConcurrentLinkedQueue不同,沒有遍歷整個隊列,而是直接返回count變量。此處的count是AtomicInteger變量。 }
11.並發包阻塞隊列之LinkedBlockingQueue