1. 程式人生 > >java 併發——執行緒

java 併發——執行緒

一、前言

前一篇文章總結了對 java 併發中的內建鎖的理解,這篇文章來說說執行緒 ,併發與執行緒總有剪不斷理還亂的關係。關於 java 執行緒的基本概念、執行緒與程序的關係以及如何建立執行緒,想必大家都很清楚了。之前總結過,存疑新同學的傳送門:Java 多執行緒

二、執行緒框架

執行緒的三種建立方式:

我們知道,java 執行緒的三種建立方式:

  1. 繼承自 Thread 類建立執行緒;
new Thread(){
    @Override
    public void run() {
        super.run();
    }
}.start()
  1. 實現Runnable介面建立執行緒;
new Thread(new Runnable() {
    @Override
    public void run() {
        
    }
}).start();
  1. 使用 Callable 和 Future 建立執行緒
new Thread(new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return null;
        }
    })
).start();

看原始碼

先看 Runnable 介面原始碼:

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

函式式介面,只有一個 run() 方法。
再看 Thread 類原始碼,我們發現,Thread 類實現了 Runnable 介面:

public
class Thread implements Runnable {
    
    ...

    /* What will be run. */
    private Runnable target;

    ...

    /* Java thread status for tools,
     * initialized to indicate thread 'not yet started'
     */
    private volatile int threadStatus = 0;

    ...

    /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * <p>
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * <p>
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * @exception  IllegalThreadStateException  if the thread was already
     *               started.
     * @see        #run()
     * @see        #stop()
     */
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

我們看到,Thread 類有一個 Runnable 型別的 target 域。Thread 的 run 方法就是呼叫的 target 的 run 方法。而啟動執行緒則需要呼叫執行緒的 start 方法。在 Thread 類中,還有一個 volatile 修飾的 threadStatus 域,用來表示執行緒的狀態,初始值為0,當我們重複呼叫執行緒的 start 方法時,會丟擲 java.lang.IllegalThreadStateException 的異常。

當我們需要獲取執行緒中方法執行的返回值時,使用 FutureTask 和 Callable 的方式建立。看 Thread 原始碼可知,Thread 類構造方法可傳入 Runnable 物件,方式三,這裡傳入 FutureTask 物件,可以猜想: FutureTask 一定是實現了 Runnable 介面。而 FutureTask 的構造方法又傳入了 Callable 物件,我們重寫了 call 方法。我們看看相關類的原始碼,梳理一下。

java.lang.Runnable

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

java.util.concurrent.Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

java.util.concurrent.Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

java.util.concurrent.RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

java.util.concurrent.FutureTask

public class FutureTask<V> implements RunnableFuture<V> {
    
    ...

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;

    ...

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }


    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }


    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

}

java.util.concurrent.Executors

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

結論:

  • FutureTask 實現了 RunnableFuture 介面,而 RunnableFuture 繼承了 Runnable 和 Future 介面。 這樣應證了之前的猜想 —— FutureTask 實現了 Runnable 介面。
  • FutureTask 有一個 Callable 的域 callable。FutureTask 有兩個構造方法,一個傳入 Callable ,賦值給了域, 另一個一個傳入 Runnable, 最終呼叫 Executors.callable() 方法,返回了一個 Callable 物件,並且 Call 方法,呼叫的就是 run 方法。再講 callable 物件賦值給了域。且兩個構造方法的引數都使用了 @NotNull 修飾。
  • FutureTask 最後呼叫的 run 方法,實際又是呼叫的域 Callable 物件的 call 方法。這就是面向物件中的多型的思想,熟悉設計模式的同學也應該能看出來,這裡真是介面卡模式的實現。最後通過 FutureTask 的 get 方法可以獲取執行緒體執行後的返回值。
    使用形式:
new Thread(new FutureTask(new Callable(){
    @override
    public void call(){
        xxx
    }
})).start();

或者

new Thread(new FutureTask(new Runnable(){
    @override
    public void run(){
        xxx
    }
},"hello,world")).start();

看過原始碼之後,我感覺這第二種方式一般不建議寫,除非返回的結果跟執行緒體執行無關。FutureTask 表示的計算如果是通過 Callable 來實現額,相當於一種可生成結果的 Runnable ,並且可以處於 3 中狀態:執行等待、正在執行、執行完成。 其中執行完成包括可能的結束方式,正常執行,由於取消而結束和由於異常而結束等。當FutureTask處於執行完成狀態後,它會永遠停止在這個狀態。

需要注意的是: Future.get 這個方法取決於任務的狀態。如果任務已經完成,那麼 get 立即返回結果。否則 get 將阻塞知道任務進入完成狀態,然後返回結果或者丟擲異常。 FutureTask 保證將計算結果從執行計算的執行緒安全地釋出到獲取這個結果的執行緒。

三、執行緒池

使用執行緒池來管理執行緒

上面說的建立和啟動執行緒的本質幾乎一樣:new Thread(Runnable r).start() ,通過這種方式建立的執行緒稱之為“野執行緒”,當執行緒體執行完之後執行緒就銷燬了,再加上執行緒的建立,銷燬和執行緒的排程,都是需要系統資源的開銷。想象一下,在高併發場景下,不對執行緒數量加以控制,無限制建立執行緒,當達到系統性能的閾值,系統必然崩潰。所以建立野執行緒的這種方式實際專案中一般不用,而是使用執行緒池來管理執行緒。

執行緒池的優點:

  • 可重用存在的執行緒,減少物件建立、消亡的開銷,效能佳
  • 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞
  • 提供定時執行、定期執行、單執行緒、併發數控制等功能

執行緒池框架

從網上扒來一張 java 執行緒池的框架圖:

java 類庫中,任務執行的主要抽象,不是 Thread, 而是 Executor , 看看 Executor 介面:

package java.util.concurrent;

public interface Executor {
    void execute(java.lang.Runnable runnable);
}

ExecutorService 介面繼承了 Executor 介面,擴充了一些方法。執行緒池的核心實現類是 ThreadPoolExecutorScheduledThreadPoolExecutor,前者用來執行被提交的任務,而ScheduledThreadPoolExecutor 可以在給定的延遲後執行任務,或者定期執行命令。

ThreadPoolExecutor 提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

引數最多的構造方法的引數說明:

  • corePoolSize:核心執行緒數。
  • maximumPoolSize:最大執行緒數。
  • keepAliveTime:執行緒存活時間。當執行緒數大於core數,那麼超過該時間的執行緒將會被終結。
  • unit:keepAliveTime的單位。java.util.concurrent.TimeUnit類存在靜態靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS
  • workQueue:Runnable的阻塞佇列。若執行緒池已經被佔滿,則該佇列用於存放無法再放入執行緒池中的Runnable
  • threadFactory -執行建立新執行緒的工廠
  • handler -處理程式時因為達到執行緒邊界和佇列容量,使用的執行受阻

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor ,同樣有四個類似的構造方法,就不列舉了。

Executors 是一個工廠類,提供了一些靜態的方法操作執行緒池。通常建立執行緒池,我們不直接用呼叫 ThreadPoolExecutorScheduledThreadPoolExecutor 的構造方法,而是通過 Executors 類的五個靜態工廠方法建立。

newFixedThreadPool(...)
newSingleThreadExecutor(...)
newCachedThreadPool(...)
newScheduledThreadPool(...)
newSingleThreadScheduledExecutor()



newSingleThreadExecutor
建立單執行緒的執行緒池 這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。
返回單執行緒的Executor,將多個任務交給此Exector時,這個執行緒處理完一個任務後接著處理下一個任務,若該執行緒出現異常,將會有一個新的執行緒來替代。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

說明:LinkedBlockingQueue會無限的新增需要執行的Runnable。


newFixedThreadPool
建立一個包含指定數目執行緒的執行緒池,如果任務數量多於執行緒數目,那麼沒有執行的任務必須等待,直到有任務完成為止。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大小。任務執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}



newCachedThreadPool
建立一個可快取的執行緒池,執行緒池可以自動的擴充套件執行緒池的容量,核心執行緒數量為0.如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

SynchronousQueue是個特殊的佇列。 SynchronousQueue佇列的容量為0。當試圖為SynchronousQueue新增Runnable,則執行會失敗。只有當一邊從SynchronousQueue取資料,一邊向SynchronousQueue新增資料才可以成功。SynchronousQueue僅僅起到資料交換的作用,並不儲存執行緒。但newCachedThreadPool()方法沒有執行緒上限。Runable新增到SynchronousQueue會被立刻取出。
根據使用者的任務數建立相應的執行緒來處理,該執行緒池不會對執行緒數目加以限制,完全依賴於JVM能建立執行緒的數量,可能引起記憶體不足。


newScheduledThreadPool
建立一個指定大小的定時任務排程的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}



newSingleThreadScheduledExecutor
建立一個單執行緒的定時任務排程執行緒池,此執行緒池支援定時以及週期性執行任務的需求。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

四、執行緒輔助類

閉鎖 CountDownLatch

CountDownLatch 的概念

想起了多執行緒下載,將整個檔案分成了多段,然後多個執行緒提供下載,每個執行緒下載一段。當所有的下載執行緒都執行完之後,主執行緒通知使用者,檔案下載完成了。這裡存在一個問題,主執行緒如何知道所有的下載執行緒都執行完了?
解決思路有很多種,比如我們可以定義一個計數的變數,初始值為下載執行緒的數量,每個執行緒執行完,計數變數值 -1,計數器的值為 0 ,我們就知道所有的下載執行緒都執行完了。這裡,我們可能需要對計數器進行相應的同步操作,確保任何時候讀取它的狀態都是正確的。
幸運的是,java 提供了一個類似計算器的工具類,可以達到此目的。——CountDownLatch 類。
CountDownLatch 位於 java.util.concurrent 包下。是一個同步工具類,用來協調多個執行緒之間的同步,
CountDownLatch 能夠使一個執行緒在等待另外一些執行緒完成各自工作之後,再繼續執行。使用一個計數器進行實現。計數器初始值為執行緒的數量。當每一個執行緒完成自己任務後,計數器的值就會減一。當計數器的值為0時,表示所有的執行緒都已經完成了任務,然後在CountDownLatch上等待的執行緒就可以恢復執行任務。

CountDownLatch 的用法

CountDownLatch類只提供了一個構造器:

public CountDownLatch(int count)  //引數count為計數值

CountDownLatch 類中有 3 個重要方法:

  1. public void await() throws InterruptedException //呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行
  2. public boolean await(long timeout, TimeUnit unit) throws InterruptedException //和await()類似,只不過等待一定的時間後count值還沒變為0的話就會繼續執行
  3. public void countDown() //將count值減1

CountDownLatch 用法舉例:

public class Test {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);

        new Thread(){
            public void run() {
                try {
                    System.out.println("子執行緒"+Thread.currentThread().getName()+"正在執行");
                    Thread.sleep(2000);
                    System.out.println("子執行緒"+Thread.currentThread().getName()+"執行完畢");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread(){
            public void run() {
                try {
                    System.out.println("子執行緒"+Thread.currentThread().getName()+"正在執行");
                    Thread.sleep(3000);
                    System.out.println("子執行緒"+Thread.currentThread().getName()+"執行完畢");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        try {
            System.out.println("等待2個子執行緒執行完畢...");
            latch.await();
            System.out.println("2個子執行緒已經執行完畢");
            System.out.println("繼續執行主執行緒");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

執行結果:

執行緒Thread-0正在執行
執行緒Thread-1正在執行
等待2個子執行緒執行完畢...
執行緒Thread-0執行完畢
執行緒Thread-1執行完畢
2個子執行緒已經執行完畢
繼續執行主執行緒

CountDownLatch的不足

CountDownLatch是一次性的,計數器的值只能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當CountDownLatch使用完畢後,它不能再次被使用。

柵欄 CyclicBarrier

CyclicBarrier的概念

柵欄(有的書籍中稱為同步屏障)CyclicBarrier,字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫 await 方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。

CyclicBarrier的用法

CyclicBarrier類位於java.util.concurrent包下,CyclicBarrier提供2個構造器:

public CyclicBarrier(int parties, Runnable barrierAction)
 
public CyclicBarrier(int parties)

引數parties指讓多少個執行緒或者任務等待至barrier狀態;引數barrierAction為當這些執行緒都達到barrier狀態時會執行的內容。

CyclicBarrier 最重要的方法就是 await 方法,它有2個過載版本:

public int await() throws InterruptedException, BrokenBarrierException

public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException

第一個版本比較常用,用來掛起當前執行緒,直至所有執行緒都到達barrier狀態再同時執行後續任務;
第二個版本是讓這些執行緒等待至一定的時間,如果還有執行緒沒有到達barrier狀態就直接讓到達barrier的執行緒執行後續任務。

CountDownLatch 用法舉例:

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("當前執行緒"+Thread.currentThread().getName());   
            }
        });
 
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("執行緒"+Thread.currentThread().getName()+"正在寫入資料...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入資料操作
                System.out.println("執行緒"+Thread.currentThread().getName()+"寫入資料完畢,等待其他執行緒寫入完畢");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有執行緒寫入完畢,繼續處理其他任務...");
        }
    }
}

執行了兩次結果分別如下:


從上面輸出結果可以看出,每個寫入執行緒執行完寫資料操作之後,就在等待其他執行緒寫入操作完畢。
當所有執行緒執行緒寫入操作完畢之後,進行額外的其他操作為 CyclicBarrie 提供 Runnable 引數。當四個執行緒都到達barrier狀態後,會從四個執行緒中選擇一個執行緒去執行Runnable。然後所有執行緒就繼續進行後續的操作了。

另外需要注意的是:CyclicBarrier 是可以重用的。

訊號量 Semaphore

Semaphore 的概念

Semaphore翻譯成字面意思為 訊號量,Semaphore可以控同時訪問的執行緒個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。

Semaphore 的用法

Semaphore類位於java.util.concurrent包下,它提供了2個構造器:

public Semaphore(int permits) {          //引數permits表示許可數目,即同時可以允許多少執行緒進行訪問
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {    //這個多了一個引數fair表示是否是公平的,即等待時間越久的越先獲取許可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore 類中比較重要的幾個方法,首先是acquire()、release()方法:

public void acquire() throws InterruptedException {  }     //獲取一個許可
public void acquire(int permits) throws InterruptedException { }    //獲取permits個許可
public void release() { }          //釋放一個許可
public void release(int permits) { }    //釋放permits個許可

acquire()用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。
release()用來釋放許可。注意,在釋放許可之前,必須先獲獲得許可。
這4個方法都會被阻塞,如果想立即得到執行結果,可以使用下面幾個方法:

public boolean tryAcquire()    //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException  //嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false
public boolean tryAcquire(int permits) //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException //嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false

另外還可以通過 availablePermits() 方法得到可用的許可數目:

使用舉例:
假若一個工廠有5臺機器,但是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那麼我們就可以通過Semaphore來實現:

public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人數
        Semaphore semaphore = new Semaphore(5); //機器數目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
 
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
 
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"佔用一個機器在生產...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"釋放出機器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

執行結果:

五、執行緒輔助類總結

  1. FutureTask 能夠實現某個執行緒A等待另一個執行緒執行完任務,然後執行緒A再執行。並且執行緒A可以獲取另一個執行緒執行的結果。
  2. CountDownLatch 一般用於某個執行緒A等待若干個其他執行緒執行完任務之後,它才執行。
  3. CyclicBarrier 一般用於一組執行緒互相等待至某個狀態,然後這一組執行緒再同時執行;
  4. CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們側重點不同。另外,CountDownLatch 不能夠重用的,而 CyclicBarrier 是可以重用的。
  5. Semaphore 其實和鎖有點類似,它一般用於控制對某組資源的訪問許可權。



    參考資料:
    《java併發程式設計實戰》
    Java併發程式設計:CountDownLatch、CyclicBarrier和 Semaphore
    併發工具類(一)等待多執行緒完成的CountDownLatch
    併發工具類(二)同步屏障CyclicBarrier
    併發工具類(三)控制併發執行緒數的Semaphore