線程的基本協作和生產者消費者
阿新 • • 發佈:2018-08-05
producer rand override div runnable type return sum col
協作基礎(wait/notify)
Java的根父類是Object,Java在Object類而非Thread類中,定義了一些線程協作的基本方法,使得每個對象都可以調用這些方法,這些方法有兩類,一類是wait,另一類是notify。
wait方法主要有兩個:
public final void wait() throws InterruptedException public final native void wait(long timeout) throws InterruptedException;
一個帶時間參數,單位是毫秒,表示最多等待這麽長時間,參數為0表示無限期等待。一個不帶時間參數,表示無限期等待,實際就是調用wait(0)。在等待期間都可以被中斷,如果被中斷,會拋出InterruptedException。
wait實際上做了什麽呢?每個對象都有一把鎖和一個鎖等待隊列,一個線程在進入synchronized代碼塊時,會嘗試獲取鎖,獲取不到的話會把當前線程加入等待隊列中。其實,除了用於鎖的等待隊列,每個對象還有另一個等待隊列,表示條件隊列,該隊列用於線程間的協作。調用wait就會把當前線程放到條件隊列上並阻塞,表示當前線程執行不下去了,它需要等待一個條件,這個條件它自己改變不了,需要其他線程改變。當其他線程改變了條件後,應該調用Object的notify方法:
public final native void notify(); public final native void notifyAll();
notify做的事情就是從條件隊列中選一個線程,將其從隊列中移除並喚醒,notifyAll和notify的區別是,它會移除條件隊列中所有的線程並全部喚醒。
wait/notify方法只能在synchronized代碼塊內被調用,如果調用wait/notify方法時,當前線程沒有持有對象鎖,會拋出異常java.lang.IllegalMonitorStateException。
wait的具體過程是:
- 把當前線程放入條件等待隊列,釋放對象鎖,阻塞等待,線程狀態變為WAITING或TIMED_WAITING
- 等待時間到或被其他線程調用notify/notifyAll從條件隊列中移除,這時,要重新競爭對象鎖
-
- 如果能夠獲得鎖,線程狀態變為RUNNABLE,並從wait調用中返回
- 否則,該線程加入對象鎖等待隊列,線程狀態變為BLOCKED,只有在獲得鎖後才會從wait調用中返回
線程從wait調用中返回後,不代表其等待的條件就一定成立了,它需要重新檢查其等待的條件,一般的調用模式是:
synchronized (obj) { while (條件不成立) obj.wait(); ... // 條件滿足後的操作 }
生產者/消費者模式
下面來看一個生產者和消費者的例子:
/** * @author 沈默哥 * */ public class MyProducerConsumerDemo { static class GoodsQueue { private int size; private Queue<String> que = new ArrayDeque<String>(); public GoodsQueue(int size) {// 維護一個有界隊列,傳入隊列的最大容量 super(); this.size = size; } public synchronized void put(String e) throws InterruptedException { while (que.size() == size) { System.out.println("隊列已滿,生產者等待"); wait(); } que.add(e); System.out.println("生產者生產:" + e); notify(); } public synchronized String take() throws InterruptedException { while (que.size() == 0) { System.out.println("隊列為空,消費者等待"); wait(); } String e = que.poll(); System.out.println("消費者消費" + e); notify(); return e; } } static class Producer extends Thread { GoodsQueue que; Random rad = new Random(); public Producer(GoodsQueue que) { super(); this.que = que; } @Override public void run() { int i = 0; try { while (true) { String e = String.valueOf(i); que.put(e); i++; Thread.sleep(rad.nextInt(1000));// 生產者休息準備下一次生產 } } catch (InterruptedException e1) { } } } static class Consumer extends Thread { GoodsQueue que; Random rad = new Random(); public Consumer(GoodsQueue que) { super(); this.que = que; } @Override public void run() { try { while (true) { que.take(); Thread.sleep(rad.nextInt(1000));// 消費者休息準備下一次消費 } } catch (InterruptedException e) { } } } public static void main(String[] args) throws InterruptedException { GoodsQueue que = new GoodsQueue(1); Producer pro = new Producer(que); Consumer con = new Consumer(que); con.start(); Thread.sleep(500); pro.start(); } }
線程的基本協作和生產者消費者