1. 程式人生 > >java多執行緒和佇列例項

java多執行緒和佇列例項

第一步:建立一個無邊界自動回收的執行緒池,在此用 JDK提供的ExecutorService類

此執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。


package com.thread.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
	private static ExecutorService threadPool = null;
	public static ExecutorService getThreadPool(){
		if(threadPool==null){
			threadPool = Executors.newCachedThreadPool();
		}
		return 	threadPool;
	}

}

第二步:使用單例模式建立一個無界佇列,並提供入隊的方法

無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

package com.thread.test;

import java.util.concurrent.LinkedBlockingQueue;

public class TaskQueue {
	private static  LinkedBlockingQueue queues = null;
	
	public static LinkedBlockingQueue getTaskQueue(){
		if(queues==null){
			queues =  new LinkedBlockingQueue();
			System.out.println("初始化 佇列");
		}
		return queues;
	}
	
	public static void add(Object obj){
		if(queues==null)
			queues =  getTaskQueue();
		queues.offer(obj);
		System.out.println("-------------------------------");
		System.out.println("入隊:"+obj);
	}
}

第三步:提供一個入隊的執行緒,實際使用中的生產者
package com.thread.test;

public class Produce implements Runnable {
	private static volatile int i=0;
	private static volatile boolean isRunning=true;

	public void run() {
		while(isRunning){
			TaskQueue.add(Integer.valueOf(i+""));
			Produce.i++;
			try {
				Thread.sleep(1*1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}

}

第四步:提供一個出隊的執行緒,實際使用中的消費者
package com.thread.test;

public class Consumer implements Runnable {
	private static Consumer consumer;
	
	public static volatile boolean isRunning=true;
	public void run() {
		while(Thread.currentThread().isInterrupted()==false && isRunning)  
        {  
			try {
				System.out.println("出隊"+TaskQueue.getTaskQueue().take());
				Thread.sleep(1*1000);  
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
        }
		
	}
	public static Consumer getInstance(){
		if(consumer==null){
			consumer = new Consumer();
			System.out.println("初始化消費執行緒");
		}
		return consumer;
	}

}

第五步:啟動生產消費策略
package com.thread.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

public class Test {
	
	public static void main(String[] args) {
		ExecutorService threadPool = ThreadPool.getThreadPool();
		Produce consumer2 = new Produce();
		threadPool.execute(consumer2);
		Consumer consumer=Consumer.getInstance();
		threadPool.execute(consumer);
	}

}