1. 程式人生 > 實用技巧 >從零開始瞭解多執行緒 之 深入淺出AQS -- 下(Tools&CountDownLatch&CyclicBarrier&Semaphore)

從零開始瞭解多執行緒 之 深入淺出AQS -- 下(Tools&CountDownLatch&CyclicBarrier&Semaphore)

上一篇文章講到了AQS各種鎖&同步佇列的內容,這一次繼續會牽扯到AQS與及各種工具類的知識,
Tools&CountDownLatch&CyclicBarrier&Semaphore原理與應用,Atomic&Unsafe魔法類詳解

1.Semaphore

Semaphore字面意思是訊號量的意思,它的作用是控制訪問特定資源的執行緒數目

Semaphore使用(基本應用場景: 資源訪問,服務限流)

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
    permits表示許可執行緒的數量
    fair表示公平性,如果這個設為true的話,下次執行的執行緒會是等待最久的執行緒
   
public void acquire() throws InterruptedException
public void release()
tryAcquire(long timeout, TimeUnit unit)
    acquire()表示阻塞並獲取許可
    release()表示釋放許可

程式碼實現

public class SemaphoreSample {
    public static void main(String[] args) {
        // 允許兩個資源 同一時間是有兩個執行緒能進來
        Semaphore semaphore = new Semaphore(2);
        for (int i=0;i<5;i++){
            new Thread(new Task(semaphore,"semaphore-test-"+i)).start();
        }
    }
    static class Task extends Thread{
        Semaphore semaphore;
        public Task(Semaphore semaphore,String name){
            this.semaphore = semaphore;
            this.setName(name);
        }
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+":acquire() at time:"+System.currentTimeMillis());
                Thread.sleep(3000);
                    semaphore.release();
                System.out.println(Thread.currentThread().getName()+":release() at time:"+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  • Semaphore原始碼解釋

    假如當前資源有1,2 ,總共資源有2,這時候執行緒3進來了 (Semaphore(2) 兩個資源被t1,t2獲取了,還未釋放,t3進來)

  • 建立一個非公平鎖

        /**
         * 建立非公平鎖
         * @param permits
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
  • 非公平鎖例項化

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        /**
         * 建立非公平鎖的時候 設定 狀態 (呼叫父類方法)
         *   Sync(int permits) {
         *       setState(permits);
         *   }
         * 上一個文章中說道 permits 在ReentrantLock 中這個代表重入次數,
         * Semaphore 初始化的時候把設定的值傳了進來 設定到了 state中 代表初始化狀態值(已重入次數?),
         *
         * 檢視 semaphore.acquire() 方法 其中呼叫了 sync.acquireSharedInterruptibly(1) 預設傳了一個 1 進來,
         * 代表每次呼叫acquire方法將獲取一個資源
         *
         *
         * 以共享模式獲取,如果中斷則中止。
         * 首先檢查中斷狀態,然後至少呼叫一次tryAcquireShared,成功後返回。否則執行緒已排隊,可能會重複阻塞和解除阻塞
         * 呼叫 tryAcquireShared 直到成功或執行緒中斷。
         *
         *  public final void acquireSharedInterruptibly(int arg)
         *         throws InterruptedException {
         *     if (Thread.interrupted()) {
         *         throw new InterruptedException();
         *     }
         *     if (tryAcquireShared(arg) < 0) {
         *         doAcquireSharedInterruptibly(arg);
         *     }
         *  }
         *
         * @param permits
         */
        NonfairSync(int permits) {
            super(permits);
        }
    
        /**
         * 這個方法是在 AQS抽象類中 doAcquireSharedInterruptibly 呼叫的之類的方法
         * 
         * 共享式:共享式地獲取同步狀態。
         * 
         * 對於獨佔式同步元件來講,同一時刻只有一個執行緒能獲取到同步狀態,
         *    其他執行緒都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法tryAcquire返回值為boolean
         *    
         * 對於共享式同步元件來講,同一時刻可以有多個執行緒同時獲取到同步狀態,這也是“共享”的意義所在。(Semaphore中便是這個)
         *
         * 1.當返回值大於0時,表示獲取同步狀態成功,同時還有剩餘同步狀態可供其他執行緒獲取;
         * 
         * 2.當返回值等於0時,表示獲取同步狀態成功,但沒有可用同步狀態了;
         *
         *3.當返回值小於0時,表示獲取同步狀態失敗。
         *
         * @param acquires
         * @return
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    
  • semaphore.acquire() 呼叫獲取資源

     -acquire
     public void acquire() throws InterruptedException {
          sync.acquireSharedInterruptibly(1);
      }
    
    
    -- acquireSharedInterruptibly
     /**
     * 以共享模式獲取,如果中斷則中止。
     * 首先檢查中斷狀態,然後至少呼叫一次{@link{tryAcquireShared},成功後返回。否則執行緒已排隊,可能會重複阻塞和解除阻塞,
     * 呼叫{@link#tryAcquireShared}直到成功或執行緒中斷。
     *
     *  @param arg獲取引數。 當前這裡代表 獲取資源值
     * 你喜歡。
     * @如果當前執行緒被中斷,則引發InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // 嘗試去獲取資源(這個方法具體實現再具體類中,公平鎖和非公平鎖裡面的實現邏輯不通)< 0 表示獲取失敗,
        // 獲取失敗再繼續後面的邏輯 doAcquireSharedInterruptibly
        if (tryAcquireShared(arg) < 0) {
            doAcquireSharedInterruptibly(arg);
        }
    }
    
    -- tryAcquireShared
    @Override
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
    
    -- nonfairTryAcquireShared
    /**
     * semaphore中非公平鎖獲取資源
     * @param acquires
     * @return
     */
    final int nonfairTryAcquireShared(int acquires) {
        // 自旋獲取
        for (;;) {
            // 獲取當前可用的資源數(最大值是初始化設定好的 permits)
            int available = getState();
            // 獲取之後剩餘的資源值數
            int remaining = available - acquires;  //( 比如例項程式碼中 初始值2,獲取一次:2-1)
            
            // 當返回值小於0時,表示獲取同步狀態失敗。
            // 當返回值大於0時,表示獲取同步狀態成功,同時還有剩餘同步狀態可供其他執行緒獲取
            // 數量不夠的話直接獲取失敗,或者cas演算法修改值成功就返回 剩餘值
            if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                return remaining;
        }
    }
    
    -- doAcquireSharedInterruptibly
    
     /**
     * TODO 假如當前資源有1,2 ,總共資源有2,這時候執行緒3進來了 (Semaphore(2) 兩個資源被t1,t2獲取了,還未釋放,t3進來)
     *
     * 獲取 arg 數量的共享資源
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 新增共享等待佇列節點 這裡面的內容和上一個文章類似 共享方式入隊 (上一篇文章中說到的事以獨佔方式入隊)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 自旋操作直到成功
            for (;;) {
                //當前節點 獲取前一個節點 當前是t3  找到前驅節點(因為t3是首先進隊的,所以它的前驅節點是head)
                final Node p = node.predecessor();
    
                // 如果前驅節點是頭結點 嘗試獲取資源(就是等待過程中自己是隊首的才能去獲取資源) (t3是頭結點 所以再一次進去回去資源)
                if (p == head) {
                    // 嘗試獲取資源 (如果 t1,t2還未釋放,這裡還是返回-1 或其他的 (失敗))
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 如果t1或者t2釋放了 t3獲取資源成功
                        // 把node節點設定成head節點,且Node.waitStatus->Node.PROPAGATE()
                        setHeadAndPropagate(node, r);
                        // help GC
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                // 判斷當前節點時候要進行阻塞等待 (上一篇文章中中應該有介紹到),阻塞的話就LockSupport.park(this) 阻塞當前執行緒
                // 阻塞後 迴圈不跑了 需要等待喚醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
    
    --setHeadAndPropagate(把node節點設定成head節點,且Node.waitStatus->Node.PROPAGATE)
    
    /**
     * 把node節點設定成head節點,且Node.waitStatus->Node.PROPAGATE()
     */
    private void setHeadAndPropagate(Node node, int propagate) {
    
        //h用來儲存舊的head節點
        Node h = head;
    
        //head引用指向node節點
        setHead(node);
    
        /* 這裡意思有兩種情況是需要執行喚醒操作
         * 1.propagate > 0 表示呼叫方指明瞭後繼節點需要被喚醒
         * 2.頭節點後面的節點需要被喚醒(waitStatus<0),不論是老的頭結點還是新的頭結點
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //node是最後一個節點或者 node的後繼節點是共享節點
            if (s == null || s.isShared()){
                /* 如果head節點狀態為SIGNAL,喚醒head節點執行緒,重置head.waitStatus->0
                 * head節點狀態為0(第一次新增時是0),
                 * 設定head.waitStatus->Node.PROPAGATE表示狀態需要向後繼節點傳播
                 */
                doReleaseShared();
            }
        }
    }
    
    --doReleaseShared(把當前結點設定為SIGNAL或者PROPAGATE) 
    (資源釋放的時候呼叫到這個,semaphore.acquire()中呼叫來到這裡了 去喚醒其他執行緒?)
    
    /**
     * 把當前結點設定為SIGNAL或者PROPAGATE
     * 喚醒head.next(B節點),B節點喚醒後可以競爭鎖,成功後head->B,然後又會喚醒B.next,一直重複直到共享節點都喚醒
     * head節點狀態為SIGNAL,重置head.waitStatus->0,喚醒head節點執行緒,喚醒後執行緒去競爭共享鎖
     * head節點狀態為0,將head.waitStatus->Node.PROPAGATE傳播狀態,表示需要將狀態向後繼節點傳播
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//head是SIGNAL狀態
                
                    
                    /* head狀態是SIGNAL,重置head節點waitStatus為0,這裡不直接設為Node.PROPAGATE,
                     * 是因為unparkSuccessor(h)中,如果ws < 0會設定為0,所以ws先設定為0,再設定為PROPAGATE
                     * 這裡需要控制併發,因為入口有setHeadAndPropagate跟release兩個,避免兩次unpark
                     */
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; //設定失敗,重新迴圈
                    /* head狀態為SIGNAL,且成功設定為0之後,喚醒head.next節點執行緒
                     * 此時head、head.next的執行緒都喚醒了,head.next會去競爭鎖,成功後head會指向獲取鎖的節點,
                     * 也就是head發生了變化。看最底下一行程式碼可知,head發生變化後會重新迴圈,繼續喚醒head的下一個節點
                     */
                    unparkSuccessor(h);
                    /*
                     * 如果本身頭節點的waitStatus是出於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。
                     * 意味著需要將狀態向後一個節點傳播
                     */
                }
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    continue;                // loop on failed CAS
                }
            }
            if (h == head) //如果head變了,重新迴圈
            {
                break;
            }
        }
    }
    
    --shouldParkAfterFailedAcquire (判斷當前節點時候要進行阻塞等待)
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //判斷是否應該阻塞
        // 獲取前驅節點等待狀態
        int ws = pred.waitStatus;
    
        // 此狀態是可以被喚醒的 可以去獲取鎖
        if (ws == Node.SIGNAL)
            /*
             * 若前驅結點的狀態是SIGNAL,意味著當前結點可以被安全地park
             */
            return true;
        if (ws > 0) {
            /* 狀態是 1 被移除,並且繼續檢查其他節點(這裡迴圈從後往前一個個節點判斷是否需要被移除),如果都是取消狀態 一併移除
             * 前驅節點狀態如果被取消狀態,將被移除出佇列
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* 同步佇列不會出現 CONDITION
             * 所以 當前驅節點waitStatus為 0 or PROPAGATE(可傳遞狀態)狀態時
             *
             * 將其設定為SIGNAL狀態,然後當前結點才可以可以被安全地park
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
  • semaphore.release() 呼叫獲取資源

    --releaseShared 
    
      public final boolean releaseShared(int arg) {
              
          // 嘗試釋放資源 釋放成功)(更改state成功 才開始去喚醒執行緒)
          if (tryReleaseShared(arg)) {
              // 去喚醒執行緒
              doReleaseShared();
              return true;
          }
          return false;
      }
      
    -tryReleaseShared 
      // 嘗試釋放資源 更改state
      protected final boolean tryReleaseShared(int releases) {
          for (;;) {
              // 0
              int current = getState();
              // 0+1
              int next = current + releases;
              if (next < current) // overflow
              {
                  throw new Error("Maximum permit count exceeded");
              }
              // 設定狀態為 1
              if (compareAndSetState(current, next)) {
                  return true;
              }
          }
      }
      
    -doReleaseShared
    /**
     * 把當前結點設定為SIGNAL或者PROPAGATE
     * 喚醒head.next(B節點),B節點喚醒後可以競爭鎖,成功後head->B,然後又會喚醒B.next,一直重複直到共享節點都喚醒
     * head節點狀態為SIGNAL,重置head.waitStatus->0,喚醒head節點執行緒,喚醒後執行緒去競爭共享鎖
     * head節點狀態為0,將head.waitStatus->Node.PROPAGATE傳播狀態,表示需要將狀態向後繼節點傳播
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
    
                //判斷當前狀態是不是-1,是的話想要把-1設定成0,沒成功的話繼續迴圈
    
                // 成功的話  去 unparkSuccessor方法判斷狀態是否<0 (從這個方法進去的已經==0)
                // unparkSuccessor中獲取下一個節點 node s = node.next繼續判斷狀態,從這裡進去的方法中s!=null && s.waitStatus!>0
                // 所以 會執行 LockSupport.unpark(s.thread);//喚醒執行緒 (有機會去搶佔資源)(共享中一個一個喚醒,但是如果狀態是被標記為-3(可傳遞)的話,會把-3的全喚醒)
                // 這裡喚醒後 在方法 doAcquireSharedInterruptibly 掛起的執行緒繼續跑 ,開始繼續去嘗試獲取資源(不一定獲取成功:非公平鎖)
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//head是SIGNAL狀態
                    /* head狀態是SIGNAL,重置head節點waitStatus為0,這裡不直接設為Node.PROPAGATE,
                     * 是因為unparkSuccessor(h)中,如果ws < 0會設定為0,所以ws先設定為0,再設定為PROPAGATE
                     * 這裡需要控制併發,因為入口有setHeadAndPropagate跟release兩個,避免兩次unpark
                     */
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; //設定失敗,重新迴圈
                    /* head狀態為SIGNAL,且成功設定為0之後,喚醒head.next節點執行緒
                     * 此時head、head.next的執行緒都喚醒了,head.next會去競爭鎖,成功後head會指向獲取鎖的節點,
                     * 也就是head發生了變化。看最底下一行程式碼可知,head發生變化後會重新迴圈,繼續喚醒head的下一個節點
                     */
                    unparkSuccessor(h);
                    /*
                     * 如果本身頭節點的waitStatus是出於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。
                     * 意味著需要將狀態向後一個節點傳播
                     */
                }
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    continue;                // loop on failed CAS
                }
            }
            if (h == head) //如果head變了,重新迴圈
            {
                break;
            }
        }
    }
    
    -unparkSuccessor
     /**
     *喚醒執行緒
     */
    private void unparkSuccessor(Node node) {
        //獲取wait狀態
        int ws = node.waitStatus;
        if (ws < 0)
            // 將等待狀態waitStatus設定為初始值0
            compareAndSetWaitStatus(node, ws, 0);
    
        /**
         * 若後繼結點為空,或狀態為CANCEL(已失效),則從後尾部往前遍歷找到最前的一個處於正常阻塞狀態的結點
         * 進行喚醒
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒執行緒
    }
    

2.CountDownLatch使用及應用 (不可重用)

CountDownLatch這個類能夠使一個執行緒等待其他執行緒完成各自的工作後再執行。
例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有的框架服務之後再執行
  • 工作方式

     CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。
     每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,
     它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務
     
     CountDownLatch.countDown()
     CountDownLatch.await();
    
  • 應用場景例子

     比如陪媳婦去看病。
     醫院裡邊排隊的人很多,如果一個人的話,要先看大夫,看完大夫再去排隊交錢取藥。
     現在我們是雙核,可以同時做這兩個事(多執行緒)。
     假設看大夫花3秒鐘,排隊交費取藥花5秒鐘。我們同時搞的話,5秒鐘我們就能完成,然後一起回家(回到主執行緒)。
    
  • 程式碼演示和解釋

     /**
      * CountDown 解釋
      * @author njw
      */
     public class CountDownLaunchSample {
         public static void main(String[] args) throws InterruptedException {
             long now = System.currentTimeMillis();
             CountDownLatch countDownLatch = new CountDownLatch(2);
             new Thread(new SeeDoctorTask(countDownLatch)).start();
             new Thread(new QueueTask(countDownLatch)).start();
     
             /**
              * 這裡實際呼叫的也是 acquireSharedInterruptibly
              *   返回 < 0 就去獲取資源 就是隻有當 CountDownLatch 設定的初始值為0是放心,從這段邏輯看來 state是不會恢復的,
              *   所以 CountDownLatch 不可重用
              *
              *   protected int tryAcquireShared(int acquires) {
              *      return (getState() == 0) ? 1 : -1;
              *   }
              *
              *   如果返回-1 說明state 還沒減少到0,呼叫 doAcquireSharedInterruptibly 把當前執行緒阻塞掛起
              *   (這個方法這裡只調用一次,所以佇列裡面其實這時候就只有這一個執行緒等待)
              *
              *   當 countDownLatch.countDown() 執行一次後,釋放資源會把這個執行緒喚醒,
              *   然後執行緒繼續迴圈(tryAcquireShared)嘗試去判斷是否已經state變成了0,不是的話再一次掛起當前執行緒
              *   當其他 執行緒再一次執行 countDownLatch.countDown() 的時候,這個執行緒再一次被喚醒,
              *   再去判斷,一直知道state=0,然後獲取成功 執行 setHeadAndPropagate  最後返回 繼續執行後面的邏輯
              */
             countDownLatch.await();
             System.out.println("over,回家 cost:" + (System.currentTimeMillis() - now));
         }
     
         static class SeeDoctorTask implements Runnable {
             private CountDownLatch countDownLatch;
     
             public SeeDoctorTask(CountDownLatch countDownLatch) {
                 this.countDownLatch = countDownLatch;
             }
     
             public void run() {
                 try {
                     System.out.println("開始看醫生");
                     Thread.sleep(2000);
                     System.out.println("看醫生結束,準備離開病房");
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } finally {
                     if (countDownLatch != null) {
     
                         /**
                          *  countDownLatch.countDown 實際的呼叫
                          *  public final boolean releaseShared(int arg) {
                          *      // 判斷是否已經變為0了,不是的話一次執行-1操作
                          *      if (tryReleaseShared(arg)) {
                          *          // 釋放資源 同時喚醒 countDownLatch.await() 中掛起的執行緒
                          *          doReleaseShared();
                          *          return true;
                          *      }
                          *      return false;
                          *  }
                          *
                          *
                          *  嘗試釋放共享資源  如果已經釋放到0了,就不再去釋放 不然的話一次釋放一個
                          *  等這裡的 釋放到0 的時候
                          * protected boolean tryReleaseShared(int releases) {
                          *     // 當state變成0的時候 喚醒
                          *     for (;;) {
                          *         int c = getState();
                          *         if (c == 0)
                          *             return false;
                          *         int nextc = c-1;
                          *         if (compareAndSetState(c, nextc))
                          *             return nextc == 0;
                          *     }
                          * }
                          */
                         countDownLatch.countDown();
                     }
                 }
             }
         }
         static class QueueTask implements Runnable {
         private CountDownLatch countDownLatch;
         public QueueTask(CountDownLatch countDownLatch) {
             this.countDownLatch = countDownLatch;
         }
         public void run() {
             try {
                 System.out.println("開始在醫院藥房排隊買藥....");
                 Thread.sleep(5000);
                 System.out.println("排隊成功,可以開始繳費買藥");
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } finally {
                 if (countDownLatch != null) {
                     countDownLatch.countDown();
                 }
             }
         }
       }
     }
    

3. CyclicBarrier使用及應用 (可重用)

    柵欄屏障,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,
    屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。
    
    CyclicBarrier預設的構造方法是CyclicBarrier(intparties),parties表示屏障攔截的執行緒數量,
    每個執行緒呼叫await方法告CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。
    
    cyclicBarrier.await()
  • 應用場景例子

     可以用於多執行緒計算資料,最後合併計算結果的場景。
     例如,用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個賬戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,
     先用多執行緒處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,
     最後,再用barrierAction用這些執行緒的計算結果,計算出整個Excel的日均銀行流水。
    
  • 程式碼示例

     /**
      * CyclicBarrier 解釋
      * @author njw
      */
     public class CyclicBarrierTest implements Runnable {
         private CyclicBarrier cyclicBarrier;
         private int index ;
         public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) {
             this.cyclicBarrier = cyclicBarrier;
             this.index = index;
         }
         public void run() {
             try {
                 System.out.println("index: " + index);
                 index--;
     
                 /**
                  *
                  */
                 cyclicBarrier.await();
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
         public static void main(String[] args) throws Exception {
             /**
              * 設定了等待完畢後的執行動作 barrierAction
              * 並且儲存了 等待次數 parties 用於重用
              * public CyclicBarrier(int parties, Runnable barrierAction) {
              *     if (parties <= 0) throw new IllegalArgumentException();
              *     this.parties = parties;
              *     this.count = parties;
              *     this.barrierCommand = barrierAction;
              * }
              */
             CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
                 public void run() {
                     System.out.println("所有特工到達屏障,準備開始執行祕密任務");
                 }
             });
             for (int i = 0; i < 10; i++) {
                 new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start();
             }
             cyclicBarrier.await();
             System.out.println("全部到達屏障....");
         }
     }
    
  • 核心程式碼解釋 cyclicBarrier.await()

     dowait(false, 0L)
         
      /**
      * Main barrier code, covering the various policies.
      */
     private int dowait(boolean timed, long nanos)
             throws InterruptedException, BrokenBarrierException,
             TimeoutException {
         // 藉助 ReentrantLock 加鎖
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
             final Generation g = generation;
             // 初始化的時候是false
             if (g.broken)
                 throw new BrokenBarrierException();
    
             // 判斷執行緒是否已經中斷
             if (Thread.interrupted()) {
                 // 喚醒所有 丟擲異常
                 breakBarrier();
                 throw new InterruptedException();
             }
    
             int index = --count;
             if (index == 0) {  // tripped
                 // 已經倒數到 0 了
                 boolean ranAction = false;
                 try {
                     // 初始化時如果定了了 這個是外面傳進來的任務。barrierAction 到達柵欄後 要執行的任務
                     final Runnable command = barrierCommand;
                     // 任務不為空 執行
                     if (command != null)
                         command.run();
                     // 標記任務已經執行了
                     ranAction = true;
    
                     // 到這裡的時候 已經倒數到0了,並且執行了任務 (這裡狀態重置)
                     // 這裡便是它可以重用的原因 這裡開啟了下一次倒數。
                     // 喚醒這裡cyclicBarrier等待的所有執行緒後,把state設定成了初始值 所以它可以重用 (parties建立的時候設定的值)
                     nextGeneration();
                     return 0;
                 } finally {
                     // 失敗的話喚醒,設定中斷標記
                     if (!ranAction) {
                         breakBarrier();
                     }
                 }
             }
    
             // loop until tripped, broken, interrupted, or timed out
             for (; ; ) {
                 try {
                     // 當代的時候傳進來 false ,!false=true
                     if (!timed)
                         // reentrantLock 條件等待
                         trip.await();
                     else if (nanos > 0L)
                         nanos = trip.awaitNanos(nanos);
                 } catch (InterruptedException ie) {
                     if (g == generation && !g.broken) {
                         breakBarrier();
                         throw ie;
                     } else {
                         // We're about to finish waiting even if we had not
                         // been interrupted, so this interrupt is deemed to
                         // "belong" to subsequent execution.
                         Thread.currentThread().interrupt();
                     }
                 }
    
                 // 中斷丟擲異常
                 if (g.broken)
                     throw new BrokenBarrierException();
                 if (g != generation)
                     return index;
                 if (timed && nanos <= 0L) {
                     breakBarrier();
                     throw new TimeoutException();
                 }
             }
         } finally {
             lock.unlock();
         }
     }
    

4. 簡單介紹 Executors

    主要用來建立執行緒池,代理了執行緒池的建立,使得你的建立入口引數變得簡單
  • 重要方法

     newCachedThreadPool     建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
     newFixedThreadPool     建一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
     newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
     newSingleThreadExecutor建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO,LIFO,優先順序)執行。
    

5. Exchanger (使用比較少)

    當一個執行緒執行到exchange()方法時會阻塞,另一個執行緒執行到exchange()時,二者交換資料,然後執行後面的程式
  • 示例

     /**
      * 執行緒間資料交換
      * @author njw
      */
     public class ExchangerTest {
         public static void main(String[] args) {
             final Exchanger<Integer> exchanger = new Exchanger<Integer>();
             for (int i = 0; i < 10; i++) {
                 final Integer num = i;
                 new Thread() {
                     public void run() {
                         System.out.println("我是執行緒:Thread_" + this.getName() + "我的資料是:" + num);
                         try {
                             Integer exchangeNum = exchanger.exchange(num);
                             Thread.sleep(1000);
                             System.out.println("我是執行緒:Thread_" + this.getName() + "我原先的資料為:" + num + " , 交換後的資料為:" + exchangeNum);
                         } catch (InterruptedException e) {
                             e.printStackTrace();
                         }
                     }
                 }.start();
             }
         }
     }