1. 程式人生 > >Java ExecutorService四種線程池的簡單使用

Java ExecutorService四種線程池的簡單使用

html 設置 打印 null nis nbsp catch ext pre

  我們都知道創建一個線程可以繼承Thread類或者實現Runnable接口,實際Thread類就是實現了Runnable接口。

  到今天才明白後端線程的作用:我們可以開啟線程去執行一些比較耗時的操作,類似於前臺的ajax異步操作,比如說用戶上傳一個大的文件,我們可以獲取到文件之後開啟一個線程去操作該文件,但是可以提前將結果返回去,如果同步處理有可能太耗時,影響系統可用性。

1、new Thread的弊端

原生的開啟線程執行異步任務的方式:

new Thread(new Runnable() {

    @Override
    public void run() {
        
// TODO Auto-generated method stub } }).start();

弊端如下:

  • 每次new Thread新建對象性能差。
  • 線程缺乏統一管理,可能無限制新建線程,相互之間競爭,及可能占用過多系統資源導致死機或oom。
  • 缺乏更多功能,如定時執行、定期執行、線程中斷。

相比new Thread,Java提供的四種線程池的好處在於:

  • 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
  • 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
  • 提供定時執行、定期執行、單線程、並發數控制等功能。

2.Java線程池的使用

Java通過Executors提供四種線程池,分別為:
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

 可以用ExecutorService接受返回值,ExecutorService是繼承Executor的一個接口,ThreadPoolExecutor繼承自AbstractExecutorService,AbstractExecutorService實現ExecutorService接口。一般也用作一個類的靜態成員變量,所有實例共用一個ExecutorService對象。

MyThread線程類:繼承Thread並且重寫run方法,run方法中間隔一秒打印一次線程名字

package cn.qlq.threadTest;

/**
 * 原生的線程類Thread的使用方法
 * 
 * @author Administrator
 *
 */
public class MyThread extends Thread {
    /**
     * 更改線程名字
     * 
     * @param threadName
     */
    public MyThread(String threadName) {
        this.setName(threadName);
    }

    @Override
    public void run() {
        for(int i=0;i<5;i++){
            System.out.println(getName()+"-------"+i);
            try {
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

1.FixedThreadPool的用法

  創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。

創建方法:

    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

查看源碼:(使用了阻塞隊列,超過池子容量的線程會在隊列中等待)

技術分享圖片

測試代碼:

package cn.qlq.threadTest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolTest {
    
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);
    
    public static void main(String[] args) {
        for(int i = 0;i < 3;i++){
            batchTaskPool.execute(new MyThread("mt"+i));
        }
    }
    
}

結果:

mt1-------0
mt0-------0
mt0-------1
mt1-------1
mt1-------2
mt0-------2
mt1-------3
mt0-------3
mt1-------4
mt0-------4
mt2-------0
mt2-------1
mt2-------2
mt2-------3
mt2-------4

解釋:

  池子容量大小是2,所以mt0和mt1可以加入到線程池子中,mt2只是暫時的加到等待隊列,等mt0或者mt1執行完成之後從隊列移除之後mt2就有機會執行。。。。。。。。

定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

2.CachedThreadPool

  創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

創建方法:

private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

查看源碼:(使用了同步隊列)

技術分享圖片

測試代碼:

package cn.qlq.threadTest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolTest {
    
    private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();
    
    public static void main(String[] args) {
        for(int i = 0;i < 3;i++){
            batchTaskPool.execute(new MyThread("mt"+i));
        }
    }
    
}

結果:

mt0-------0
mt1-------0
mt2-------0
mt0-------1
mt1-------1
mt2-------1
mt0-------2
mt1-------2
mt2-------2
mt1-------3
mt0-------3
mt2-------3
mt1-------4
mt0-------4
mt2-------4

3.SingleThreadExecutor用法

  創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。類似於單線程執行的效果一樣。

創建方法:

    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

查看源碼;使用的阻塞隊列

技術分享圖片

測試代碼:

package cn.qlq.threadTest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorTest {
    
    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();
    
    public static void main(String[] args) {
        for(int i = 0;i < 3;i++){
            batchTaskPool.execute(new MyThread("mt"+i));
        }
    }
    
}

結果:

mt0-------0
mt0-------1
mt0-------2
mt0-------3
mt0-------4
mt1-------0
mt1-------1
mt1-------2
mt1-------3
mt1-------4
mt2-------0
mt2-------1
mt2-------2
mt2-------3
mt2-------4

4.ScheduledThreadPool用法------可以實現任務調度功能

  創建一個定長線程池(會指定容量初始化大小),支持定時及周期性任務執行。可以實現一次性的執行延遲任務,也可以實現周期性的執行任務。

創建方法:

    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

查看源碼:(使用了延遲隊列)

技術分享圖片

技術分享圖片

測試代碼:

package cn.qlq.threadTest;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolTest {

    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            //第一次執行是在3s後執行(延遲任務)
            batchTaskPool.schedule(new MyThread("my" + i), 3, TimeUnit.SECONDS);
            //第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
            batchTaskPool.scheduleAtFixedRate(new MyThread("my" + i), 3,7, TimeUnit.SECONDS);
            //第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
            batchTaskPool.scheduleWithFixedDelay(new MyThread("my" + i), 3,5, TimeUnit.SECONDS);
        }
    }

}

schedule是一次性的任務,可以指定延遲的時間。
scheduleAtFixedRate已固定的頻率來執行某項計劃(任務)
scheduleWithFixedDelay相對固定的延遲後,執行某項計劃 (這個就是第一個任務執行完5s後再次執行,一般用這個方法任務調度)

關於二者的區別:

  scheduleAtFixedRate :這個是按照固定的時間來執行,簡單來說:到點執行
  scheduleWithFixedDelay:這個呢,是等上一個任務結束後,在等固定的時間,然後執行。簡單來說:執行完上一個任務後再執行

舉例子

scheduledThreadPool.scheduleAtFixedRate(new TaskTest("執行調度任務3"),0, 1, TimeUnit.SECONDS); //這個就是每隔1秒,開啟一個新線程
scheduledThreadPool.scheduleWithFixedDelay(new TaskTest("第四個"),0, 3, TimeUnit.SECONDS); //這個就是上一個任務執行完,3秒後開啟一個新線程

當然實現任務調度還可以采用quartz框架來實現,更加的靈活。參考:https://www.cnblogs.com/qlqwjy/p/8723358.html

例如我系統中使用的一個ExcutorService的例子:

/**
 * 同步釘釘組織結構和人員的Action
 * 
 * @author Administrator
 *
 */
@Namespace("/sync")
public class SyncGroupAndUserAndBaseInfoAction extends DMSActionSupport {

    /**
     * serialID
     */
    private static final long serialVersionUID = 3526083465788431949L;
    
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    private static Logger logger = LoggerFactory.getLogger(SyncGroupAndUserAndBaseInfoAction.class);

    @Autowired
    private GroupAndUserService groupService;

    @Autowired
    private BaseInfoService baseInfoService;

    /**
     * 同步基本信息的數據
     * 
     * @return
     */
    @Action(value = "syncGroupAndUser")
    public String syncGroupAndUser() {
        long startTime = System.currentTimeMillis();
        logger.info("manual sync groups and users!");

        String accessToken = FetchDataUtils.getAccessToken();
        if (StringUtils.isBlank(accessToken)) {
            setPreJs("accessToken is null!");
            return "js";
        }

        String groupStr = FetchDataUtils.getGroupStr(accessToken);
        if (StringUtils.isBlank(groupStr)) {
            setPreJs("groupStr is null");
            return "js";
        }
        
        
        Set<String> dingGroupIds = FetchDataUtils.getGroupIds(groupStr);// 釘釘同步回來的組
        //新開一個線程去獲取釘釘用戶和組織
        batchDisposeDingGroupAndUser(dingGroupIds,groupStr,accessToken);
        

        Map<String,Object> response = new HashMap<String,Object>();
        response.put("success", true);
        response.put("message", "success sync datas!");
        setPreJs(APIUtils.getJsonResultFromMap(response));
        
        long endTime = System.currentTimeMillis();
        logger.info("同步釘釘組織結構和用戶完成-----用時:{}ms",(endTime-startTime));
        return "js";
    }

    private void batchDisposeDingGroupAndUser(final Set<String> dingGroupIds, final String groupStr,final String accessToken) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                groupService.batchDisposeGroups(groupStr, dingGroupIds);
                groupService.fetchAndDisposeUsers(accessToken, dingGroupIds);                
            }
        };
        batchTaskPool.execute(run);
    }
    
}

註意:

  batchDisposeDingGroupAndUser()方法的形參必須聲明為final,否則編譯錯誤。

Java ExecutorService四種線程池的簡單使用