1. 程式人生 > >執行緒的基本協作機制

執行緒的基本協作機制

上一篇文章主要講述了執行緒的基本概念以及執行緒中常用的方法

接下來講述執行緒中的基本協作機制:

     1.生產消費模型;

     2.同時開始;

     3.等待結束;

     4.集合點。

首先介紹Object類中的wait/notify方法:

public  final  void  wait()  throws  InterruptedException;

public  final  void  wait(long timeout)  throws  InterruptedException;

上述的兩個方法中一個帶引數,一個不帶引數。

帶時間引數,單位是毫秒,表示最多等待這麼長的時間,引數為0表示無限期等待;

不帶時間引數,表示無限期等待;

除了用於鎖的等待佇列,每個執行緒還有另一個等待佇列,表示條件佇列;當呼叫wait方法就會把當前執行緒放入該佇列中,他需要等待一個條件,這個太條件他自己改變不了,需要其他執行緒改變嗎。

當執行緒在等待的過程中被中斷,會丟擲異常(參考上一篇文章);

public  final  native  void  notify();

public  final  native  void  notifyAll();

當執行緒在呼叫wait方法之後,需要等待一個條件,當其他條件改變後,需要呼叫notify方法;

notify就是從條件佇列中選中一個執行緒將其從佇列移除並喚醒;

notifyAll會移除所有的執行緒並喚醒它們。

看下面的例子:

public class Main extends Thread  {

    private  volatile boolean flag = false;
    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!flag) {
                    wait();
                }
            }
            System.out.println("flag");
        }catch (InterruptedException e){

        }
    }
    public  synchronized void setFlag(){
        this.flag = true;
        notify();
    }
    public static void main(String[] args) throws InterruptedException {
        Main main = new Main();
        main.start();
        Thread.sleep(1000);
        System.out.println("flag1");
        main.setFlag();
    }
}

上面程式碼中有兩個執行緒,主執行緒和創造出來的執行緒;被創造的執行緒等待變數變為true,在不為true的時候,一直在wait(),主執行緒把該變數變為true,並呼叫notify方法。

wait/notify方法只能在synchronized程式碼塊內被呼叫;

雖然是在synchronized方法內,但呼叫wait方法的時候,執行緒會釋放物件鎖;

wait/notify方法被不同的執行緒呼叫,但共享相同的鎖和條件等待佇列,圍繞一個共享的條件變數進行協作。

1.生產者/消費者模型

生產者往一個容器內放資料,如果滿了就wait(),消費者從容器內取資料,如果空了就wait()。

 class MyBlockingQueue<E>{
    private Queue<E> queue = null;
    private int limit;
    public MyBlockingQueue(int limit){
        this.limit=limit;
        queue=new ArrayDeque<>(limit);
    }
    public synchronized void put(E e) throws InterruptedException {
        while (queue.size()==limit){
            wait();
        }
        queue.add(e);
        notifyAll();
    }
    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty()){
            wait();
        }
        E e = queue.poll();
        notifyAll();
        return e;
    }
}

上面程式碼相當於一個倉庫,limit相當於倉庫的容量,倉庫滿了,生產者就wait,倉庫空了,消費者就wait。

當倉庫不為滿的時候,生產者向倉庫中放資料;

class Producer implements Runnable{
     MyBlockingQueue<String> queue;

    public Producer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        int num = 0;
        try{
            while (true){
                String task = String.valueOf(num);
                queue.put(task);
                System.out.println("produce task"+task);
                num++;
                Thread.sleep((int)(Math.random()*100));
            }
        }catch (InterruptedException e){
        }
    }
}

上面為一個簡單的生產者模型,迴圈向倉庫裡生產資料;

當倉庫為空的時候,消費者不再向倉庫取資料:

class Consumer implements Runnable{
    MyBlockingQueue<String> queue;

    public Consumer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try{
            while (true){
                String task = queue.take();
                System.out.println("hadle task"+task);
                Thread.sleep((int)(Math.random()*100));
            }
        }catch (InterruptedException e){

        }
    }
}

上面為一個簡單的消費者模型,迴圈向倉庫取資料。

 public static void main(String[] args)  {
        MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }

啟動該模型後,會發現生產者和消費者交替出現。

2.同時開始

相當於比賽場上,運動員聽到裁判的槍聲然後一起出發的過程;

class FireFlag{
    private volatile boolean fired = false;
    public synchronized void waitForFire() throws InterruptedException {
        while (!fired){
            wait();
        }
    }
    public synchronized void setFired(){
        this.fired=true;
        notifyAll();
    }
}

上述程式碼相當於一把訊號槍的作用,每個執行緒在fired為false的時候,都必須等待,當槍響的時候所有執行緒就同時執行;

class Racer implements Runnable{
    FireFlag fireFlag;

    public Racer(FireFlag fireFlag) {
        this.fireFlag = fireFlag;
    }
    @Override
    public void run() {
        try {
            this.fireFlag.waitForFire();
            System.out.println("start run"+Thread.currentThread().getName());
        } catch (InterruptedException e) {

        }
    }
}

上面的程式碼是運動員的角色,每一個執行緒都要以“槍”來變換自身的狀態;


    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        FireFlag fireFlag = new FireFlag();
        Thread[]threads = new Thread[num];
        for(int i = 0;i<num;i++){
            threads[i] = new Thread(new Racer(fireFlag));
            threads[i].start();
        }
        Thread.sleep(1000);
        fireFlag.setFired();
    }

主執行緒相當於裁判,由它來確定槍響不響。

上面建立了十個執行緒,在槍未響的時候,都在賽道上等待,當主執行緒呼叫了setFired()後,也就是槍響的時候,所有執行緒被喚醒,執行接下來的操作。

3.等待結束

class MyLatch{
    private int count;

    public MyLatch(int count) {
        this.count = count;
    }
    public synchronized void await() throws InterruptedException {
        while(count>0){
            wait();
        }
    }
    public synchronized void countDown(){
        count--;
        if(count<=0){
            notifyAll();
        }
    }
}
class Worker implements Runnable{
    MyLatch latch;

    public Worker(MyLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep((int)(Math.random()*1000));
            this.latch.countDown();
        } catch (InterruptedException e) {

        }
    }
}
public class Main  {

    public static void main(String[] args) throws InterruptedException {
      int workerNum = 100;
      MyLatch latch = new MyLatch(workerNum);
      Thread[] threads = new Thread[workerNum];
      for(int i =0;i<workerNum;i++){
          threads[i]=new Thread(new Worker(latch));
          threads[i].start();
      }
      latch.await();
        System.out.println("collect  worker  results");
    }
}

上面就是建立了100個執行緒,每個執行緒都會sleep一會兒,當sleep結束之後,會進行count--,然後主執行緒會呼叫await方法,然後當count為0的時候,主執行緒就算可以知道所有的執行緒結束了,就可以接著往下繼續執行。

是不是感覺跟join方法差不多,不過這個是不需要呼叫多次join方法來進行等待。

4.集合點

就相當於組隊旅遊的例子,大家先自己忙自己的事,之後再統一匯合。

class AssemblePoint{
    private int n;

    public AssemblePoint(int n) {
        this.n = n;
    }
    public synchronized void await() throws InterruptedException {
        if(n>0){
            n--;
            if(n==0){
                notifyAll();
            }else {
                while (n!=0){
                    wait();
                }
            }
        }
    }
}
public class Main  {
    static class Tourist implements Runnable{
        AssemblePoint ap;

        public Tourist(AssemblePoint ap) {
            this.ap = ap;
        }

        @Override
        public void run() {
            try {
                Thread.sleep((int)(Math.random()*1000));
                ap.await();
                System.out.println("arrivaed");
            } catch (InterruptedException e) {

            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num =10;
        Thread [] to = new Thread[num];
        AssemblePoint ap = new AssemblePoint(num);
        for(int i = 0;i<num;i++){
            to[i]=new Thread(new Tourist(ap));
            to[i].start();
        }

    }
}

上述程式碼,建立了是個執行緒,然後每個執行緒都會執行自己的任務,執行完之後同意呼叫await方法到達集合點,進行等待,當所有執行緒都執行完了自己的任務,就會被喚醒。這個機制跟上述的等待結束機制有些相似,但卻又不一樣,上述等待機制是主執行緒等待由它建立的所有執行緒都結束為目的;集合點機制,就像是一個小分隊裡面自覺等待隊友集合完畢,很相似,但是又很不一樣。

以上就是我對執行緒間基本的協作機制的一點了解,其實執行緒還可以組合起來做很多事情,就如同一個遊戲還有無限種方法等著我們去開發出來。

好了,不說了,敲程式碼了!!!