Java 執行緒工具類
1. CountDownLatch
CountDownLatch
首先定義任務次數,並呼叫await()
方法等待任務完成。呼叫countDown()
方法表明已經完成一項任務,當任務全部完成後,繼續await()
方法後的任務。
public class CountDownLatchThread extends Thread { CountDownLatch latch; public CountDownLatchThread(CountDownLatch latch) { this.latch = latch; } public void run() { try { System.out.println(currentThread().getName() + " start"); Thread.sleep(1000); latch.countDown(); System.out.println(currentThread().getName() + " end"); } catch (InterruptedException e) { } } public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); for (int i = 0; i < 2; i++) { new CountDownLatchThread(latch).start(); } System.out.println(Thread.currentThread().getName() + " start"); latch.await(); System.out.println(Thread.currentThread().getName() + " end"); } }
輸出
main start
Thread-0 start
Thread-1 start
Thread-0 end
Thread-1 end
main end
2. CyclicBarrier
CyclicBarrier
會等待指定執行緒的await()
方法呼叫結束。如果指定了三個執行緒CyclicBarrier(3)
,但只有2個執行緒呼叫了await()
方法,這兩個執行緒將一直等待下去。
public class CyclicBarrierThread extends Thread { CyclicBarrier barrier; public CyclicBarrierThread(CyclicBarrier barrier) { this.barrier = barrier; } public void run() { try { System.out.println(currentThread().getName() + " start"); Thread.sleep(1000); barrier.await(); System.out.println(currentThread().getName() + " end"); } catch (Exception e) { } } public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { new CyclicBarrierThread(barrier).start(); } } }
輸出
Thread-0 start
Thread-2 start
Thread-1 start
Thread-2 end
Thread-1 end
Thread-0 end
可以呼叫CyclicBarrier
的另一個建構函式CyclicBarrier(int, Runnable)
,在await()
方法後最先呼叫Runnable.run()
方法。
public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { System.out.println("In Runnable"); } }); for (int i = 0; i < 3; i++) { new CyclicBarrierThread(barrier).start(); } }
輸出
Thread-1 start
Thread-0 start
Thread-2 start
In Runnable
Thread-1 end
Thread-0 end
Thread-2 end
3. ThreadPoolExecutor執行緒池
執行緒池處理流程
- 核心執行緒池裡執行緒是否都在執行任務。如果不是,則建立一個新的工作執行緒來執行任務。
- 執行緒池工作佇列是否已滿。如果工作佇列沒有滿,則把新提交任務儲存在這個這個工作佇列。
- 執行緒池的執行緒是否都處於工作狀態。如果沒有,建立一個新的工作執行緒來執行任務。
- 飽和策略處理任務。
ThreadPoolExecutor
建構函式
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize
(執行緒池基本大小),建立新的執行緒來執行任務,直到執行緒池中數量大於基本大小。 -
workQueue
(任務佇列),用來儲存等待執行的阻塞佇列。ArrayBlockingQueue
,基於陣列結構的阻塞佇列LinkedBlockingQueue
,基於連結串列結構的阻塞佇列SynchronousQueue
,不儲存元素的阻塞佇列,每個插入操作必須等待另一個移除操作。PriorityBlockingQueue
,一個擁有優先順序的阻塞佇列。
-
maximumPoolSize
(執行緒池最大數量),執行緒池允許建立的最大數量,如果佇列已滿,並且建立的執行緒小於最大執行緒數,則執行緒池建立新的執行緒執行任務。 -
keepAliveTime
(執行緒活動保持時間),執行緒池空閒時,保持存活的時間。 -
TimeUnit
(執行緒活動保持時間的單位),可選DAYS
(天)、HOURS
(小時)、MINUTES
(分鐘)、SECONDS
(秒)等。 -
threadFactory
,設定建立執行緒的工廠public interface ThreadFactory { Thread newThread(Runnable r); }
-
handler
(飽和策略),當佇列和執行緒池都滿了,必須採取策略處理新提交任務AbortPolicy
,直接丟擲異常。CallerRunsPolicy
,在呼叫者所線上程執行任務。DiscardOldestPolicy
,丟棄佇列裡最近的一個任務。DiscardPolicy
,不處理,丟棄掉。
向執行緒池提交任務
execute(Runnable command)
,提交後不需要返回值,無法判斷任務是否執行成功。submit(Runnable task)
,提交後需要返回值,並通過Future
的get()
方法獲取返回值。
4. Executors框架
Executors
框架生成執行緒池
-
FixedThreadPool,建立固定執行緒數的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
-
SingleThreadExecutor,建立單個執行緒的執行緒池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
-
CachedThreadPool,大小無界的執行緒池,適用於執行很多的短期非同步任務。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
-
ScheduledThreadPool,適合於多個後臺執行緒執行週期任務
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }