java實現生產者-消費者模型
阿新 • • 發佈:2019-01-11
方式一:使用synchronized以及wait()、notify()/notifyAll()的配合使用
package producercomsumer;
/**
 * Demo entry point for the synchronized / wait / notifyAll approach.
 *
 * <p>Starts three {@code Consumer} threads followed by three {@code Producer}
 * threads, all of which coordinate through one shared monitor object.
 *
 * @author fangchangtan
 * @version created 2019-01-07 19:19:26
 */
public class ChangtanTest {
    public static void main(String[] args) {
        // A string literal is interned, so "lock" would be the very same object as
        // any other "lock" literal in the JVM and unrelated code could contend on
        // (or notify) this monitor. Allocate a dedicated instance instead; the type
        // stays String because that is what Consumer and Producer accept.
        String lock = new String("lock");
        for (int i = 0; i < 3; i++) {
            Thread thread1 = new Consumer(lock);
            thread1.start();
        }
        for (int i = 0; i < 3; i++) {
            Thread thread2 = new Producer(lock);
            thread2.start();
        }
    }
}
/**
 * Consumer thread: roughly once per second it takes the string stored in
 * {@code ValueObject.value}, prints it, and clears the slot.
 *
 * <p>All access to the slot happens while holding the monitor passed to the
 * constructor, which must be the same object the producers use.
 */
class Consumer extends Thread {
    private final String lock;

    /**
     * @param lock monitor object shared with the producer threads
     */
    public Consumer(String lock) {
        this.lock = lock;
    }

    /**
     * Blocks until a value is available, prints it, empties the slot and wakes
     * every thread waiting on the monitor.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without consuming anything.
     */
    public void getValue() {
        try {
            synchronized (lock) {
                // Must be a loop, not an `if`: notifyAll() wakes every waiter, so
                // another consumer may already have emptied the slot by the time
                // this thread re-acquires the monitor, and wait() may also return
                // spuriously. Re-check the condition after every wake-up.
                while (ValueObject.value == null) {
                    lock.wait();
                }
                System.out.println("Consumer:" + ValueObject.value);
                ValueObject.value = null;
                lock.notifyAll();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt so run() can see it and stop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Consumes one value per iteration, pausing one second in between, until the
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            getValue();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() clears the flag when it throws; set it again to end the loop.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Producer thread: roughly once per second it stores the current time in
 * milliseconds (as a string) into {@code ValueObject.value}.
 *
 * <p>All access to the slot happens while holding the monitor passed to the
 * constructor, which must be the same object the consumers use.
 */
class Producer extends Thread {
    private final String lock;

    /**
     * @param lock monitor object shared with the consumer threads
     */
    public Producer(String lock) {
        this.lock = lock;
    }

    /**
     * Blocks until the slot is empty, stores a new value, prints it and wakes
     * every thread waiting on the monitor.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without producing anything.
     */
    public void setValue() {
        try {
            synchronized (lock) {
                // Must be a loop, not an `if`: notifyAll() wakes every waiter, so
                // another producer may already have refilled the slot by the time
                // this thread re-acquires the monitor, and wait() may also return
                // spuriously. Without the re-check an unconsumed value is overwritten.
                while (ValueObject.value != null) {
                    lock.wait();
                }
                ValueObject.value = System.currentTimeMillis() + "";
                System.out.println("Producer:" + ValueObject.value);
                lock.notifyAll();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt so run() can see it and stop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Produces one value per iteration, pausing one second in between, until the
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            setValue();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() clears the flag when it throws; set it again to end the loop.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Single-slot message buffer shared by the producer and consumer threads.
 * {@code null} means the slot is empty.
 */
class ValueObject {

    /** The pending message, or {@code null} when nothing is waiting to be consumed. */
    public static String value = null;
}
方式二:使用Lock鎖,配合Condition的await()、signal()方式
package producercomsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Demo entry point for the ReentrantLock / Condition approach.
 *
 * <p>Creates one lock with one condition, hands both to a {@code Consumer1} and a
 * {@code Producer1}, then starts the consumer followed by the producer.
 *
 * @author fangchangtan
 * @version created 2019-01-07 19:30:26
 */
public class TankTest {

    public static void main(String[] args) {
        final ReentrantLock sharedLock = new ReentrantLock();
        final Condition slotChanged = sharedLock.newCondition();

        final Thread consumer = new Consumer1(sharedLock, slotChanged);
        final Thread producer = new Producer1(sharedLock, slotChanged);

        consumer.start();
        producer.start();
    }
}
/**
 * Consumer thread for the {@link ReentrantLock} / {@link Condition} variant:
 * roughly once per second it takes the string stored in
 * {@code ValueObject.value}, prints it, and clears the slot.
 */
class Consumer1 extends Thread {
    private final ReentrantLock lock;
    private final Condition condition;

    /**
     * @param lock      lock guarding {@code ValueObject.value}, shared with the producer
     * @param condition condition created from {@code lock}, shared with the producer
     */
    public Consumer1(ReentrantLock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    /**
     * Blocks until a value is available, prints it, empties the slot and signals
     * one thread waiting on the condition.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without consuming anything.
     */
    public void getValue() {
        // Acquire outside the try block: if lock() itself failed, the finally
        // clause would otherwise call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            // Loop rather than `if`: await() may return spuriously, so the
            // condition has to be re-checked after every wake-up.
            while (ValueObject.value == null) {
                condition.await();
            }
            System.out.println("Consumer:" + ValueObject.value);
            ValueObject.value = null;
            // NOTE(review): signal() wakes a single waiter. That is enough for one
            // producer and one consumer; with several of each sharing this one
            // condition, signalAll() (or two conditions) would be needed.
            condition.signal();
        } catch (InterruptedException e) {
            // Preserve the interrupt so run() can see it and stop.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumes one value per iteration, pausing one second in between, until the
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            getValue();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() clears the flag when it throws; set it again to end the loop.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Producer thread for the {@link ReentrantLock} / {@link Condition} variant:
 * roughly once per second it stores the current time in milliseconds (as a
 * string) into {@code ValueObject.value}.
 */
class Producer1 extends Thread {
    private final ReentrantLock lock;
    private final Condition condition;

    /**
     * @param lock      lock guarding {@code ValueObject.value}, shared with the consumer
     * @param condition condition created from {@code lock}, shared with the consumer
     */
    public Producer1(ReentrantLock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    /**
     * Blocks until the slot is empty, stores a new value, prints it and signals
     * one thread waiting on the condition.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without producing anything.
     */
    public void setValue() {
        // Acquire outside the try block: if lock() itself failed, the finally
        // clause would otherwise call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            // Loop rather than `if`: await() may return spuriously, so the
            // condition has to be re-checked after every wake-up.
            while (ValueObject.value != null) {
                condition.await();
            }
            ValueObject.value = System.currentTimeMillis() + "";
            System.out.println("Producer:" + ValueObject.value);
            // NOTE(review): signal() wakes a single waiter. That is enough for one
            // producer and one consumer; with several of each sharing this one
            // condition, signalAll() (or two conditions) would be needed.
            condition.signal();
        } catch (InterruptedException e) {
            // Preserve the interrupt so run() can see it and stop.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Produces one value per iteration, pausing one second in between, until the
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            setValue();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() clears the flag when it throws; set it again to end the loop.
                Thread.currentThread().interrupt();
            }
        }
    }
}
方式三:使用java自身提供的同步阻塞佇列
略…(網上有示例,請自行參考)
自身在使用中遇到的問題:
1.類檔案MyMain.java:主函式實現多執行緒生產和消費
package producercomsumer;
/**
 * Demo entry point that starts one {@code ProducerX} and one {@code ComsumerX}
 * thread coordinating through a single shared monitor object.
 *
 * @author fangchangtan
 * @version created 2019-01-07 18:57:48
 */
public class MyMain {
    // A string literal is interned, so "lock" would be the very same object as any
    // other "lock" literal in the JVM and unrelated code could contend on (or
    // notify) this monitor. Allocate a dedicated instance instead; the type stays
    // String because that is what ProducerX and ComsumerX accept.
    private static final String lock = new String("lock");

    public static void main(String[] args) {
        ProducerX producerX = new ProducerX(lock);
        ComsumerX comsumerX = new ComsumerX(lock);
        producerX.start();
        comsumerX.start();
    }
}
/**
 * Holder for the one message in flight between the producer and the consumer.
 * An empty slot is represented by {@code null}.
 */
class ValueObject {

    /** Current message; {@code null} while there is nothing to consume. */
    public static String value = null;
}
生產者ProducerX :
package producercomsumer;
import java.util.Random;
/**
 * Producer thread: continuously stores a random integer in the range
 * {@code [0, 100)} (as a string) into {@code ValueObject.value} whenever the
 * slot is empty.
 *
 * @author fangchangtan
 * @version created 2019-01-07 18:56:16
 */
public class ProducerX extends Thread {
    private final String lock;
    private final Random random = new Random();

    /**
     * @param lock monitor object shared with the consumer thread
     */
    public ProducerX(String lock) {
        this.lock = lock;
    }

    /**
     * Produces values until the thread is interrupted. Each iteration waits for
     * the slot to be empty, fills it, prints the value and notifies one waiter.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (lock) {
                    // Loop rather than `if`: wait() may return spuriously, so the
                    // condition has to be re-checked after every wake-up.
                    while (ValueObject.value != null) {
                        lock.wait();
                    }
                    ValueObject.value = String.valueOf(random.nextInt(100));
                    System.out.println("ProducerX: " + ValueObject.value);
                    lock.notify();
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the thread instead of
                // swallowing the interrupt and spinning forever.
                Thread.currentThread().interrupt();
            }
        }
    }
}
消費者ComsumerX:
package producercomsumer;
import com.sun.org.apache.xalan.internal.lib.ExsltBase;
/**
 * Consumer thread: continuously takes the string stored in
 * {@code ValueObject.value}, prints it, and clears the slot.
 *
 * @author fangchangtan
 * @version created 2019-01-07 18:57:17
 */
public class ComsumerX extends Thread {
    private final String lock;

    /**
     * @param lock monitor object shared with the producer thread
     */
    public ComsumerX(String lock) {
        this.lock = lock;
    }

    /**
     * Consumes values until the thread is interrupted. Each iteration waits for
     * the slot to be filled, prints the value, empties the slot and notifies one
     * waiter.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (lock) {
                    // Loop rather than `if`: wait() may return spuriously, so the
                    // condition has to be re-checked after every wake-up.
                    while (ValueObject.value == null) {
                        lock.wait();
                    }
                    System.out.println("ComsumerX: " + ValueObject.value);
                    ValueObject.value = null;
                    lock.notify();
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the thread instead of
                // swallowing the interrupt and spinning forever.
                Thread.currentThread().interrupt();
            }
        }
    }
}
(以上的程式碼是沒問題的,但是自身在之前的實現過程中出現很多異常情況)
自身在使用中存在的問題:
1.自身最初使用一個字串變數 String value 來儲存生產的訊息物件,結果是生產者生產完之後無法喚醒消費者消費資料。原因是消費者和生產者操作的不是同一個訊息容器:Java 傳遞的是物件引用的副本,而 String 又是不可變物件,對 value 重新賦值只會讓該執行緒自己持有的變數指向新的字串,另一個執行緒持有的變數並不會跟著改變,因此做不到多執行緒共享同一份資料。此處後期改為以 ValueObject.value(靜態欄位)作為訊息快取的容器,可以正常的生產和消費。