Java生產消費模型—ArrayBlockingQueue詳解
背景需求
生產消費模型是線程協作關系中十分常見的一種。通常,一個(多個)線程負責生產,一個(多個)線程可以從生產的列表中獲取並消費;生產的內容可以按需求設計,可以是一個Integer,可以是String,可以Object,也可以是任意類型的對象,只要有生產消費的需求。
例如,廚師負責生產美食,放在桌子上,服務員負責取走(消費)美食。這裏,廚師就扮演著生產者的身份,美食是生產的內容,服務員就扮演著消費者的身份。
下面用這個廚師與服務員的案例來分析下生產消費模型需要實現哪些功能才能滿足需求:
如何實現這個需求
若要實現以上的需求,我們該考慮哪些方面呢?
(1)廚師是廚師,負責做美食;服務員負責消費美食。廚師與服務員可以同時運行(兩個獨立線程
(2)廚師與服務員作為兩個獨立線程,必須有一個約定好的公共區域:廚師把生產好的美食往這個區域放,服務員從這個區域取。並且,廚師與服務員並不想和對方接觸過多(低耦合),只想和這個公共區域(桌子)打交道。
(3)通常,先生產的內容應該被先消費(先做的美食先送給顧客,防止涼了),符合FIFO特性。若要選取某種數據結構的容器作為公共區域,Queue是最佳方案(符合FIFO特性)。
(4)並發有危險:廚師和服務員都在這個公共區域(Queue)中操作,同時操作可能存在問題。例如服務員正在從區域A拿盤子時,廚師把新的盤子也往區域A放,會發生碰撞;又如,同一個盤子可能有多個服務員過來爭搶;也可能,多個廚師做好了美食把盤子往同一個區域放,也會發生碰撞。
因此,需要實現並發的保護:廚師(生產者)往桌子(Queue)上放盤子(生產)之前,先獲取鎖,以保證他在操作共享區域(Queue)時沒有其他廚師或者服務員過來爭搶導致發生沖突;在放完之後,釋放掉鎖,讓其他的廚師或者服務員操作。服務員操作時也是一個道理,要先獲取鎖,操作完成之後要釋放鎖。
(5)阻塞的需求:若桌子(Queue)空了,服務員該怎麽辦呢?是每隔幾秒鐘過來看一下桌子?不好,因為這樣太累(輪訓方式開銷大,並不知道什麽時候Queue中才會有新的盤子)。
比較好的方案是:在桌子上放一個BB機(Queue中實現條件變量),和廚師約定好:若桌子空了,服務員可以去睡覺,等廚師做好飯了,通過BB機呼叫一下服務員(喚醒消費線程)(若Queue消費完畢,消費線程可以阻塞等待【 隊列非空】的條件,當生產線程有新的生產內容,把內容放進Queue之後,通過條件變量喚醒消費線程
同理,也可能出現相反的場景:服務員比較少,端盤子比較慢,而廚師比較多,做飯比較快(生產速度快於消費速度)。這時,若桌子無限大(無界隊列),那廚師會一直往桌子上放,導致桌子上盤子越來越多;而若桌子大小有限(有界隊列),那麽當桌子放滿了之後,那就沒地兒放了,咋辦?
可以用一樣的方式,再在隊列內部添加一個條件變量,當隊列滿了,生產者則等待該隊列【隊列未滿】條件的發生,同時休眠等待。當消費者消費一次之後,觸發【隊列未滿】的條件,這時生產者可以被喚醒繼續工作。
Java類庫中成熟的設計-ArrayBlockingQueue
為了滿足無數場景下以上類似的需求,jdk中加入了該線程安全阻塞FIFO隊列的實現類:ArrayBlockingQueue,繼承關系如下:
首先,BlockingQueue最基礎的是個集合Collection;
同時,實現了Queue的接口,因此具備普通Queue的特性,可以offer/add以添加元素至隊列尾部,可以poll以從隊列頭部取內容,可以peek查看隊列頭的元素。
同時,實現了BlockingQueue的接口,在Queue基礎上實現的特性:
(1)一個是線程安全,可以並發offer,可以並發poll,可以並發同時offer和poll,內部是加鎖ReentrantLock實現的;
(2)另一個,就是阻塞功能。
>> 當調用blockingQueue.put(E e)接口想將元素入隊列時,若隊列未滿,則直接入隊列(enqueue);
若隊列已滿,則notFull.await()休眠等待條件變量【notFull隊列未滿】的發生,才喚醒線程繼續生產。
>> 當調用blockingQueue.take()接口時想從隊列中取隊列頭的元素時,若隊列為空,則直接取走(dequeue);
若隊列已空,則notEmpty.await()休眠等到條件變量【notEmpry隊列未滿】的發生,才喚醒線程繼續消費。
源碼解讀
下面,帶著以上這些概念的基礎,看下源碼實現。
首先,成員:
/** The queued items */ final Object[] items; //保存生產內容對象 /** items index for next take, poll, peek or remove */ int takeIndex; //數組下一個要消費位置 /** items index for next put, offer, or add */ int putIndex; //數組中下一個要生產存放的位置 /** Number of elements in the queue */ int count; //當前總共存放的內容對象數量 /** Main lock guarding all access */ final ReentrantLock lock; //並發操作的互斥,讀取、寫入之前都要獲取該鎖 /** Condition for waiting takes */ private final Condition notEmpty; //隊列非空的條件變量,用於喚醒因隊列空掉而阻塞的消費者線程 /** Condition for waiting puts */ private final Condition notFull; //隊列非滿的條件變量,用於喚醒因隊列已滿導致阻塞的生產者線程
從以上的成員可以看得出來,數據是存放在數組Object[] items,並用putIndex指示下一個將要存放的位置,用getIndex存放下一個將要取元素的位置。
例如,假設items容量為5
在存入之前,應該是這樣:
<<operation0>> 0 1 2 3 4 null null null null null putIndex=0 takeIndex=0
存了一個‘A‘之後,應該是這樣: putIndex++
<<operation1>> 0 1 2 3 4 ‘A‘ null null null null putIndex=1 getIndex=0
再存入一個‘B‘之後,應該是這樣:putIndex++
<<operation2>> 0 1 2 3 4 ‘A‘ ‘B’ null null null putIndex=2 getIndex=0
取一個元素出來,應該是這樣:對頭的元素‘A‘被取出來了,getIndex++
<<operation3>> 0 1 2 3 4 null ‘B’ null null null putIndex=2 getIndex=1
再存入2個元素:
<<operation4>> 0 1 2 3 4 null ‘B’ ‘C‘ ‘d‘ null putIndex=3 getIndex=1
此時putIndex已經到頭(4),若要再存入,則循環到0:
<<operaion5>> 0 1 2 3 4 null ‘B’ ‘C‘ ‘d‘ ‘E‘ putIndex=0 getIndex=1
此時,若再存入一個,則滿了
<<operation6>> 0 1 2 3 4 ‘F‘ ‘B’ ‘C‘ ‘d‘ ‘E‘ putIndex=1 getIndex=1
會發現,putIndex已經趕上了getIndex,沒有空間了,那麽生產者就會阻塞並等待【隊列非滿】條件變量的發生。
等到消費者再取一個元素出來,就會觸發【隊列非滿】條件變量,讓生產者線程喚醒繼續生產。
<<operation7>> 0 1 2 3 4 ‘F‘ null ‘C‘ ‘d‘ ‘E‘ putIndex=1 getIndex=2
下面貼出部分源碼,對應上述思路:
take(), put()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
offer(), add()
再貼一下其他類似接口的源碼:
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer, add與put職責類型,區別在於:
offer若因隊列滿了直接返回false,比較溫和;而add因隊列滿了會拋出異常,比較強制;而put若隊列滿了,會阻塞等待知道隊列有位置了再插入元素。
poll()
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
poll()與take()類似,區別在於:
poll時若隊列為空,那麽直接返回null;而take時,若隊列為空,會阻塞直到隊列不為空了,再返回隊列中的數據;
Java生產消費模型—ArrayBlockingQueue詳解