springboot之多工並行+執行緒池處理
阿新 • • 發佈:2019-01-06
最近專案中做到一個關於批量發簡訊的業務,如果使用者量特別大的話,不能使用單執行緒去發簡訊,只能嘗試著使用多工來完成!我們的專案使用到了方式二,即Future的方案
Java 執行緒池
Java通過Executors提供四種執行緒池,分別為: newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。 newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。 newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。 newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。 優點 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。 提供定時執行、定期執行、單執行緒、併發數控制等功能。
方式一(CountDownLatch)
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final static String startTime = sdf.format(new Date());
/**
* IO密集型任務 = 一般為2*CPU核心數(常出現於執行緒中:資料庫資料互動、檔案上傳下載、網路資料傳輸等等)
* CPU密集型任務 = 一般為CPU核心數+1(常出現於執行緒中:複雜演算法)
* 混合型任務 = 視機器配置和複雜度自測而定
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize用於指定核心執行緒數量
* maximumPoolSize指定最大執行緒數
* keepAliveTime和TimeUnit指定執行緒空閒後的最大存活時間
* workQueue則是執行緒池的緩衝佇列,還未執行的執行緒會在佇列中等待
* 監控佇列長度,確保佇列有界
* 不當的執行緒池大小會使得處理速度變慢,穩定性下降,並且導致記憶體洩露。如果配置的執行緒過少,則佇列會持續變大,消耗過多記憶體。
* 而過多的執行緒又會 由於頻繁的上下文切換導致整個系統的速度變緩——殊途而同歸。佇列的長度至關重要,它必須得是有界的,這樣如果執行緒池不堪重負了它可以暫時拒絕掉新的請求。
* ExecutorService 預設的實現是一個無界的 LinkedBlockingQueue。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
//使用execute方法
executor.execute(new Stats("任務A", 1000, latch));
executor.execute(new Stats("任務B", 1000, latch));
executor.execute(new Stats("任務C", 1000, latch));
executor.execute(new Stats("任務D", 1000, latch));
executor.execute(new Stats("任務E", 1000, latch));
latch.await();// 等待所有人任務結束
System.out.println("所有的統計任務執行完成:" + sdf.format(new Date()));
}
static class Stats implements Runnable {
String statsName;
int runTime;
CountDownLatch latch;
public Stats(String statsName, int runTime, CountDownLatch latch) {
this.statsName = statsName;
this.runTime = runTime;
this.latch = latch;
}
public void run() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模擬任務執行時間
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
latch.countDown();//單次任務結束,計數器減一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
結果
方式二(Future)
重點是和springboot整合,採用註解bean方式生成ThreadPoolTaskExecutor
@Bean
//spring依賴包
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {
/**
* 預設執行緒池執行緒池
*
* @return Executor
*/
@Bean
public ThreadPoolTaskExecutor defaultThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心執行緒數目
executor.setCorePoolSize(16);
//指定最大執行緒數
executor.setMaxPoolSize(64);
//佇列中最大的數目
executor.setQueueCapacity(16);
//執行緒名稱字首
executor.setThreadNamePrefix("defaultThreadPool_");
//rejection-policy:當pool已經達到max size的時候,如何處理新任務
//CALLER_RUNS:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行
//對拒絕task的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行緒空閒後的最大存活時間
executor.setKeepAliveSeconds(60);
//載入
executor.initialize();
return executor;
}
}
使用
//通過註解引入配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;
//使用Future方式執行多工
//生成一個集合
List<Future> futures = new ArrayList<>();
//獲取後臺全部有效運營人員的集合
List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);
for (AdminUserMsgResponse response : adminUserDOList) {
//併發處理
if (response.getMobile() != null) {
Future<?> future = executor.submit(() -> {
//傳送簡訊
mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
});
futures.add(future);
}
}
//查詢任務執行的結果
for (Future<?> future : futureList) {
while (true) {//CPU高速輪詢:每個future都併發輪循,判斷完成狀態然後獲取結果,這一行,是本實現方案的精髓所在。即有10個future在高速輪詢,完成一個future的獲取結果,就關閉一個輪詢
if (future.isDone()&& !future.isCancelled()) {//獲取future成功完成狀態,如果想要限制每個任務的超時時間,取消本行的狀態判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用即可。
Integer i = future.get();//獲取結果
System.out.println("任務i="+i+"獲取完成!"+new Date());
list.add(i);
break;//當前future獲取結果完畢,跳出while
} else {
Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU---》新手別忘記這個
}
}
}