1. 程式人生 > >多執行緒學習-03

多執行緒學習-03

執行緒通訊

生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題(英語:Bounded-buffer problem),是一個多執行緒同步問題的經典案例。
該問題描述了兩個(多個)共享固定大小緩衝區的執行緒——即所謂的“生產者”和“消費者”——在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。
要解決該問題,就必須讓生產者在緩衝區滿時等待(wait),等到下次消費者消耗了緩衝區中的資料的時候,生產者才能被喚醒(notify),開始往緩衝區新增資料。同樣,也可以讓消費者在緩衝區空時進入等待(wait),等到生產者往緩衝區新增資料之後,再喚醒消費者(notify)。通常採用執行緒間通訊的方法解決該問題。
依據執行緒間的通訊方式解決這類的生產者和消費者的問題的模式,叫做生產者與消費者設計模式。
例如:

示例程式碼:如果一個生產者、一個消費者

package com.thread.communication;

public class TestCommunication {

	public static void main(String[] args) {
		HouseWare h = new HouseWare(10);
		Worker w = new Worker(h);
		Customer c = new Customer(h);
		new Thread(w).start();
		new Thread(c).start();
	}
}

class HouseWare {
	private int num;

	public HouseWare(int total) {
		this.num = total;
	}

	public void put() {
		num++;
		System.out.println("工人生產了一臺電視機,現在庫存為:" + num);
	}

	public void take() {
		num--;
		System.out.println("消費者買走了一臺電視機,現在庫存為:" + num);
	}
}

class Worker implements Runnable {
	private HouseWare h;

	public Worker(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		for (int i = 1; i <= 50; i++) {
			h.put();
		}
	}
}

class Customer implements Runnable {
	private HouseWare h;

	public Customer(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		for (int i = 1; i <= 50; i++) {
			h.take();
		}
	}
}

問題:

執行緒安全問題
透支消費問題
倉庫容量有限問題

解決辦法(一):synchronized+wait+notify

 執行緒安全問題:同步
 透支消費問題:執行緒通訊
 倉庫容量有限問題:執行緒通訊
wait() 與 notify() 和 notifyAll(),Java.lang.Object提供的這三個方法。
 1.public final void wait():該執行緒釋出對此監視器的所有權(即釋放鎖)並等待,直到其他執行緒通過呼叫 notify 方法,或 notifyAll 方法通知在此物件的監視器上等待的執行緒醒來。然後該執行緒將等到重新獲得對監視器的所有權後才能繼續執行。 
 2.public final void notify():喚醒在此物件監視器(鎖)上等待的單個執行緒。如果所有執行緒都在此物件上等待,則會選擇喚醒其中一個執行緒。選擇是任意性的,並在對實現做出決定時發生。
 3.public final void notifyAll():喚醒在此物件監視器(鎖)上等待(wait)的所有執行緒。
特別注意:
這三個方法只有在synchronized方法或synchronized程式碼塊中才能使用,否則會報java.lang.IllegalMonitorStateException異常。
因為這三個方法必須有鎖物件呼叫,而任意物件都可以作為synchronized的同步鎖,因此這三個方法只能在Object類中宣告。
package com.thread.communication;

public class TestCommunication {

	public static void main(String[] args) {
		HouseWare h = new HouseWare();
		Worker w = new Worker(h);
		Customer c = new Customer(h);
		new Thread(w).start();
		new Thread(c).start();
	}
}

class HouseWare {
	private final int MAX = 10;
	private int num;

	public synchronized void put() {
		if(num>=MAX){
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num++;
		System.out.println("工人生產了一臺電視機,現在庫存為:" + num);
		this.notify();
	}

	public synchronized void take() {
		if(num<=0){
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num--;
		System.out.println("消費者買走了一臺電視機,現在庫存為:" + num);
		this.notify();
	}
}

class Worker implements Runnable {
	private HouseWare h;

	public Worker(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		for (int i = 1; i <= 50; i++) {
			h.put();
		}
	}
}

class Customer implements Runnable {
	private HouseWare h;

	public Customer(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		for (int i = 1; i <= 50; i++) {
			h.take();
		}
	}
}

解決辦法(二):Lock+Condition

 執行緒安全問題:同步
 透支消費問題:執行緒通訊
 倉庫容量有限問題:執行緒通訊
而現在用Lock時,用的鎖是Lock物件。而Lock介面中並沒有直接操作等待喚醒的方法,而是將這些方式又單獨封裝到了一個物件中。這個物件就是Condition,將Object中的三個方法進行單獨的封裝。並提供了功能一致的方法 await()、signal()、signalAll()體現新版本物件的好處。
Condition 例項實質上被繫結到一個鎖上。要為特定 Lock 例項獲得 Condition 例項,請使用其 newCondition() 方法。 
package com.thread.communication;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TestCommunication3 {

	public static void main(String[] args) {
		HouseWare h = new HouseWare();
		Worker w = new Worker(h);
		Customer c = new Customer(h);
		new Thread(w).start();
		new Thread(c).start();
	}
}

class HouseWare {
	private final int MAX = 10;
	private int num;
	private final ReentrantLock lock = new ReentrantLock(); 
	private final Condition full  = lock.newCondition(); //此處用一個Condition也可以,但為了語義更清晰,可以使用兩個Condition物件
	private final Condition empty = lock.newCondition(); 

	public void put() {
		lock.lock();
		if(num>=MAX){
			try {
				full.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num++;
		System.out.println("工人生產了一臺電視機,現在庫存為:" + num);
		empty.signal();
		lock.unlock();
	}

	public void take() {
		lock.lock();
		if(num<=0){
			try {
				empty.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num--;
		System.out.println("消費者買走了一臺電視機,現在庫存為:" + num);
		full.signal();
		lock.unlock();
	}
}

class Worker implements Runnable {
	private HouseWare h;

	public Worker(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true) {
			h.put();
		}
	}
}

class Customer implements Runnable {
	private HouseWare h;

	public Customer(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true){
			h.take();
		}
	}
}

多個生產者與多個消費者

如果多個生產者,多個消費者,仍然會有
 透支消費問題
 倉庫容量有限問題
package com.thread.communication;

public class TestCommunication2 {

	public static void main(String[] args) {
		HouseWare h = new HouseWare();
		Worker w1 = new Worker(h);
		Worker w2 = new Worker(h);
		Customer c1 = new Customer(h);
		Customer c2 = new Customer(h);
		new Thread(w1,"A").start();
		new Thread(w2,"B").start();
		new Thread(c1,"C").start();
		new Thread(c2,"D").start();
	}
}

class HouseWare {
	private final int MAX = 10;
	private int num;

	public synchronized void put() {
		if(num>=MAX){
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num++;
		System.out.println("工人"+Thread.currentThread().getName()+"生產了一臺電視機,現在庫存為:" + num);
		this.notify();
	}

	public synchronized void take() {
		if(num<=0){
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num--;
		System.out.println("消費者"+Thread.currentThread().getName()+"買走了一臺電視機,現在庫存為:" + num);
		this.notify();
	}
}

class Worker implements Runnable {
	private HouseWare h;

	public Worker(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true) {
			h.put();
		}
	}
}

class Customer implements Runnable {
	private HouseWare h;

	public Customer(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true) {
			h.take();
		}
	}
}

解決辦法(一):synchronized+wait+notifyAll

1、notify()改為notifyAll()
2、被喚醒後,再次判斷當前條件是否滿足,再執行業務程式碼
package com.thread.communication;

public class TestCommunication2 {

	public static void main(String[] args) {
		HouseWare h = new HouseWare();
		Worker w1 = new Worker(h);
		Worker w2 = new Worker(h);
		Customer c1 = new Customer(h);
		Customer c2 = new Customer(h);
		new Thread(w1,"A").start();
		new Thread(w2,"B").start();
		new Thread(c1,"C").start();
		new Thread(c2,"D").start();
	}
}
class HouseWare {
	private final int MAX = 10;
	private int num;

	public synchronized void put() {
		while(num>=MAX){//醒來後繼續判斷條件,不滿足繼續睡
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		//被喚醒後,結束該方法,重寫呼叫該方法
		}
		num++;
		System.out.println("工人"+Thread.currentThread().getName()+"生產了一臺電視機,現在庫存為:" + num);
		this.notifyAll();
		
	}

	public synchronized void take() {
		while(num<=0){//醒來後繼續判斷條件,不滿足繼續睡
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		}
		num--;
		System.out.println("消費者"+Thread.currentThread().getName()+"買走了一臺電視機,現在庫存為:" + num);
		this.notifyAll();
	}
}

class Worker implements Runnable {
	private HouseWare h;

	public Worker(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true) {
			h.put();
		}
	}
}

class Customer implements Runnable {
	private HouseWare h;

	public Customer(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true) {
			h.take();
		}
	}
}

解決辦法(二):Lock+Condition(可以指定專門的執行緒甦醒或者睡眠)可以是執行緒有順序的執行

package com.thread.communication;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TestCommunication4 {

	public static void main(String[] args) {
		HouseWare h = new HouseWare();
		Worker w1 = new Worker(h);
		Worker w2 = new Worker(h);
		Customer c1 = new Customer(h);
		Customer c2 = new Customer(h);
		new Thread(w1,"A").start();
		new Thread(w2,"B").start();
		new Thread(c1,"C").start();
		new Thread(c2,"D").start();
	}
}

class HouseWare {
	private final int MAX = 10;
	private int num;
	private final ReentrantLock lock = new ReentrantLock(); 
	private final Condition full  = lock.newCondition(); 
	private final Condition empty = lock.newCondition(); 

	public void put() {
		lock.lock();
		while(num>=MAX){
			try {
				full.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num++;
		System.out.println("工人"+Thread.currentThread().getName()+"生產了一臺電視機,現在庫存為:" + num);
		empty.signalAll();
		lock.unlock();
	}

	public void take() {
		lock.lock();
		while(num<=0){
			try {
				empty.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		num--;
		System.out.println("消費者"+Thread.currentThread().getName()+"買走了一臺電視機,現在庫存為:" + num);
		full.signalAll();
		lock.unlock();
	}
}

class Worker implements Runnable {
	private HouseWare h;

	public Worker(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true) {
			h.put();
		}
	}
}

class Customer implements Runnable {
	private HouseWare h;

	public Customer(HouseWare h) {
		super();
		this.h = h;
	}

	@Override
	public void run() {
		while(true){
			h.take();
		}
	}
}

單例模式(懶漢式)執行緒安全問題 

class Single3{
	private static Single3 instance;
	private Single3(){}
	public static Single3 getInstance(){
		if(instance==null){
			synchronized (Single3.class) {
				if(instance==null){
					instance = new Single3();
				}
			}
		}
		return instance;
	}
	/*//可以實現,但效率還可以提升
	public synchronized static Single3 getInstance(){
		if(instance==null){
			instance = new Single3();
		}
		return instance;
	}*/
}