18.並發類容器MQ
阿新 • • 發佈:2017-08-08
executors ice com blocking weak 消費者 integer ktr 生產
null
package demo7.MQ;
/**
 * Mutable value holder for one unit of work passed through the queue.
 * Carries a numeric id, a display name and a task code.
 */
public class QueueData {

    private int id;
    private String name;
    private String taskCode;

    /** Creates an empty item: id is 0, name and task code are null. */
    public QueueData() {
        this(0, null, null);
    }

    /**
     * Creates a fully populated item.
     *
     * @param id       numeric identifier
     * @param name     display name
     * @param taskCode task code
     */
    public QueueData(int id, String name, String taskCode) {
        this.id = id;
        this.name = name;
        this.taskCode = taskCode;
    }

    // ---- accessors ----

    /** @return the numeric identifier */
    public int getId() {
        return this.id;
    }

    /** @return the display name, possibly null */
    public String getName() {
        return this.name;
    }

    /** @return the task code, possibly null */
    public String getTaskCode() {
        return this.taskCode;
    }

    // ---- mutators ----

    /** @param id the new numeric identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @param name the new display name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param taskCode the new task code */
    public void setTaskCode(String taskCode) {
        this.taskCode = taskCode;
    }
}
package demo7.MQ;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer task: repeatedly builds a {@link QueueData} item and offers it to
 * the shared queue until {@link #stop()} is called or the running thread is
 * interrupted.
 */
public class Provider implements Runnable {

    /** ID generator shared by every Provider instance. */
    private static final AtomicInteger count = new AtomicInteger();

    /** Source of the random delay that simulates the cost of producing data. */
    private static final Random random = new Random();

    /** Shared buffer that producers write to. */
    private final BlockingQueue<QueueData> queue;

    /**
     * Loop flag; volatile so that a stop() call made from another thread is
     * seen by the thread executing run().
     */
    private volatile boolean isRunning = true;

    /**
     * @param queue the shared buffer that produced items are offered to
     */
    public Provider(BlockingQueue<QueueData> queue) {
        this.queue = queue;
    }

    /**
     * Produces items in a loop. Each iteration sleeps for a random time below
     * one second, creates an item with a fresh id and offers it to the queue,
     * waiting up to two seconds for space. The loop ends when stop() has been
     * called or when the thread is interrupted.
     */
    @Override
    public void run() {
        while (isRunning) {
            try {
                // Random sleep (< 1000 ms) stands in for the time spent reading/producing data.
                Thread.sleep(random.nextInt(1000));
                // incrementAndGet atomically yields the next id.
                int id = count.incrementAndGet();
                QueueData queueData = new QueueData(id, "任務" + String.valueOf(id), String.valueOf(id).hashCode() + "");
                System.err.println("線程:" + Thread.currentThread().getName() + "\t生產task:" + queueData.getName() + "\t" + queueData.getId());
                if (!queue.offer(queueData, 2, TimeUnit.SECONDS)) {
                    System.err.println("!!!!!!!!!生產數據失敗 error");
                }
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the flag set for the
                // caller/executor and leave the loop instead of carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the producer to finish after its current iteration. */
    public void stop() {
        this.isRunning = false;
    }
}
package demo7.MQ;
import java.util.Random;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
/**
 * Consumer task: repeatedly takes a {@link QueueData} item from the shared
 * queue and reports it, until the running thread is interrupted.
 */
public class Consumer implements Runnable {

    /** Source of the random delay that simulates processing time. */
    private static final Random random = new Random();

    /** Shared buffer that items are taken from. */
    private final BlockingQueue<QueueData> queue;

    /**
     * @param queue the shared buffer to consume from
     */
    public Consumer(BlockingQueue<QueueData> queue) {
        this.queue = queue;
    }

    /**
     * Consumes items in a loop. Each iteration takes one item, sleeps for a
     * random time below one second, then prints the item. The loop ends when
     * the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until an element becomes available.
                QueueData queueData = this.queue.take();
                Thread.sleep(random.nextInt(1000));
                System.err.println("線程:" + Thread.currentThread().getName() + "\t消費task->:" + queueData.getName() + "\t" + queueData.getId());
            } catch (InterruptedException e) {
                // Interruption is the only way to stop this loop: restore the
                // flag and exit rather than swallowing it and blocking again.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
package demo7.MQ;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class MainMQ {

    /**
     * Producer/consumer demo (multi-threaded).
     * 1. There are two kinds of threads: several producer threads and several consumer threads.
     * 2. Producer threads submit requests; consumer threads process the tasks the producers submitted.
     * 3. Producers and consumers communicate through a shared in-memory buffer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Shared in-memory buffer.
        BlockingQueue<QueueData> queueData = new LinkedBlockingQueue<QueueData>();

        // 2. Producers.
        Provider p1 = new Provider(queueData);
        Provider p2 = new Provider(queueData);
        Provider p3 = new Provider(queueData);

        // 3. Consumers.
        Consumer c1 = new Consumer(queueData);
        Consumer c2 = new Consumer(queueData);
        Consumer c3 = new Consumer(queueData);

        // Cached thread pool: creates threads on demand, none while there is no
        // work, and idle threads are kept for 60 s (the default).
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(p1);
        executorService.execute(p2);
        executorService.execute(p3);
        executorService.execute(c1);
        executorService.execute(c2);
        executorService.execute(c3);

        // Let the demo run for a while, then stop producing.
        pause(2000);
        p1.stop();
        p2.stop();
        p3.stop();

        // Leave some time for in-flight work before shutting the pool down.
        pause(1000);

        // shutdown() never interrupts running tasks, so consumers blocked in
        // take() would keep the JVM alive. shutdownNow() interrupts the worker
        // threads; a task exits only if it honours that interruption.
        executorService.shutdownNow();
    }

    /** Sleeps for the given time; on interruption restores the interrupt flag and returns early. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
null
18.並發類容器MQ