生產者消費者模式ArrayBlockingQueue
阿新 • • 發佈:2018-11-22
tar per 函數 nbsp cte for stack int 方法
package concurrent._interrupt; import java.math.BigInteger; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Demo3 { public static void main(String[] args) throws InterruptedException { BlockingQueue<BigInteger> bq = newArrayBlockingQueue<BigInteger>(8); Producter producter = new Producter(bq); Customer customer = new Customer(bq); //先在裏面放入兩個 bq.put(new BigInteger("10")); bq.put(new BigInteger("20")); producter.start(); customer.start(); } } class Producter extendsThread{ private BlockingQueue<BigInteger> bq; public Producter(BlockingQueue<BigInteger> bq){ super("Producter"); this.bq = bq; } @Override public void run() { try { send(); } catch (InterruptedException e) { System.out.println("send函數的sleep被中斷"); e.printStackTrace(); } } private void send() throws InterruptedException { BigInteger bigInteger = BigInteger.ONE; //關於bq的add和put函數 //add(e)//隊列未滿時,返回true;隊列滿則拋出IllegalStateException(“Queue full”)異常——AbstractQueue //put(e)//隊列未滿時,直接插入沒有返回值;隊列滿時會阻塞等待,一直等到隊列未滿時再插入。 for(;;){ Thread.sleep(2000); bq.put(bigInteger=bigInteger.add(bigInteger)); } } } class Customer extends Thread{ private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { for(;;){ get(); } } catch (InterruptedException e) { System.out.println("get函數的take()被中斷"); e.printStackTrace(); } } private void get() throws InterruptedException { Thread.sleep(1000); BigInteger bigInteger = bq.take(); System.out.println(bigInteger); } }
結果:
10 20 2 4 8 16 32 64
中斷take()方法
結果:
10 20 2 4 8 16 32 get函數的take()被中斷 java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403) at concurrent._interrupt.Customer.get(Demo3.java:77) at concurrent._interrupt.Customer.run(Demo3.java:67)
再次修改中斷方法:
將get空轉,等待標誌位的改變
class Customer extends Thread{ private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { for(;!isInterrupted();){ get(); } } catch (InterruptedException e) { System.out.println("get函數的take()被中斷"); e.printStackTrace(); } finally { System.out.println("customer的run方法結束"); } } private void get() throws InterruptedException { //BigInteger bigInteger = bq.take(); //System.out.println(bigInteger); } }
結果顯示:
customer的run方法結束
再次修改代碼:
class Customer extends Thread{ boolean t = true; private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { //如果沒有被中斷就死循環 for(;!isInterrupted();){ get(); //如果被中斷了就重置 if(isInterrupted()){ //將打印的東西改變一下 t=false; //重置標誌位 interrupted(); } } } catch (InterruptedException e) { System.out.println("get函數的take()被中斷"); e.printStackTrace(); } finally { System.out.println("customer的run方法結束"); } } private void get() throws InterruptedException { //BigInteger bigInteger = bq.take(); //System.out.println(bigInteger); //剛開始的時候打印true //被中斷,然後再重置標誌後,打印false System.out.println(t); } }
結果:
前三秒:顯示true
完了主函數請求中斷消費者,消費者在死循環裏面檢測到中斷的請求之後,將請求通過interrupted()函數,重置一下。
這時候又可以執行死循環了。
三秒結束後打印了false
生產者消費者模式ArrayBlockingQueue