Java ExecutorService四種線程池的簡單使用
我們都知道創建一個線程可以繼承Thread類或者實現Runnable接口,實際Thread類就是實現了Runnable接口。
到今天才明白後端線程的作用:我們可以開啟線程去執行一些比較耗時的操作,類似於前臺的ajax異步操作,比如說用戶上傳一個大的文件,我們可以獲取到文件之後開啟一個線程去操作該文件,但是可以提前將結果返回去,如果同步處理有可能太耗時,影響系統可用性。
1、new Thread的弊端
原生的開啟線程執行異步任務的方式:
new Thread(new Runnable() { @Override public void run() {// TODO Auto-generated method stub } }).start();
弊端如下:
- 每次new Thread新建對象性能差。
- 線程缺乏統一管理,可能無限制新建線程,相互之間競爭,及可能占用過多系統資源導致死機或oom。
- 缺乏更多功能,如定時執行、定期執行、線程中斷。
相比new Thread,Java提供的四種線程池的好處在於:
- 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
- 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
- 提供定時執行、定期執行、單線程、並發數控制等功能。
2.Java線程池的使用
Java通過Executors提供四種線程池,分別為:
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
可以用ExecutorService接受返回值,ExecutorService是繼承Executor的一個接口,ThreadPoolExecutor繼承自AbstractExecutorService,AbstractExecutorService實現ExecutorService接口。一般也用作一個類的靜態成員變量,所有實例共用一個ExecutorService對象。
MyThread線程類:繼承Thread並且重寫run方法,run方法中間隔一秒打印一次線程名字
package cn.qlq.threadTest; /** * 原生的線程類Thread的使用方法 * * @author Administrator * */ public class MyThread extends Thread { /** * 更改線程名字 * * @param threadName */ public MyThread(String threadName) { this.setName(threadName); } @Override public void run() { for(int i=0;i<5;i++){ System.out.println(getName()+"-------"+i); try { Thread.sleep(1*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
1.FixedThreadPool的用法
創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
創建方法:
/** * 參數是初始化線程池子的大小 */ private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);
查看源碼:(使用了阻塞隊列,超過池子容量的線程會在隊列中等待)
測試代碼:
package cn.qlq.threadTest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FixedThreadPoolTest { /** * 參數是初始化線程池子的大小 */ private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { for(int i = 0;i < 3;i++){ batchTaskPool.execute(new MyThread("mt"+i)); } } }
結果:
mt1-------0
mt0-------0
mt0-------1
mt1-------1
mt1-------2
mt0-------2
mt1-------3
mt0-------3
mt1-------4
mt0-------4
mt2-------0
mt2-------1
mt2-------2
mt2-------3
mt2-------4
解釋:
池子容量大小是2,所以mt0和mt1可以加入到線程池子中,mt2只是暫時的加到等待隊列,等mt0或者mt1執行完成之後從隊列移除之後mt2就有機會執行。。。。。。。。
定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
2.CachedThreadPool
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
創建方法:
private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();
查看源碼:(使用了同步隊列)
測試代碼:
package cn.qlq.threadTest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CachedThreadPoolTest { private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool(); public static void main(String[] args) { for(int i = 0;i < 3;i++){ batchTaskPool.execute(new MyThread("mt"+i)); } } }
結果:
mt0-------0
mt1-------0
mt2-------0
mt0-------1
mt1-------1
mt2-------1
mt0-------2
mt1-------2
mt2-------2
mt1-------3
mt0-------3
mt2-------3
mt1-------4
mt0-------4
mt2-------4
3.SingleThreadExecutor用法
創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。類似於單線程執行的效果一樣。
創建方法:
private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();
查看源碼;使用的阻塞隊列
測試代碼:
package cn.qlq.threadTest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SingleThreadExecutorTest { private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor(); public static void main(String[] args) { for(int i = 0;i < 3;i++){ batchTaskPool.execute(new MyThread("mt"+i)); } } }
結果:
mt0-------0
mt0-------1
mt0-------2
mt0-------3
mt0-------4
mt1-------0
mt1-------1
mt1-------2
mt1-------3
mt1-------4
mt2-------0
mt2-------1
mt2-------2
mt2-------3
mt2-------4
4.ScheduledThreadPool用法------可以實現任務調度功能
創建一個定長線程池(會指定容量初始化大小),支持定時及周期性任務執行。可以實現一次性的執行延遲任務,也可以實現周期性的執行任務。
創建方法:
private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);
查看源碼:(使用了延遲隊列)
測試代碼:
package cn.qlq.threadTest; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledThreadPoolTest { private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2); public static void main(String[] args) { for (int i = 0; i < 3; i++) { //第一次執行是在3s後執行(延遲任務) batchTaskPool.schedule(new MyThread("my" + i), 3, TimeUnit.SECONDS); //第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位 batchTaskPool.scheduleAtFixedRate(new MyThread("my" + i), 3,7, TimeUnit.SECONDS); //第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位 batchTaskPool.scheduleWithFixedDelay(new MyThread("my" + i), 3,5, TimeUnit.SECONDS); } } }
schedule是一次性的任務,可以指定延遲的時間。
scheduleAtFixedRate已固定的頻率來執行某項計劃(任務)
scheduleWithFixedDelay相對固定的延遲後,執行某項計劃 (這個就是第一個任務執行完5s後再次執行,一般用這個方法任務調度)
關於二者的區別:
scheduleAtFixedRate :這個是按照固定的時間來執行,簡單來說:到點執行
scheduleWithFixedDelay:這個呢,是等上一個任務結束後,在等固定的時間,然後執行。簡單來說:執行完上一個任務後再執行
舉例子
scheduledThreadPool.scheduleAtFixedRate(new TaskTest("執行調度任務3"),0, 1, TimeUnit.SECONDS); //這個就是每隔1秒,開啟一個新線程
scheduledThreadPool.scheduleWithFixedDelay(new TaskTest("第四個"),0, 3, TimeUnit.SECONDS); //這個就是上一個任務執行完,3秒後開啟一個新線程
當然實現任務調度還可以采用quartz框架來實現,更加的靈活。參考:https://www.cnblogs.com/qlqwjy/p/8723358.html
例如我系統中使用的一個ExcutorService的例子:
/** * 同步釘釘組織結構和人員的Action * * @author Administrator * */ @Namespace("/sync") public class SyncGroupAndUserAndBaseInfoAction extends DMSActionSupport { /** * serialID */ private static final long serialVersionUID = 3526083465788431949L; private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2); private static Logger logger = LoggerFactory.getLogger(SyncGroupAndUserAndBaseInfoAction.class); @Autowired private GroupAndUserService groupService; @Autowired private BaseInfoService baseInfoService; /** * 同步基本信息的數據 * * @return */ @Action(value = "syncGroupAndUser") public String syncGroupAndUser() { long startTime = System.currentTimeMillis(); logger.info("manual sync groups and users!"); String accessToken = FetchDataUtils.getAccessToken(); if (StringUtils.isBlank(accessToken)) { setPreJs("accessToken is null!"); return "js"; } String groupStr = FetchDataUtils.getGroupStr(accessToken); if (StringUtils.isBlank(groupStr)) { setPreJs("groupStr is null"); return "js"; } Set<String> dingGroupIds = FetchDataUtils.getGroupIds(groupStr);// 釘釘同步回來的組 //新開一個線程去獲取釘釘用戶和組織 batchDisposeDingGroupAndUser(dingGroupIds,groupStr,accessToken); Map<String,Object> response = new HashMap<String,Object>(); response.put("success", true); response.put("message", "success sync datas!"); setPreJs(APIUtils.getJsonResultFromMap(response)); long endTime = System.currentTimeMillis(); logger.info("同步釘釘組織結構和用戶完成-----用時:{}ms",(endTime-startTime)); return "js"; } private void batchDisposeDingGroupAndUser(final Set<String> dingGroupIds, final String groupStr,final String accessToken) { Runnable run = new Runnable() { @Override public void run() { groupService.batchDisposeGroups(groupStr, dingGroupIds); groupService.fetchAndDisposeUsers(accessToken, dingGroupIds); } }; batchTaskPool.execute(run); } }
註意:
batchDisposeDingGroupAndUser()方法的形參必須聲明為final,否則編譯錯誤。
Java ExecutorService四種線程池的簡單使用