1. 程式人生 > 其它 >CountDownLatch能不能在多個執行緒上新增await?

CountDownLatch能不能在多個執行緒上新增await?

CountDownLatch類的使用過程中,發現了一個很奇怪的現象:

	CountDownLatch countDownLatch = new CountDownLatch(2);
		
		Runnable taskMain = () -> {
			try { 
				countDownLatch.await();   // 等待喚醒
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("繼續執行任務");
		};
		Runnable taskMain1 = () -> {
			try { 
				countDownLatch.await();  // 等待喚醒
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("繼續執行任務2");
		};
		
		Runnable task1 = () -> {
			countDownLatch.countDown();   // 計數器 -1 
			System.out.println("前置任務1完成");
		};
		
		Runnable task2 = () -> {
			countDownLatch.countDown();  // 計數器 -1 
			System.out.println("前置任務2完成");
		};
		
		new Thread(taskMain).start();
		new Thread(taskMain1).start();
		new Thread(task1).start();
		new Thread(task2).start();

在這個地方使用了兩個await,希望在兩個前置執行緒執行完成之後再執行剩下的兩個執行緒。但是結果有點特別:

繼續執行任務
前置任務1完成
前置任務2完成
繼續執行任務2

我發現taskMain的任務首先被執行了。

按照邏輯來說,在第一個await執行中,由於此時AQSstate值等於計數器設定的count值2,state必須等於0時他才能拿到鎖,所以此時他被掛起。第二個也是如此,那麼為什麼會出現第一個await被執行了呢?

我從頭捋一遍程式碼,當第一個await被呼叫後:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }
    
    // 這是在CountDownLatch中複寫的方法,忘了方便觀看放到一起了。
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

因為此時state值為2,所以tryAcquireShared(arg) < 0成立,此時會呼叫acquire嘗試去獲取鎖。

 /**
     * Main acquire method, invoked by all exported acquire methods.
     *
     * @param node null unless a reacquiring Condition
     * @param arg the acquire argument
     * @param shared true if shared mode else exclusive
     * @param interruptible if abort and return negative on interrupt
     * @param timed if true use timed waits
     * @param time if timed, the System.nanoTime value to timeout
     * @return positive if acquired, 0 if timed out, negative if interrupted
     */
    final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if node not yet created, create it
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }

由此可以看出第一次執行之後,建立節點加入到CLH佇列中,然後被LockSupport.park(this)掛起。這部分跟我預想的一樣。第二次也是這樣,按理說沒什麼區別啊。

我不信邪,又重新試了一遍。這次我添加了計數器的個數(15),並且每次都輸出當前的state值。

前置任務b完成:13
前置任務a完成:13
前置任務a完成:12
前置任務b完成:11
前置任務a完成:10
前置任務b完成:9
前置任務a完成:8
前置任務b完成:7
前置任務a完成:6
前置任務b完成:5
前置任務a完成:4
前置任務b完成:3
前置任務a完成:2
前置任務b完成:1
繼續執行任務c:0
繼續執行任務c:0
繼續執行任務c:0
繼續執行任務d:0
繼續執行任務c:0
繼續執行任務d:0
繼續執行任務d:0
繼續執行任務d:0
繼續執行任務c:0
繼續執行任務d:0
繼續執行任務d:0

誤會,嘿嘿。CountDownLatch果然可以在多個執行緒上新增await。