1. 程式人生 > >生產者消費者模式ArrayBlockingQueue

生產者消費者模式ArrayBlockingQueue

tar per 函數 nbsp cte for stack int 方法

package concurrent._interrupt;


import java.math.BigInteger;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Demo3 {


    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<BigInteger> bq = new
ArrayBlockingQueue<BigInteger>(8); Producter producter = new Producter(bq); Customer customer = new Customer(bq); //先在裏面放入兩個 bq.put(new BigInteger("10")); bq.put(new BigInteger("20")); producter.start(); customer.start(); } } class Producter extends
Thread{ private BlockingQueue<BigInteger> bq; public Producter(BlockingQueue<BigInteger> bq){ super("Producter"); this.bq = bq; } @Override public void run() { try { send(); } catch (InterruptedException e) { System.out.println(
"send函數的sleep被中斷"); e.printStackTrace(); } } private void send() throws InterruptedException { BigInteger bigInteger = BigInteger.ONE; //關於bq的add和put函數 //add(e)//隊列未滿時,返回true;隊列滿則拋出IllegalStateException(“Queue full”)異常——AbstractQueue //put(e)//隊列未滿時,直接插入沒有返回值;隊列滿時會阻塞等待,一直等到隊列未滿時再插入。 for(;;){ Thread.sleep(2000); bq.put(bigInteger=bigInteger.add(bigInteger)); } } } class Customer extends Thread{ private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { for(;;){ get(); } } catch (InterruptedException e) { System.out.println("get函數的take()被中斷"); e.printStackTrace(); } } private void get() throws InterruptedException { Thread.sleep(1000); BigInteger bigInteger = bq.take(); System.out.println(bigInteger); } }

結果:

10
20
2
4
8
16
32
64

中斷take()方法

技術分享圖片

結果:

10
20
2
4
8
16
32
get函數的take()被中斷
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
    at concurrent._interrupt.Customer.get(Demo3.java:77)
    at concurrent._interrupt.Customer.run(Demo3.java:67)

再次修改中斷方法:

將get空轉,等待標誌位的改變

class Customer extends Thread{
    private BlockingQueue<BigInteger> bq;
    public Customer(BlockingQueue<BigInteger> bq){
        super("Customer");
        this.bq = bq;
    }
    @Override
    public void run() {
        try {
            for(;!isInterrupted();){
                get();
            }

        } catch (InterruptedException e) {
            System.out.println("get函數的take()被中斷");
            e.printStackTrace();
        } finally {
            System.out.println("customer的run方法結束");

        }
    }
    private void get() throws InterruptedException {

        //BigInteger bigInteger = bq.take();
        //System.out.println(bigInteger);

    }
}

結果顯示:

customer的run方法結束

再次修改代碼:

class Customer extends Thread{
    boolean t = true;
    private BlockingQueue<BigInteger> bq;
    public Customer(BlockingQueue<BigInteger> bq){
        super("Customer");
        this.bq = bq;
    }
    @Override
    public void run() {
        try {
            //如果沒有被中斷就死循環
            for(;!isInterrupted();){
                get();
                //如果被中斷了就重置
                if(isInterrupted()){
                    //將打印的東西改變一下
                    t=false;
                    //重置標誌位
                    interrupted();
                }
            }

        } catch (InterruptedException e) {
            System.out.println("get函數的take()被中斷");
            e.printStackTrace();
        } finally {
            System.out.println("customer的run方法結束");

        }
    }
    private void get() throws InterruptedException {

        //BigInteger bigInteger = bq.take();
        //System.out.println(bigInteger);
        //剛開始的時候打印true
        //被中斷,然後再重置標誌後,打印false
        System.out.println(t);
    }
}

結果:

前三秒:顯示true

完了主函數請求中斷消費者,消費者在死循環裏面檢測到中斷的請求之後,將請求通過interrupted()函數,重置一下。

這時候又可以執行死循環了。

三秒結束後打印了false

生產者消費者模式ArrayBlockingQueue