生產者-消費者模式實現(ReentarntLock)
阿新 • • 發佈:2022-04-06
1. 介紹
在JAVA多執行緒程式設計中,生產者-消費者模式有多種實現方式,例如:
-
synchronized
+wait
/notify
- 阻塞佇列
BolckingQueue
-
ReentrantLock
+兩個Condition
條件
在這篇隨筆中主要考慮利用ReentarntLock
和Condition
實現, 重點是利用中斷實現了生產者和消費者執行緒的執行終止
2. 程式碼實現
類檔案簡介:
-
Container
提供了向共享佇列入隊和出隊的方法 -
Consumer
封裝了消費者任務 -
Producer
封裝生產者任務 -
MainTest
執行測試
Container
類實現:
public class Container<T> { private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; private final Queue<T> sharedQueue; private final int capacity; public Container( Queue<T> sharedQueue, int capacity) { if(sharedQueue == null){ throw new NullPointerException("sharedQueue is null"); } if (capacity < 1){ throw new IllegalArgumentException("capacity less one"); } this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); this.sharedQueue = sharedQueue; this.capacity = capacity; } /** * 生產者生產操作 * @param element 生產的元素 * @throws InterruptedException 如果當前執行緒被中斷 */ public void put(T element) throws InterruptedException { try { lock.lockInterruptibly(); while (sharedQueue.size() == capacity){ notFull.await(); } sharedQueue.offer(element); System.out.println("生產者執行緒-" + Thread.currentThread().getId()+" 生產: "+element+" 佇列元素數量: " + sharedQueue.size()); notEmpty.signal(); }finally { try{ lock.unlock(); }catch (IllegalMonitorStateException e){} } } /** * 消費者操作 * @return 返回獲得的元素 * @throws InterruptedException 如果當前執行緒被中斷 */ public T take() throws InterruptedException { try { lock.lockInterruptibly(); while (sharedQueue.isEmpty()){ notEmpty.await(); } T element = sharedQueue.poll(); System.out.println("消費者執行緒-" + Thread.currentThread().getId()+" 消費 "+ element +" 佇列元素數量: " + sharedQueue.size()); notFull.signal(); return element; }finally{ try{ lock.unlock(); } catch (IllegalMonitorStateException e){} } } }
對於Container
類有一下幾點說明:
- 加鎖時使用
lock.lockInterruptibly()
而不是lock.lock()
,使得在競爭鎖期間可以響應中斷。 -
take()
和put()
方法向外丟擲InterruptedException
是為了在外部呼叫時跳出死迴圈,從而實現執行緒的退出。InterruptedException
異常是由方法內部lockInterruptibly()
和await()
方法丟擲的。 - 在
finally
語句塊中對lock.unlock()
捕獲了IllegalMonitorStateException
異常。這是因為當執行緒在呼叫lockInterruptibly
後競爭鎖失敗,進入自選阻塞狀態,如果在此時被中斷,則在函式返回前執行lock.unlock()
IllegalMonitorStateException
異常,所以進行了捕獲。另外,當執行緒呼叫await()
方法進入阻塞狀態時,被中斷返回時,是保證一定該執行緒獲得鎖的(),所以不會此種情況導致lock.unlock()
丟擲IllegalMonitorStateException
異常。 -
await()
方法返回執行緒一定獲得該條件所屬的鎖(OpenJDK8 Condition#await() 部分註釋)In all cases, before this method can return the current thread must re-acquire the lock associated with this condition. When the thread returns it is guaranteed to hold this lock.
Consumer
和Producer
類實現
兩個類實現比較類似就一起介紹
-
Consumer
類:
public class Consumer implements Runnable{
private final Container<Integer> container;
public Consumer(Container<Integer> container) {
this.container = container;
}
@Override
public void run() {
try{
while (true){
// 模擬消費耗時
// Thread.sleep(new Random().nextInt(100)+50);
Integer element = container.take();
// doSomething
}
}// InterruptedException 異常丟擲有兩處, Thread.sleep()方法、container.take()方法
catch (InterruptedException e){
System.out.println("消費者執行緒-" + Thread.currentThread().getId()+" 執行完畢");
}
}
}
Producer
類:
public class Producer implements Runnable{
private final Container<Integer> container;
public Producer(Container<Integer> container) {
this.container = container;
}
@Override
public void run() {
try{
while (true){
Random random = new Random();
//sleep模擬生產產品的耗時
// Thread.sleep(new Random().nextInt(100)+50);
int element = random.nextInt(1000);
container.put(element);
}
}// InterruptedException異常丟擲有兩處: Thread.sleep()方法、container.put()方法
catch (InterruptedException e){
System.out.println("生產者執行緒-" + Thread.currentThread().getId()+" 執行完畢");
}
}
}
測試類MainTest
實現
MainTest
程式碼:
public class MainTest {
public static void main(String[] args){
Container<Integer> container = new Container<>(new ArrayDeque<>(), 5);
//為了使所有提交任務都能及時執行,使用newCachedThreadPool()方法建立執行緒池
ExecutorService service = Executors.newCachedThreadPool();
int consumerCount = 5; //消費者執行緒數
for (int i = 0; i < consumerCount; i++) {
service.execute(new Consumer(container));
}
int producerCount = 4; //生產者執行緒數
for (int i = 0; i < producerCount; i++) {
service.execute(new Producer(container));
}
//阻塞主執行緒 一段時間
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//關閉執行緒池
service.shutdownNow();
boolean isTerminated = false;
while (!isTerminated){
try {
isTerminated = service.awaitTermination(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("主執行緒退出");
}
}
MainTest
中需要注意的是關閉執行緒池呼叫的函式是ExecutorService.shutdownNow()
而不是ExecutorService.shutdown()
。呼叫shutdownNow()
後執行緒池進入STOP
狀態,此時會中斷所以執行緒(包括正在執行的執行緒),清空執行緒池的工作佇列;而呼叫shutdown()
後執行緒池進入SHUTDOWN
狀態,在此狀態下只會中斷空閒執行緒,不中斷正在執行的執行緒,所以如果呼叫shutdown()
在該程式會一直執行,不會停下來。
3. 執行結果
- 其中一次執行結果(部分)為:
生產者執行緒-19 生產: 686 佇列元素數量: 2
生產者執行緒-19 生產: 865 佇列元素數量: 3
生產者執行緒-19 生產: 346 佇列元素數量: 4
生產者執行緒-19 生產: 191 佇列元素數量: 5
消費者執行緒-13 消費 899 佇列元素數量: 4
消費者執行緒-13 消費 686 佇列元素數量: 3
消費者執行緒-13 消費 865 佇列元素數量: 2
消費者執行緒-16 消費 346 佇列元素數量: 1
消費者執行緒-16 消費 191 佇列元素數量: 0
生產者執行緒-20 生產: 56 佇列元素數量: 1
生產者執行緒-20 生產: 203 佇列元素數量: 2
生產者執行緒-20 生產: 902 佇列元素數量: 3
.......
.......
.......
生產者執行緒-17 生產: 180 佇列元素數量: 1
生產者執行緒-17 生產: 550 佇列元素數量: 2
生產者執行緒-17 生產: 925 佇列元素數量: 3
消費者執行緒-13 消費 180 佇列元素數量: 2
消費者執行緒-14 消費 550 佇列元素數量: 1
生產者執行緒-17 執行完畢
生產者執行緒-18 生產: 722 佇列元素數量: 2
生產者執行緒-20 生產: 998 佇列元素數量: 3
消費者執行緒-12 消費 925 佇列元素數量: 2
消費者執行緒-14 執行完畢
生產者執行緒-18 執行完畢
消費者執行緒-13 執行完畢
生產者執行緒-20 執行完畢
消費者執行緒-15 消費 722 佇列元素數量: 1
消費者執行緒-12 執行完畢
生產者執行緒-19 生產: 971 佇列元素數量: 2
消費者執行緒-16 消費 998 佇列元素數量: 1
消費者執行緒-15 執行完畢
生產者執行緒-19 執行完畢
消費者執行緒-16 執行完畢
主執行緒退出
4. 補充測試
在MainTest
測試中使用了執行緒池管理執行緒,並中斷執行緒。如果不使用執行緒池而直接使用new Thread()
的方法生產執行緒,則可以按照如下程式碼MainTest2
測試:
public class MainTest2 {
public static void main(String[] args) {
Container<Integer> container = new Container<>(new ArrayDeque<>(), 5);
//定義消費者執行緒
Runnable consumer = new Consumer(container);
int consumerCount = 5;
List<Thread> consumerThreads = new LinkedList<>();
for (int i = 0; i < consumerCount; i++) {
Thread thread = new Thread(consumer);
consumerThreads.add(thread);
thread.start();
}
//定義生產者執行緒
Runnable producer= new Producer(container);
int producerCount = 4;
List<Thread> producerThreads = new LinkedList<>();
for (int i = 0; i < producerCount; i++) {
Thread thread = new Thread(producer);
producerThreads.add(thread);
thread.start();
}
//阻塞主執行緒 一段時間
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//通過主執行緒依次中斷生產者和消費者執行緒
for (Thread consumerThread : consumerThreads) {
consumerThread.interrupt();
}
for (Thread producerThread : producerThreads) {
producerThread.interrupt();
}
//等待執行緒全部執行結束後主執行緒才退出
boolean isRunning = true;
while (isRunning){
isRunning = false;
for (Thread consumerThread : consumerThreads) {
isRunning = isRunning || consumerThread.isAlive();
}
for (Thread producerThread : producerThreads) {
isRunning = isRunning || producerThread.isAlive();
}
}
System.out.println("主執行緒退出");
}
}
5. 參考:
本文主要目的是記錄學習過程,加深對知識點理解; 如有行文有誤,望指正。