1. 程式人生 > >佇列+多執行緒例項

佇列+多執行緒例項

原文章連結:https://blog.csdn.net/cai_chinasoft/article/details/51566632 

第一步:建立一個無邊界自動回收的執行緒池,在此用 JDK提供的ExecutorService類

此執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。


  1. package com.thread.test;  
  2. import java.util.concurrent.ExecutorService;  
  3. import java.util.concurrent.Executors;  
  4. publicclass ThreadPool {  
  5.     privatestatic ExecutorService threadPool = null;  
  6.     publicstatic ExecutorService getThreadPool(){  
  7.         if(threadPool==null){  
  8.             threadPool = Executors.newCachedThreadPool();  
  9.         }  
  10.         return  threadPool;  
  11.     }  
  12. }  

第二步:使用單例模式建立一個無界佇列,並提供入隊的方法

無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

  1. package com.thread.test;  
  2. import java.util.concurrent.LinkedBlockingQueue;  
  3. publicclass TaskQueue {  
  4.     privatestatic  LinkedBlockingQueue queues = null;  
  5.     publicstatic LinkedBlockingQueue getTaskQueue(){  
  6.         if(queues==null){  
  7.             queues =  new LinkedBlockingQueue();  
  8.             System.out.println("初始化 佇列");  
  9.         }  
  10.         return queues;  
  11.     }  
  12.     publicstaticvoid add(Object obj){  
  13.         if(queues==null)  
  14.             queues =  getTaskQueue();  
  15.         queues.offer(obj);  
  16.         System.out.println("-------------------------------");  
  17.         System.out.println("入隊:"+obj);  
  18.     }  
  19. }  

第三步:提供一個入隊的執行緒,實際使用中的生產者
  1. package com.thread.test;  
  2. publicclass Produce implements Runnable {  
  3.     privatestaticvolatileint i=0;  
  4.     privatestaticvolatileboolean isRunning=true;  
  5.     publicvoid run() {  
  6.         while(isRunning){  
  7.             TaskQueue.add(Integer.valueOf(i+""));  
  8.             Produce.i++;  
  9.             try {  
  10.                 Thread.sleep(1*1000);  
  11.             } catch (InterruptedException e) {  
  12.                 e.printStackTrace();  
  13.             }  
  14.         }  
  15.     }  
  16. }  

第四步:提供一個出隊的執行緒,實際使用中的消費者
  1. package com.thread.test;  
  2. publicclass Consumer implements Runnable {  
  3.     privatestatic Consumer consumer;  
  4.     publicstaticvolatileboolean isRunning=true;  
  5.     publicvoid run() {  
  6.         while(Thread.currentThread().isInterrupted()==false && isRunning)    
  7.         {    
  8.             try {  
  9.                 System.out.println("出隊"+TaskQueue.getTaskQueue().take());  
  10.                 Thread.sleep(1*1000);    
  11.             } catch (InterruptedException e) {  
  12.                 e.printStackTrace();  
  13.             }  
  14.         }  
  15.     }  
  16.     publicstatic Consumer getInstance(){  
  17.         if(consumer==null){  
  18.             consumer = new Consumer();  
  19.             System.out.println("初始化消費執行緒");  
  20.         }  
  21.         return consumer;  
  22.     }  
  23. }  

第五步:啟動生產消費策略
  1. package com.thread.test;  
  2. import java.util.concurrent.ExecutorService;  
  3. import java.util.concurrent.LinkedBlockingQueue;  
  4. publicclass Test {  
  5.     publicstaticvoid main(String[] args) {  
  6.         ExecutorService threadPool = ThreadPool.getThreadPool();  
  7.         Produce consumer2 = new Produce();  
  8.         threadPool.execute(consumer2);  
  9.         Consumer consumer=Consumer.getInstance();  
  10.         threadPool.execute(consumer);  
  11.     }  
  12. }