1. 程式人生 > >18.並發類容器MQ

18.並發類容器MQ

executors ice com blocking weak 消費者 integer ktr 生產


  1. package demo7.MQ;
  2. public class QueueData {
  3. private int id;
  4. private String name;
  5. private String taskCode;
  6. public QueueData() {
  7. }
  8. public QueueData(int id, String name, String taskCode) {
  9. this.id = id;
  10. this.name = name;
  11. this.taskCode = taskCode;
  12. }
  13. public int
    getId() {
  14. return id;
  15. }
  16. public void setId(int id) {
  17. this.id = id;
  18. }
  19. public String getName() {
  20. return name;
  21. }
  22. public void setName(String name) {
  23. this.name = name;
  24. }
  25. public String getTaskCode() {
  26. return taskCode;
  27. }
  28. public void setTaskCode
    (String taskCode) {
  29. this.taskCode = taskCode;
  30. }
  31. }
  1. package demo7.MQ;
  2. import java.util.Random;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. public class Provider implements Runnable {
  7. //共享緩存區
  8. private BlockingQueue
    <QueueData> queue;
  9. //多線程釋放啟動?
  10. private volatile boolean isRunning = true;
  11. //ID生成器
  12. private static AtomicInteger count = new AtomicInteger();
  13. //生產隨機對象
  14. private static Random random = new Random();
  15. public Provider(BlockingQueue<QueueData> queue) {
  16. this.queue = queue;
  17. }
  18. @Override
  19. public void run() {
  20. while (isRunning){
  21. try {
  22. //隨機休眠 - 1000 表示讀取數據、生產數據的耗時
  23. Thread.sleep(random.nextInt(1000));
  24. //incrementAndGet 進行累加
  25. int id = count.incrementAndGet();
  26. QueueData queueData = new QueueData(id,"任務"+String.valueOf(id),String.valueOf(id).hashCode()+"");
  27. System.err.println("線程:"+Thread.currentThread().getName()+"\t生產task:"+queueData.getName()+"\t"+queueData.getId());
  28. if (!queue.offer(queueData,2, TimeUnit.SECONDS)){
  29. System.err.println("!!!!!!!!!生產數據失敗 error");
  30. }
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. public void stop(){
  37. this.isRunning=false;
  38. }
  39. }
  1. package demo7.MQ;
  2. import java.util.Random;
  3. import java.util.WeakHashMap;
  4. import java.util.concurrent.BlockingQueue;
  5. public class Consumer implements Runnable{
  6. private BlockingQueue<QueueData> queue;
  7. public Consumer(BlockingQueue<QueueData> queue) {
  8. this.queue = queue;
  9. }
  10. private static Random random = new Random();
  11. @Override
  12. public void run() {
  13. while (true){
  14. try {
  15. //take:無阻塞
  16. QueueData queueData = this.queue.take();
  17. Thread.sleep(random.nextInt(1000));
  18. System.err.println("線程:"+Thread.currentThread().getName()+"\t消費task->:"+queueData.getName()+"\t"+queueData.getId());
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }
  1. package demo7.MQ;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.LinkedBlockingQueue;
  6. public class MainMQ {
  7. /**
  8. * 生產者、消費者(多線程模式)
  9. * 1.生產、消費:通常由2類線程,即若幹了生產者的線程、若幹個消費者的線程、
  10. * 2.生產者線程負責提交用戶請求、消費者線程負責處理生產者提交的任務請求
  11. * 3.生產者、消費者之間通過共享內存緩存進行通信
  12. */
  13. public static void main(String[] args) {
  14. //1.內存緩存區
  15. BlockingQueue<QueueData> queueData = new LinkedBlockingQueue<QueueData>();
  16. //2.生產者
  17. Provider p1 = new Provider(queueData);
  18. Provider p2 = new Provider(queueData);
  19. Provider p3 = new Provider(queueData);
  20. //3.消費者
  21. Consumer c1 = new Consumer(queueData);
  22. Consumer c2 = new Consumer(queueData);
  23. Consumer c3 = new Consumer(queueData);
  24. //創建【線程池】運行,可以創建n個線程,沒有任務的時候不創建線程,空閑線程存活時間為60s(默認)
  25. ExecutorService executorService = Executors.newCachedThreadPool();
  26. executorService.execute(p1);
  27. executorService.execute(p2);
  28. executorService.execute(p3);
  29. executorService.execute(c1);
  30. executorService.execute(c2);
  31. executorService.execute(c3);
  32. try {
  33. Thread.sleep(2000);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. p1.stop();
  38. p2.stop();
  39. p3.stop();
  40. try {
  41. Thread.sleep(1000);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. executorService.shutdown();
  46. }
  47. }



null

18.並發類容器MQ