純Java——簡易高並發框架
轉自:https://blog.csdn.net/MonkeyDCoding/article/details/81369610
0.源代碼
github-簡易高並發框架
註:本篇博客知識來自於網課。
1.問題來源以及w
對於一個題庫系統。考試組要有批量的離線文檔要生成。題庫組批量的題目要進行排重,且要根據條件批量修改題目內容。對於
痛點:
批量任務完成緩慢
所有的問題都圍繞著“查詢”,即查詢進度影響總體性能
我們希望盡量使用友好(如果用多線程來提高性能,我們希望能屏蔽細節)
因此我們需要一個可以提供查詢進度通用的框架。
2.我們該怎麽做?
這裏先要明確“任務”(Task)和“工作”(Job)的關系。對於一個工作,他內部可能須有許多的任務,任務是他的子元素(屬性、字段)。
用並發安全的類確保每個工作的屬性和工作下的每個任務信息,也意味著工作和任務的註冊機制。
需要並發安全的類保存每個任務的處理結果(TaskResult)。
需要提供查詢接口,供外部的使用。
這裏我們不處理對於工作的檢查。有興趣的可以實現。
3.總體流程
這裏不按照流程講解,而是按照類關系從下而上講解。
4.目錄結構
5.TaskResultType
package me.hcFramework.pool.vo;
//這個類只是用來作為標誌的信息。
public enum TaskResultType {
SUCCESS, //表示任務成功
FAILSURE, //表示任務失敗
EXCEPTION; //表示發生了異常,這裏我們不去詳盡判斷,只用這個標示來籠統表示
6.TaskResult
package me.hcFramework.pool.vo;
/**
*
* @param <R> 業務方法處理後的業務結果數據的類型
*
* 對屬性使用final修飾是為了使其不可改
*/
public class TaskResult<R> {
//用戶業務是否成功完成
private final TaskResultType resultType;
//業務方法處理後的業務結果數據
private final R returnType;
//如果失敗,則失敗原因
private final String reason;
//針對任務失敗的構造方法
public TaskResult(TaskResultType resultType , R returnType , String reason) {
this.returnType = returnType;
this.reason = reason;
}
//針對任務成功的構造方法
public TaskResult(TaskResultType resultType , R returnType) {
this.resultType = resultType;
this.returnType = returnType;
this.reason = "success";
}
//因為我們希望字段不可改,設置為了final。所以只提供getters
public TaskResultType getResultType() {
return resultType;
}
public R getReturnType() {
return returnType;
}
public String getReason() {
return reason;
}
@Override
public String toString() {
return "TaskResult [resultType=" + resultType + ", returnType=" + returnType + ", reason=" + reason + "]";
}
}
在這裏其實可以發生一點小改動。即:把錯誤信息歸並到TaskResultType中。這樣一個TaskResultType包括成功,錯誤/異常以及其原因就完整了。這裏不過多介紹。
7.JobInfo
package me.hcFramework.pool.vo;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 可以看作是一堆Task的打包+信息控制
* 與TaskResult一樣,一旦設置好了就不許再次更改
*/
public class JobInfo<R> {
//唯一性標誌
private final String jobName;
//任務處理器,要求業務人員實現接口
private final ITaskProcessor<?, ?> taskProcessor;
//工作(Job)中任務(Task)的數量
private final int jobLength;
//以下兩個類保證操作原子性
//任務總執行成功個數
private AtomicInteger successCount;
//已執行的任務總數
private AtomicInteger taskProcessCount;
//每個任務的處理結果,供查詢調用
private LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
public JobInfo(String jobName , int jobLength , ITaskProcessor<?,?> taskProcessor) {
this.jobName = jobName;
this.jobLength = jobLength;
this.taskProcessor = taskProcessor;
this.successCount = new AtomicInteger(0);
this.taskProcessCount = new AtomicInteger(0);
this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);
}
//提供工作的整體進度信息
public String getTotalProcess() {
return "success[" + successCount.get()+"]/current[" + taskProcessCount.get() + "],Total=[" + jobLength + "]";
}
//取得工作中每個任務的詳情
public List<TaskResult<R>> getTaskDetail() {
List<TaskResult<R>> taskDetailList = new LinkedList<TaskResult<R>>();
TaskResult<R> taskResult;
//pollFirst()方法返回雙端隊列的第一個元素,返回的元素會從列表中移除
while((taskResult = taskDetailQueue.pollFirst()) != null) {
taskDetailList.add(taskResult);
}
return taskDetailList;
}
//放入工作詳情,只需要保證最終個數正確即可,不需要加鎖
public void addTaskResult(TaskResult<R> result) {
if(TaskResultType.SUCCESS == result.getResultType()) {
successCount.getAndIncrement();
}
taskProcessCount.getAndIncrement();
taskDetailQueue.add(result);
}
public String getJobName() {
return jobName;
}
public ITaskProcessor<?, ?> getTaskProcessor() {
return taskProcessor;
}
public int getJobLength() {
return jobLength;
}
public int getSuccessCount() {
return successCount.get();
}
public int getTaskProcessCount() {
return taskProcessCount.get();
}
@Override
public String toString() {
return "JobInfo [jobName=" + jobName + ", taskProcessor=" + taskProcessor + ", jobLength=" + jobLength
+ ", successCount=" + successCount + ", taskProcessCount=" + taskProcessCount + ", taskDetailQueue="
+ taskDetailQueue + "]";
}
}
關於LinkedBlockingDeque的說明:他是線程安全的。他是雙端隊列,任何一端都可以進行元素的出入。
8.ITaskProcessor
package me.hcFramework.pool.vo;
/**
* 定義接口,所有需要完成的任務都需要實現此接口進行
*
* @param <T> 業務方法需要的數據
* @param <R> 業務方法處理後的業務結果數據的類型
*/
public interface ITaskProcessor<T ,R> {
TaskResult<R> taskExecute(T data);
}
9.真正的黑箱子:PendingJobPool
package me.hcFramework.pool;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import me.hcFramework.pool.vo.ITaskProcessor;
import me.hcFramework.pool.vo.JobInfo;
import me.hcFramework.pool.vo.TaskResult;
import me.hcFramework.pool.vo.TaskResultType;
/**
*
* 這是框架主體類
*/
public class PendingJobPool {
//key = 每個工作的名字 jobInfo.jobName
//工作的存放容器,用於完成工作的註冊
private static ConcurrentHashMap<String,JobInfo<?>> jobInfoMap =
new ConcurrentHashMap<String,JobInfo<?>>();
//單例模式組合拳:類內部實例化+私有構造方法+靜態get方法
private static PendingJobPool pool = new PendingJobPool();
private PendingJobPool(){
}
public static PendingJobPool getPool() {
//這裏是為了完善邏輯,且為日後框架加入檢查功能預留空間
//當然這裏也可成為後續版本AOP的切點
//checkJob.initCheck(jobInfoMap);
return pool;
}
//根據工作名稱,拿工作的實體
@SuppressWarnings("unchecked")
public <R> JobInfo<R> getJob(String jobName) {
JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
if(null == jobInfo) {
throw new RuntimeException(jobName + "是非法任務!");
}
return jobInfo;
}
//獲得處理詳情,這裏不對與jobName作出檢查
public <R> List<TaskResult<R>> getTaskDetail(String jobName) {
JobInfo<R> jobInfo = getJob(jobName);
return jobInfo.getTaskDetail();
}
//獲得處理進度
public String getTaskProgess(String jobName) {
return getJob(jobName).getTotalProcess();
}
//獲得當前已處理多少個任務
public int getDoneCount(String jobName) {
return getJob(jobName).getTaskProcessCount();
}
/**
* 註冊方法:註冊工作(job)
* @param jobName 名字
* @param jobLength 工作中任務的長度
* @param taskProcessor 業務處理器
*/
public <R> void registerJob(String jobName , int jobLength , ITaskProcessor<?,?> taskProcessor) {
JobInfo<R> jobInfo = new JobInfo<R>(jobName, jobLength, taskProcessor);
//putIfAbsent()如果map中沒有該工作,則放入且返回null;如果已有會返回對象
if(jobInfoMap.putIfAbsent(jobName, jobInfo) != null) {
throw new RuntimeException(jobName + "已經註冊過了");
}
}
/**
* 提交任務
* @param jobName 任務所對應的工作名
* @param t 任務數據
*/
public <T ,R> void putTask(String jobName , T t) {
JobInfo<R> jobInfo = getJob(jobName);
PendingTask<T ,R> task = new PendingTask<T ,R>(jobInfo , t);
taskExecutor.execute(task);
}
//取得當前機器上的CPU數量
private static final int THREAD_COUNTS = Runtime.getRuntime().availableProcessors();
//阻塞隊列,線程池使用,用以存放待處理的任務
private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000);
//線程池,固定大小,有界隊列
private static ExecutorService taskExecutor = new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60, TimeUnit.SECONDS, taskQueue);
public void closePool() {
taskExecutor.shutdown();
}
//交給我們框架執行的任務
private static class PendingTask<T , R> implements Runnable {
private JobInfo<R> jobInfo;
private T processData;
public PendingTask(JobInfo<R> jobInfo , T processData) {
this.jobInfo = jobInfo;
this.processData = processData;
}
@SuppressWarnings("unchecked")
@Override
public void run() {
ITaskProcessor<T, R> taskProcessor = (ITaskProcessor<T, R>) jobInfo.getTaskProcessor();
TaskResult<R> result = null;
try{
result = taskProcessor.taskExecute(processData);
if(result== null) {
result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "is null");
}else if(result.getResultType() == null) {
//如果你看懂這個判斷,就會覺得很厲害同時又會感到羞辱
if(result.getReason() == null) {
result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "reason is null");
} else {
result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "type is null");
}
}
} catch (Exception e) {
result = new TaskResult<R>(TaskResultType.EXCEPTION, null ,
"task exception" + e.getMessage());
} finally {
jobInfo.addTaskResult(result);
}
}
}
}
如果讀者了解Spring的實現,會知道bean的註冊過程其實也就是放入了Map中。或者讀者也曾經開發過一些需要註冊功能的應用,無疑都是使用了Map。除了Map的高性能,真的可以說是:聰明人都只用一種聰明法。
10.測試
自己實現ITaskProcessor接口
public class MyTask implements ITaskProcessor<Integer, Integer>{
@Override
public TaskResult<Integer> taskExecute(Integer data) {
Random r = new Random();
int flag = r.nextInt(500);
try {
Thread.sleep(flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(flag <= 300) {//正常處理的情況
Integer returnValue = data.intValue() + flag;
return new TaskResult<Integer>(TaskResultType.SUCCESS, returnValue);
} else if(flag > 300 && flag <= 400) {//處理失敗的情況
return new TaskResult<Integer>(TaskResultType.FAILSURE, -1 , "Failsure");
} else {
try {
throw new RuntimeException("異常發生了!!");
} catch(Exception e) {
return new TaskResult<Integer>(TaskResultType.EXCEPTION, -1 ,e.getMessage());
}
}
}
}
Test類
public class AppTest {
private final static String JOB_NAME="計算數值";
//private final static String JOB_OTHER_NAME = "字符串";
private final static int JOB_LENGTH = 150;
private static class QueryResult implements Runnable {
private PendingJobPool pool;
private String jobName;
public QueryResult(PendingJobPool pool , String jobName) {
this.pool = pool;
this.jobName = jobName;
}
@Override
public void run() {
while(pool.getDoneCount(jobName) <= JOB_LENGTH) {
List<TaskResult<String>> taskDetail = pool.getTaskDetail(jobName);
if(!taskDetail.isEmpty()) {
System.out.println(pool.getTaskProgess(jobName));
System.out.println(taskDetail);
}
if(pool.getDoneCount(jobName) == JOB_LENGTH) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
PendingJobPool pool = PendingJobPool.getPool();
MyTask myTask = new MyTask();
pool.registerJob(JOB_NAME, JOB_LENGTH, myTask);
Random r = new Random();
for(int i = 0 ; i < JOB_LENGTH ; i++) {
pool.putTask(JOB_NAME, r.nextInt(1000));
}
new Thread(new QueryResult(pool, JOB_NAME)).start();
}
}
Test類中實現了一個用來查詢的線程。
---------------------
作者:MonkeyDCoding
來源:CSDN
原文:https://blog.csdn.net/MonkeyDCoding/article/details/81369610
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
純Java——簡易高並發框架