1. 程式人生 > 實用技巧 >「APIO2018」選圓圈(K-D Tree/CDQ+Set)

「APIO2018」選圓圈(K-D Tree/CDQ+Set)

概述

CountDownLatchJ.U.C包中提供的一個併發工具類,其主要作用是協調多個執行緒之間的同步,其可以讓一個執行緒在等待其他執行緒執行完任務之後再繼續執行。

demo1

假設現在有一場考試,考場中有五個人,考試時間是1s,那麼監考老師只能等考試時間到了才能收卷。使用CountDownLatch如下:

 public static void main(String[] args) {



        final CountDownLatch latch = new CountDownLatch(10);

        ExecutorService service = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));


        try {
            for (int i = 0; i < 10; i++) {
                service.execute(()->{

                    try {
                        System.out.println("學生i" + Thread.currentThread().getName() + " 開始答題。。。。");
                        Thread.sleep(1000);
                        System.out.println("時間到,學生i"+Thread.currentThread().getName()+"交卷。。。。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        latch.countDown();

                    }
                });
            }

            System.out.println("考試開始。。。");
            latch.await();
            service.shutdown();
            System.out.println("考試結束。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

啟動執行,輸出如下:

考試開始。。。
學生ipool-1-thread-1 開始答題。。。。
學生ipool-1-thread-3 開始答題。。。。
學生ipool-1-thread-2 開始答題。。。。
學生ipool-1-thread-4 開始答題。。。。
學生ipool-1-thread-5 開始答題。。。。
學生ipool-1-thread-6 開始答題。。。。
學生ipool-1-thread-7 開始答題。。。。
學生ipool-1-thread-8 開始答題。。。。
學生ipool-1-thread-9 開始答題。。。。
學生ipool-1-thread-10 開始答題。。。。
時間到,學生ipool-1-thread-1交卷。。。。
時間到,學生ipool-1-thread-3交卷。。。。
時間到,學生ipool-1-thread-6交卷。。。。
時間到,學生ipool-1-thread-9交卷。。。。
時間到,學生ipool-1-thread-2交卷。。。。
時間到,學生ipool-1-thread-10交卷。。。。
時間到,學生ipool-1-thread-8交卷。。。。
時間到,學生ipool-1-thread-7交卷。。。。
時間到,學生ipool-1-thread-5交卷。。。。
時間到,學生ipool-1-thread-4交卷。。。。
考試結束。。。

原始碼分析

成員變數

  //私有靜態內部類  
  private final Sync sync;
  // Sync繼承自AQS 從而具有佇列同步的功能
  private static final class Sync extends AbstractQueuedSynchronizer {
       。。。。
    }

構造方法

public CountDownLatch(int count) {
        //入參校驗
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //初始化 sync
        this.sync = new Sync(count);
    }


Sync(int count) {
            setState(count);
        }

//AbstractQueuedSynchronizer類

private volatile int state;

protected final void setState(int newState) {
        state = newState;
    }

CountDownLatch的構造方法實際上是對Sync進行初始化,而Sync的構造方法底層又是呼叫AQS框架的setState方法來設定計數器的值。

countdown方法

 public void countDown() {
        sync.releaseShared(1);
    }

countDown方法很簡單,就是呼叫AQS框架裡的releaseShared來改變state的值,原始碼如下:

   public final boolean releaseShared(int arg) {
       //tryReleaseShared是一個模板方法,由其子類進行實現
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

我們來看一下tryReleaseShared這個方法,在AQS框架中,這是一個模板方法,由繼承它的子類來具體實現。在CountDownLatch中其私有靜態內部類Sync繼承了AQS,所以也會重寫該方法,通過自旋和CAS來實現釋放鎖的目的。如下所示:

  protected boolean tryReleaseShared(int releases) {
            //自旋
            for (;;) {
                //獲取計數器的值
                int c = getState();
                // 每次釋放的時候,也就是子任務完成的時候計數值減一
                if (c == 0)
                    return false;
                //否則的話 將state-1
                int nextc = c-1;
                //使用 CAS 修改 state的值
                if (compareAndSetState(c, nextc))
                    // 子任務均處理完畢後,返回 true; 也就是真正的釋放
        	        // 將喚醒阻塞在同步佇列的執行緒
                    // 否則繼續自旋
                    return nextc == 0;
            }
        }

tryReleaseShared的返回結果true時,繼續執行AQS中的doReleaseShared方法,原始碼如下:

private void doReleaseShared() {
        //自旋
        for (;;) {
            //獲取CLH佇列的頭節點
            Node h = head;
            //如果不為null 且CLH佇列不只一個節點
            if (h != null && h != tail) {
                //獲取節點的waitStatus
                int ws = h.waitStatus;
                //如果是 ws是SINGAL狀態
                if (ws == Node.SIGNAL) {
                    //設定節點狀態由 SINGAL變為0失敗
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                     //繼續自旋
                        continue;
                    //喚醒頭節點
                    unparkSuccessor(h);
                }
                // 如果ws==0 且 設定狀態PROPAGATE失敗
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    //繼續自旋
                    continue;                
            }
            //判斷頭節點是否已經反正變化
            if (h == head)      
                //結束迴圈
                break;
        }
    }

首先獲得head節點。如果head節點不等於空且head節點不等於tail節點,說明CLH佇列中此時不止一個節點在排隊,獲得head節點的waitStatus。判斷當前head節點狀態是否是SINGAL。處於SINGAL狀態的節點,說明當前節點的後繼節點處於被喚醒的狀態。如果CAS操作將head節點的waitStatus重置為0失敗,那麼跳出當前迴圈,繼續執行下一次迴圈(重新檢查)。如果重置成功,那麼呼叫unparkSuccessor方法喚醒後繼節點。 如果當前head節點狀態等於0,通過CAS操作將waitStatus設定為PROPAGATE(傳播)狀態,確保可以向後一個節點傳播下去。如果CAS操作失敗,那麼當前迴圈,繼續執行下一次迴圈。最後的h == head,是判斷head節點是否發生變化。如果沒有發生變化,結束迴圈。如果發生變化,必須再次迴圈。


    private void unparkSuccessor(Node node) {
        //獲取節點的waitStatus
        int ws = node.waitStatus;
        //小於0的話 使用CAS 設定為0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //當前節點的下一個節點       
        Node s = node.next;
        //如果為null 或者waitStatus>0,即被取消
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從尾開始向前遍歷  找到第一個waitStatus小於等於0的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //不為空的話,就喚醒這個節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

獲取head節點的waitStatus,如果小於0,進行CAS操作重置為0。獲取head節點的後繼節點,如果後繼節點等於null或者後繼節點的waitStaus大於0(說明後繼節點處於CANCELLED狀態),那麼從佇列從尾部往前進行遍歷尋找waitStatus小於等於0的節點。如果這個遍歷出來的節點不等於null的話,那麼通過LockSupport.unpark()喚醒這個節點中的執行緒。

從實現可以看出,每次子任務在呼叫 countDown 時,會將同步狀態值減一,當所有子任務均完成時 (state = 0) 此時會喚醒阻塞在同步佇列的節點。

await方法

await方法會使當前執行緒在計數器變為0之前,一直處於等待狀態,除非被打斷。

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await方法底層呼叫的還是AQS框架中的acquireSharedInterruptibly方法,原始碼如下:

  public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
      //判斷執行緒是否中斷
        if (Thread.interrupted())
            //如果被中斷 則直接丟擲異常
            throw new InterruptedException();
      //tryAcquireShared依然是一個模板方法 由子類實現
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

AQS中的acquireSharedInterruptibly方法,會判斷執行緒是否中斷。如果中斷, 丟擲InterruptedException異常。值得注意的是Thread.interrupted()方法,是測試當前執行緒是否中斷。該方法會清除執行緒的中斷狀態。換句話說,如果呼叫這個方法2次,那麼第二次會直接返回false,除非當前執行緒在第一次呼叫之後再次被中斷。如果tryAcquireShared()小於0(說明該計數器值大於0),繼續執行doAcquireSharedInterruptibly。

Sync中的tryAcquireShared方法很簡單,原始碼如下:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

這裡只是簡單的判斷state變數。如果state等於0(說明計數值為0),返回1,否則返回-1(說明計數器值大於0)。

最後是AQS裡的doAcquireSharedInterruptibly方法:

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入隊
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

很明顯,是通過輪詢的方式去獲取共享鎖。首先將當前執行緒包裝成型別為SHARED的節點,標誌為共享型別的節點。獲取當前節點的前驅節點。如果當前節點的前驅節點為head節點的話,說明該節點是在AQS佇列中等待獲取鎖的第一個節點。呼叫CountDownLatch中的tryAcquireShared()嘗試去獲取鎖。返回的值大於0的話,說明獲取鎖成功。如果獲取共享鎖成功,那麼把當前節點設定為AQS同步佇列中的head節點,同時將p.next置為null(方便GC)。回到頭看,如果當前節點的前驅節點不是head節點或者獲取鎖失敗,我們需要呼叫shouldParkAfterFailedAcquire()方法判斷當前執行緒是否需要掛起,如果需要掛起呼叫 parkAndCheckInterrupt()。

await方法還有一個過載方法,加入超時機制。

 // 返回false,代表超時 
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

// 返回false,代表超時。返回true,代表獲得共享鎖成功
 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }


private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //static final long spinForTimeoutThreshold = 1000L;
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

AQS中的doAcquireSharedNanos方法中,如果在nanosTimeout時間範圍內,還沒有獲取共享鎖成功的話,直接返回false。spinForTimeoutThreadshold的值為1000nanoseconds。如果shouldParkAfterFailedAcquire(p, node)返回true且超時時間大於閥值spinForTimeoutThreadshold的話,會通過LockSupport.parkNanos(this, nanosTimeout);讓執行緒掛起nanosTimeout時間。這樣的策略體現是:如果超時時間很短的話,就不把當前執行緒掛起,而是通過自旋,這樣執行緒獲取鎖很快就釋放的情況下,可以減少cpu資源和執行緒掛起和恢復的效能損耗。