佇列+多執行緒例項
阿新 • • 發佈:2019-01-09
原文章連結:https://blog.csdn.net/cai_chinasoft/article/details/51566632
第一步:建立一個無邊界自動回收的執行緒池,在此用 JDK提供的ExecutorService類
此執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。
- package com.thread.test;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- publicclass ThreadPool {
- privatestatic ExecutorService threadPool = null;
- publicstatic ExecutorService getThreadPool(){
- if(threadPool==null){
- threadPool = Executors.newCachedThreadPool();
- }
- return threadPool;
- }
- }
第二步:使用單例模式建立一個無界佇列,並提供入隊的方法
無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
- package com.thread.test;
- import java.util.concurrent.LinkedBlockingQueue;
- publicclass TaskQueue {
- privatestatic LinkedBlockingQueue queues = null;
- publicstatic LinkedBlockingQueue getTaskQueue(){
- if(queues==null){
- queues = new LinkedBlockingQueue();
- System.out.println("初始化 佇列");
- }
- return queues;
- }
- publicstaticvoid add(Object obj){
- if(queues==null)
- queues = getTaskQueue();
- queues.offer(obj);
- System.out.println("-------------------------------");
- System.out.println("入隊:"+obj);
- }
- }
第三步:提供一個入隊的執行緒,實際使用中的生產者
- package com.thread.test;
- publicclass Produce implements Runnable {
- privatestaticvolatileint i=0;
- privatestaticvolatileboolean isRunning=true;
- publicvoid run() {
- while(isRunning){
- TaskQueue.add(Integer.valueOf(i+""));
- Produce.i++;
- try {
- Thread.sleep(1*1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
第四步:提供一個出隊的執行緒,實際使用中的消費者
- package com.thread.test;
- publicclass Consumer implements Runnable {
- privatestatic Consumer consumer;
- publicstaticvolatileboolean isRunning=true;
- publicvoid run() {
- while(Thread.currentThread().isInterrupted()==false && isRunning)
- {
- try {
- System.out.println("出隊"+TaskQueue.getTaskQueue().take());
- Thread.sleep(1*1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- publicstatic Consumer getInstance(){
- if(consumer==null){
- consumer = new Consumer();
- System.out.println("初始化消費執行緒");
- }
- return consumer;
- }
- }
第五步:啟動生產消費策略
- package com.thread.test;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- publicclass Test {
- publicstaticvoid main(String[] args) {
- ExecutorService threadPool = ThreadPool.getThreadPool();
- Produce consumer2 = new Produce();
- threadPool.execute(consumer2);
- Consumer consumer=Consumer.getInstance();
- threadPool.execute(consumer);
- }
- }