condition與ReentrantLock的使用
阿新 • • 發佈:2020-07-28
condition與ReentrantLock模擬生產者消費者
package com.dwz.condition; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ConditionExample { private final static ReentrantLock lock = newReentrantLock(); private final static Condition condition = lock.newCondition(); private static int data = 0; private static volatile boolean noUse = true; private static void buildData() { try { lock.lock(); //synchronized key word #monitor enterwhile (noUse) { condition.await(); //monitor.await() } data++; Optional.of("P:" + data).ifPresent(System.out::println); TimeUnit.SECONDS.sleep(1); noUse = true; condition.signal(); //monitor.notify()} catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); //synchronized end #monitor end } } private static void useData() { try { lock.lock(); while (!noUse) { condition.await(); } TimeUnit.SECONDS.sleep(1); Optional.of("C:" + data).ifPresent(System.out::println); noUse = false; condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) { new Thread(() -> { while(true) { buildData(); } }).start(); new Thread(() -> { while(true) { useData(); } }).start(); } }
三個問題:
1.not use the condition only use the lock?
2.the producer get the lock but invoke await method and not jump out the lock statement block
why the consumer can get the lock still?
3.not use the lock only use condition?
問題一演示案例:
package com.dwz.condition; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 三個問題: * 1.not use the condition only use the lock? * 2.the producer get the lock but invoke await method and not jump out the lock statement block * why the consumer can get the lock still? * 3.not use the lock only use condition? */ public class ConditionExample2 { private final static ReentrantLock lock = new ReentrantLock(); private final static Condition condition = lock.newCondition(); private static int data = 0; private static volatile boolean noUse = true; private static void buildData() { try { lock.lock(); data++; Optional.of("P:" + data).ifPresent(System.out::println); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void useData() { try { lock.lock(); TimeUnit.SECONDS.sleep(1); Optional.of("C:" + data).ifPresent(System.out::println); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) { new Thread(() -> { while(true) { buildData(); } }).start(); new Thread(() -> { while(true) { useData(); } }).start(); } }
結果:只是用lock不能保證生產者與消費者一一對應
問題三演示案例:
package com.dwz.condition; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ConditionExample3 { private final static ReentrantLock lock = new ReentrantLock(); private final static Condition condition = lock.newCondition(); private static int data = 0; private static volatile boolean noUse = true; private static void buildData() { try { // lock.lock(); while (noUse) { condition.await(); } data++; Optional.of("P:" + data).ifPresent(System.out::println); TimeUnit.SECONDS.sleep(1); noUse = true; condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // lock.unlock(); } } private static void useData() { try { // lock.lock(); while (!noUse) { condition.await(); } TimeUnit.SECONDS.sleep(1); Optional.of("C:" + data).ifPresent(System.out::println); noUse = false; condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // lock.unlock(); } } public static void main(String[] args) { new Thread(() -> { while(true) { buildData(); } }).start(); new Thread(() -> { while(true) { useData(); } }).start(); } }
結果:Exception in thread "Thread-0" java.lang.IllegalMonitorStateException
結果表明:lock和condition要配合使用
回顧普通生產者消費者:
package com.dwz.condition; import java.util.concurrent.TimeUnit; public class ComBetweenThread { private static int data = 0; private static volatile boolean noUse = true; private final static Object MONITOR = new Object(); public static void main(String[] args) { new Thread(() -> { for(;;) { buildData(); } }) .start(); new Thread(() -> { for(;;) { useData(); } }) .start(); } private static void buildData() { synchronized(MONITOR) { while(noUse) { try { MONITOR.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } data++; System.out.println("P=>" + data); noUse = true; MONITOR.notifyAll(); } } private static void useData() { synchronized(MONITOR) { while(!noUse) { try { MONITOR.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("C=>" + data); noUse = false; MONITOR.notifyAll(); } } }
使用兩個condition實現多個生產者消費者
package com.dwz.condition; import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; /** * 使用兩個condition實現多個生產者消費者 */ public class ConditionExample4 { private final static ReentrantLock lock = new ReentrantLock(); private final static Condition PRODUCE_COND = lock.newCondition(); private final static Condition CONSUME_COND = lock.newCondition(); private final static LinkedList<Long> TIMESTAMP_POOL = new LinkedList<>(); private final static int MAX_CAPACITY = 100;//pool的最大容量 public static void main(String[] args) throws InterruptedException { IntStream.range(0, 6).boxed().forEach(ConditionExample4::beginProduce); IntStream.range(0, 13).boxed().forEach(ConditionExample4::beginConsume); } private static void beginProduce(int i) { new Thread(() -> { for(;;) { produce(); sleep(1); } }, "P-" + i) .start(); } private static void beginConsume(int i) { new Thread(() -> { for(;;) { consume(); sleep(2); } }, "C-" + i) .start(); } private static void produce() { try { lock.lock(); while(TIMESTAMP_POOL.size() >= MAX_CAPACITY) { PRODUCE_COND.await(); } long value = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + "-P-" + value); TIMESTAMP_POOL.addLast(value); CONSUME_COND.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void consume() { try { lock.lock(); while(TIMESTAMP_POOL.isEmpty()) { CONSUME_COND.await(); } Long value = TIMESTAMP_POOL.removeFirst(); System.out.println(Thread.currentThread().getName() + "-C-" + value); PRODUCE_COND.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void sleep(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } }