CountDownLatch能不能在多個執行緒上新增await?
阿新 • • 發佈:2021-08-25
在CountDownLatch
類的使用過程中,發現了一個很奇怪的現象:
CountDownLatch countDownLatch = new CountDownLatch(2); Runnable taskMain = () -> { try { countDownLatch.await(); // 等待喚醒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("繼續執行任務"); }; Runnable taskMain1 = () -> { try { countDownLatch.await(); // 等待喚醒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("繼續執行任務2"); }; Runnable task1 = () -> { countDownLatch.countDown(); // 計數器 -1 System.out.println("前置任務1完成"); }; Runnable task2 = () -> { countDownLatch.countDown(); // 計數器 -1 System.out.println("前置任務2完成"); }; new Thread(taskMain).start(); new Thread(taskMain1).start(); new Thread(task1).start(); new Thread(task2).start();
在這個地方使用了兩個await
,希望在兩個前置執行緒執行完成之後再執行剩下的兩個執行緒。但是結果有點特別:
繼續執行任務
前置任務1完成
前置任務2完成
繼續執行任務2
我發現taskMain
的任務首先被執行了。
按照邏輯來說,在第一個await
執行中,由於此時AQS
的state
值等於計數器設定的count
值2,state
必須等於0時他才能拿到鎖,所以此時他被掛起。第二個也是如此,那麼為什麼會出現第一個await
被執行了呢?
我從頭捋一遍程式碼,當第一個await
被呼叫後:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted() || (tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); } // 這是在CountDownLatch中複寫的方法,忘了方便觀看放到一起了。 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
因為此時state
值為2,所以tryAcquireShared(arg) < 0
成立,此時會呼叫acquire
嘗試去獲取鎖。
/** * Main acquire method, invoked by all exported acquire methods. * * @param node null unless a reacquiring Condition * @param arg the acquire argument * @param shared true if shared mode else exclusive * @param interruptible if abort and return negative on interrupt * @param timed if true use timed waits * @param time if timed, the System.nanoTime value to timeout * @return positive if acquired, 0 if timed out, negative if interrupted */ final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued /* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */ for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
由此可以看出第一次執行之後,建立節點加入到CLH佇列中,然後被LockSupport.park(this)
掛起。這部分跟我預想的一樣。第二次也是這樣,按理說沒什麼區別啊。
我不信邪,又重新試了一遍。這次我添加了計數器的個數(15),並且每次都輸出當前的state
值。
前置任務b完成:13
前置任務a完成:13
前置任務a完成:12
前置任務b完成:11
前置任務a完成:10
前置任務b完成:9
前置任務a完成:8
前置任務b完成:7
前置任務a完成:6
前置任務b完成:5
前置任務a完成:4
前置任務b完成:3
前置任務a完成:2
前置任務b完成:1
繼續執行任務c:0
繼續執行任務c:0
繼續執行任務c:0
繼續執行任務d:0
繼續執行任務c:0
繼續執行任務d:0
繼續執行任務d:0
繼續執行任務d:0
繼續執行任務c:0
繼續執行任務d:0
繼續執行任務d:0
誤會,嘿嘿。CountDownLatch果然可以在多個執行緒上新增await。