1. 程式人生 > >線程的基本協作和生產者消費者

線程的基本協作和生產者消費者

producer rand override div runnable type return sum col

協作基礎(wait/notify)

Java的根父類是Object,Java在Object類而非Thread類中,定義了一些線程協作的基本方法,使得每個對象都可以調用這些方法,這些方法有兩類,一類是wait,另一類是notify。

wait方法主要有兩個:

public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;

一個帶時間參數,單位是毫秒,表示最多等待這麽長時間,參數為0表示無限期等待。一個不帶時間參數,表示無限期等待,實際就是調用wait(0)。在等待期間都可以被中斷,如果被中斷,會拋出InterruptedException。

wait實際上做了什麽呢?每個對象都有一把鎖和一個鎖等待隊列,一個線程在進入synchronized代碼塊時,會嘗試獲取鎖,獲取不到的話會把當前線程加入等待隊列中。其實,除了用於鎖的等待隊列,每個對象還有另一個等待隊列,表示條件隊列,該隊列用於線程間的協作。調用wait就會把當前線程放到條件隊列上並阻塞,表示當前線程執行不下去了,它需要等待一個條件,這個條件它自己改變不了,需要其他線程改變。當其他線程改變了條件後,應該調用Object的notify方法:

public final native void notify();
public final native void notifyAll();

notify做的事情就是從條件隊列中選一個線程,將其從隊列中移除並喚醒,notifyAll和notify的區別是,它會移除條件隊列中所有的線程並全部喚醒。

wait/notify方法只能在synchronized代碼塊內被調用,如果調用wait/notify方法時,當前線程沒有持有對象鎖,會拋出異常java.lang.IllegalMonitorStateException。

wait的具體過程是:

    1. 把當前線程放入條件等待隊列,釋放對象鎖,阻塞等待,線程狀態變為WAITING或TIMED_WAITING
    2. 等待時間到或被其他線程調用notify/notifyAll從條件隊列中移除,這時,要重新競爭對象鎖
      • 如果能夠獲得鎖,線程狀態變為RUNNABLE,並從wait調用中返回
      • 否則,該線程加入對象鎖等待隊列,線程狀態變為BLOCKED,只有在獲得鎖後才會從wait調用中返回

線程從wait調用中返回後,不代表其等待的條件就一定成立了,它需要重新檢查其等待的條件,一般的調用模式是:

synchronized (obj) {
    while (條件不成立)
        obj.wait();
    ... // 條件滿足後的操作
}

生產者/消費者模式

下面來看一個生產者和消費者的例子:

/**
 * @author 沈默哥
 * 
 */
public class MyProducerConsumerDemo {
  static class GoodsQueue {
    private int size;
    private Queue<String> que = new ArrayDeque<String>();

    public GoodsQueue(int size) {// 維護一個有界隊列,傳入隊列的最大容量
      super();
      this.size = size;
    }

    public synchronized void put(String e) throws InterruptedException {
      while (que.size() == size) {
        System.out.println("隊列已滿,生產者等待");
        wait();
      }
      que.add(e);
      System.out.println("生產者生產:" + e);
      notify();
    }

    public synchronized String take() throws InterruptedException {
      while (que.size() == 0) {
        System.out.println("隊列為空,消費者等待");
        wait();
      }
      String e = que.poll();
      System.out.println("消費者消費" + e);
      notify();
      return e;
    }
  }

  static class Producer extends Thread {
    GoodsQueue que;
    Random rad = new Random();

    public Producer(GoodsQueue que) {
      super();
      this.que = que;
    }

    @Override
    public void run() {
      int i = 0;
      try {
        while (true) {
          String e = String.valueOf(i);
          que.put(e);
          i++;
          Thread.sleep(rad.nextInt(1000));// 生產者休息準備下一次生產
        }
      } catch (InterruptedException e1) {
      }
    }
  }

  static class Consumer extends Thread {
    GoodsQueue que;
    Random rad = new Random();

    public Consumer(GoodsQueue que) {
      super();
      this.que = que;
    }

    @Override
    public void run() {
      try {
        while (true) {
          que.take();
          Thread.sleep(rad.nextInt(1000));// 消費者休息準備下一次消費
        }
      } catch (InterruptedException e) {
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    GoodsQueue que = new GoodsQueue(1);
    Producer pro = new Producer(que);
    Consumer con = new Consumer(que);
    con.start();
    Thread.sleep(500);
    pro.start();
  }
}

線程的基本協作和生產者消費者