Java執行緒(十):ThreadPoolExecutor+BlockingQueue執行緒池示例
阿新 • • 發佈:2018-12-31
首先定義擴充套件執行緒池ExtThreadPoolExecutor
ExtThreadPoolExecutor作用是對執行緒池的增強,如在初始化執行緒池時、線上程執行前、執行後等處可新增自定義邏輯。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExtThreadPoolExecutor extends ThreadPoolExecutor{
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
init();
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
init();
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
init();
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
init();
}
private void init(){
System.out.println("ExtThreadPoolExecutor init......");
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("beforeExecute...... begin" );
super.beforeExecute(t, r);
System.out.println("beforeExecute...... end" );
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("afterExecute...... begin" );
super.afterExecute(r, t);
System.out.println("afterExecute...... end" );
}
}
定義任務佇列WorkQueue
通過BlockingQueue存放任務執行緒,該處使用生產者、消費者模式。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class WorkQueue {
private volatile static BlockingQueue<WorkEvent> queue;
private WorkQueue(){}
/**
* 初始化佇列,延遲初始化,其實也可使用內部類單例模式
*/
private static void init(){
if(queue == null){
System.out.println("WorkQueue.queue null init........");
synchronized (WorkQueue.class) {
System.out.println("WorkQueue.queue after synchronized still null init........");
if (queue == null) {
queue = new LinkedBlockingDeque<WorkEvent>();
}
}
}
}
public static void putWorkEvent(WorkEvent workEvent){
init();
try {
queue.put(workEvent);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("WorkQueue.putWorkEvent fail........");
}
}
public static BlockingQueue<WorkEvent> getQueue() {
return queue;
}
}
業務處理
public class EventHandler {
/**
* 處理業務
* @param workEvent
*/
public static void handle(WorkEvent workEvent){
System.out.println("正在處理,workNo=[" + workEvent.getWorkNo() + "]");
}
}
工作執行緒
消費者端,阻塞接收訊息,並將訊息傳給實際需要者。
public class WorkThread implements Runnable{
@Override
public void run() {
while (true) {
try {
WorkEvent workEvent = WorkQueue.getQueue().take();
System.out.println("ThreadName[" + Thread.currentThread().getName() + "], 獲取到workEvent,workNo=[" + workEvent.getWorkNo() + "], ready handle");
EventHandler.handle(workEvent);
System.out.println("ThreadName[" + Thread.currentThread().getName() + "], 獲取到workEvent,workNo=[" + workEvent.getWorkNo() + "], finish handle");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
訊息實體
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
public class WorkEvent implements Serializable{
private static final long serialVersionUID = -1739230985770176506L;
/**
* 任務編號
*/
private String workNo;
/**
* 執行次數
*/
private AtomicInteger num;
public WorkEvent(String workNo) {
this.workNo = workNo;
this.num = new AtomicInteger(0);
}
public String getWorkNo() {
return workNo;
}
public void setWorkNo(String workNo) {
this.workNo = workNo;
}
public AtomicInteger getNum() {
return num;
}
public void setNum() {
this.num.incrementAndGet();
}
}
呼叫示例:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class StartWork {
public static void main(String[] args) {
System.out.println("準備放任務執行緒");
int workNum = 6;
for (int i = 0; i < workNum; i++) {
WorkEvent workEvent = new WorkEvent("任務執行緒" + i);
WorkQueue.putWorkEvent(workEvent);
}
// 初始化執行緒池
ExtThreadPoolExecutor executor = new ExtThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// 先準備工作執行緒
System.out.println("準備五個工作執行緒");
executor.execute(new WorkThread());
executor.execute(new WorkThread());
executor.execute(new WorkThread());
executor.execute(new WorkThread());
executor.execute(new WorkThread());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10s後 。。。 準備放任務執行緒");
for (int i = 0; i < workNum; i++) {
WorkEvent workEvent = new WorkEvent("10s 後 任務執行緒" + i);
WorkQueue.putWorkEvent(workEvent);
}
}
}
結果示例
程式碼大體流程:訊息定義成實體WorkEvent,放入WorkQueue中,然後由ExtThreadPoolExecutor執行緒池開啟接收端執行緒WorkThread,由WorkThread獲取訊息,並通知實際需要者EventHandler,EventHandler處理訊息。