多執行緒學習-03
阿新 • • 發佈:2018-11-11
執行緒通訊
生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題(英語:Bounded-buffer problem),是一個多執行緒同步問題的經典案例。 該問題描述了兩個(多個)共享固定大小緩衝區的執行緒——即所謂的“生產者”和“消費者”——在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。 要解決該問題,就必須讓生產者在緩衝區滿時等待(wait),等到下次消費者消耗了緩衝區中的資料的時候,生產者才能被喚醒(notify),開始往緩衝區新增資料。同樣,也可以讓消費者在緩衝區空時進入等待(wait),等到生產者往緩衝區新增資料之後,再喚醒消費者(notify)。通常採用執行緒間通訊的方法解決該問題。 依據執行緒間的通訊方式解決這類的生產者和消費者的問題的模式,叫做生產者與消費者設計模式。 例如:
示例程式碼:如果一個生產者、一個消費者
package com.thread.communication; public class TestCommunication { public static void main(String[] args) { HouseWare h = new HouseWare(10); Worker w = new Worker(h); Customer c = new Customer(h); new Thread(w).start(); new Thread(c).start(); } } class HouseWare { private int num; public HouseWare(int total) { this.num = total; } public void put() { num++; System.out.println("工人生產了一臺電視機,現在庫存為:" + num); } public void take() { num--; System.out.println("消費者買走了一臺電視機,現在庫存為:" + num); } } class Worker implements Runnable { private HouseWare h; public Worker(HouseWare h) { super(); this.h = h; } @Override public void run() { for (int i = 1; i <= 50; i++) { h.put(); } } } class Customer implements Runnable { private HouseWare h; public Customer(HouseWare h) { super(); this.h = h; } @Override public void run() { for (int i = 1; i <= 50; i++) { h.take(); } } }
問題:
執行緒安全問題
透支消費問題
倉庫容量有限問題
解決辦法(一):synchronized+wait+notify
執行緒安全問題:同步 透支消費問題:執行緒通訊 倉庫容量有限問題:執行緒通訊 wait() 與 notify() 和 notifyAll(),Java.lang.Object提供的這三個方法。 1.public final void wait():該執行緒釋出對此監視器的所有權(即釋放鎖)並等待,直到其他執行緒通過呼叫 notify 方法,或 notifyAll 方法通知在此物件的監視器上等待的執行緒醒來。然後該執行緒將等到重新獲得對監視器的所有權後才能繼續執行。 2.public final void notify():喚醒在此物件監視器(鎖)上等待的單個執行緒。如果所有執行緒都在此物件上等待,則會選擇喚醒其中一個執行緒。選擇是任意性的,並在對實現做出決定時發生。 3.public final void notifyAll():喚醒在此物件監視器(鎖)上等待(wait)的所有執行緒。 特別注意: 這三個方法只有在synchronized方法或synchronized程式碼塊中才能使用,否則會報java.lang.IllegalMonitorStateException異常。 因為這三個方法必須有鎖物件呼叫,而任意物件都可以作為synchronized的同步鎖,因此這三個方法只能在Object類中宣告。
package com.thread.communication;
public class TestCommunication {
public static void main(String[] args) {
HouseWare h = new HouseWare();
Worker w = new Worker(h);
Customer c = new Customer(h);
new Thread(w).start();
new Thread(c).start();
}
}
class HouseWare {
private final int MAX = 10;
private int num;
public synchronized void put() {
if(num>=MAX){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.println("工人生產了一臺電視機,現在庫存為:" + num);
this.notify();
}
public synchronized void take() {
if(num<=0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println("消費者買走了一臺電視機,現在庫存為:" + num);
this.notify();
}
}
class Worker implements Runnable {
private HouseWare h;
public Worker(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
h.put();
}
}
}
class Customer implements Runnable {
private HouseWare h;
public Customer(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
h.take();
}
}
}
解決辦法(二):Lock+Condition
執行緒安全問題:同步
透支消費問題:執行緒通訊
倉庫容量有限問題:執行緒通訊
而現在用Lock時,用的鎖是Lock物件。而Lock介面中並沒有直接操作等待喚醒的方法,而是將這些方式又單獨封裝到了一個物件中。這個物件就是Condition,將Object中的三個方法進行單獨的封裝。並提供了功能一致的方法 await()、signal()、signalAll()體現新版本物件的好處。
Condition 例項實質上被繫結到一個鎖上。要為特定 Lock 例項獲得 Condition 例項,請使用其 newCondition() 方法。
package com.thread.communication;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestCommunication3 {
public static void main(String[] args) {
HouseWare h = new HouseWare();
Worker w = new Worker(h);
Customer c = new Customer(h);
new Thread(w).start();
new Thread(c).start();
}
}
class HouseWare {
private final int MAX = 10;
private int num;
private final ReentrantLock lock = new ReentrantLock();
private final Condition full = lock.newCondition(); //此處用一個Condition也可以,但為了語義更清晰,可以使用兩個Condition物件
private final Condition empty = lock.newCondition();
public void put() {
lock.lock();
if(num>=MAX){
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.println("工人生產了一臺電視機,現在庫存為:" + num);
empty.signal();
lock.unlock();
}
public void take() {
lock.lock();
if(num<=0){
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println("消費者買走了一臺電視機,現在庫存為:" + num);
full.signal();
lock.unlock();
}
}
class Worker implements Runnable {
private HouseWare h;
public Worker(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true) {
h.put();
}
}
}
class Customer implements Runnable {
private HouseWare h;
public Customer(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true){
h.take();
}
}
}
多個生產者與多個消費者
如果多個生產者,多個消費者,仍然會有
透支消費問題
倉庫容量有限問題
package com.thread.communication;
public class TestCommunication2 {
public static void main(String[] args) {
HouseWare h = new HouseWare();
Worker w1 = new Worker(h);
Worker w2 = new Worker(h);
Customer c1 = new Customer(h);
Customer c2 = new Customer(h);
new Thread(w1,"A").start();
new Thread(w2,"B").start();
new Thread(c1,"C").start();
new Thread(c2,"D").start();
}
}
class HouseWare {
private final int MAX = 10;
private int num;
public synchronized void put() {
if(num>=MAX){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.println("工人"+Thread.currentThread().getName()+"生產了一臺電視機,現在庫存為:" + num);
this.notify();
}
public synchronized void take() {
if(num<=0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println("消費者"+Thread.currentThread().getName()+"買走了一臺電視機,現在庫存為:" + num);
this.notify();
}
}
class Worker implements Runnable {
private HouseWare h;
public Worker(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true) {
h.put();
}
}
}
class Customer implements Runnable {
private HouseWare h;
public Customer(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true) {
h.take();
}
}
}
解決辦法(一):synchronized+wait+notifyAll
1、notify()改為notifyAll()
2、被喚醒後,再次判斷當前條件是否滿足,再執行業務程式碼
package com.thread.communication;
public class TestCommunication2 {
public static void main(String[] args) {
HouseWare h = new HouseWare();
Worker w1 = new Worker(h);
Worker w2 = new Worker(h);
Customer c1 = new Customer(h);
Customer c2 = new Customer(h);
new Thread(w1,"A").start();
new Thread(w2,"B").start();
new Thread(c1,"C").start();
new Thread(c2,"D").start();
}
}
class HouseWare {
private final int MAX = 10;
private int num;
public synchronized void put() {
while(num>=MAX){//醒來後繼續判斷條件,不滿足繼續睡
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
//被喚醒後,結束該方法,重寫呼叫該方法
}
num++;
System.out.println("工人"+Thread.currentThread().getName()+"生產了一臺電視機,現在庫存為:" + num);
this.notifyAll();
}
public synchronized void take() {
while(num<=0){//醒來後繼續判斷條件,不滿足繼續睡
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println("消費者"+Thread.currentThread().getName()+"買走了一臺電視機,現在庫存為:" + num);
this.notifyAll();
}
}
class Worker implements Runnable {
private HouseWare h;
public Worker(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true) {
h.put();
}
}
}
class Customer implements Runnable {
private HouseWare h;
public Customer(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true) {
h.take();
}
}
}
解決辦法(二):Lock+Condition(可以指定專門的執行緒甦醒或者睡眠)可以是執行緒有順序的執行
package com.thread.communication;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestCommunication4 {
public static void main(String[] args) {
HouseWare h = new HouseWare();
Worker w1 = new Worker(h);
Worker w2 = new Worker(h);
Customer c1 = new Customer(h);
Customer c2 = new Customer(h);
new Thread(w1,"A").start();
new Thread(w2,"B").start();
new Thread(c1,"C").start();
new Thread(c2,"D").start();
}
}
class HouseWare {
private final int MAX = 10;
private int num;
private final ReentrantLock lock = new ReentrantLock();
private final Condition full = lock.newCondition();
private final Condition empty = lock.newCondition();
public void put() {
lock.lock();
while(num>=MAX){
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.println("工人"+Thread.currentThread().getName()+"生產了一臺電視機,現在庫存為:" + num);
empty.signalAll();
lock.unlock();
}
public void take() {
lock.lock();
while(num<=0){
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println("消費者"+Thread.currentThread().getName()+"買走了一臺電視機,現在庫存為:" + num);
full.signalAll();
lock.unlock();
}
}
class Worker implements Runnable {
private HouseWare h;
public Worker(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true) {
h.put();
}
}
}
class Customer implements Runnable {
private HouseWare h;
public Customer(HouseWare h) {
super();
this.h = h;
}
@Override
public void run() {
while(true){
h.take();
}
}
}
單例模式(懶漢式)執行緒安全問題
class Single3{
private static Single3 instance;
private Single3(){}
public static Single3 getInstance(){
if(instance==null){
synchronized (Single3.class) {
if(instance==null){
instance = new Single3();
}
}
}
return instance;
}
/*//可以實現,但效率還可以提升
public synchronized static Single3 getInstance(){
if(instance==null){
instance = new Single3();
}
return instance;
}*/
}