AQS系列(五)- CountDownLatch的使用及原理
前言
前面四節學完了AQS最難的兩種重入鎖應用,下面兩節進入實戰學習,看看JUC包中其他的工具類是如何運用AQS實現特定功能的。今天一起看一下CountDownLatch。
CountDownLatch可以用來實現多個執行緒執行完一個功能後讓另一個執行緒繼續執行的功能。常見的場景比如大檔案的處理,我們需要對一個或多個檔案進行處理,處理完之後再統一入庫,這時我們就可以用到CountDownLatch了。
一、使用樣例
1 public static void main(String[] args) { 2 // 指定初始容量 3 CountDownLatch latch = new CountDownLatch(3); 4 // 啟動三個執行緒,每個執行緒獨自處理檔案 5 for (int i = 0;i < 3; i++) { 6 new Thread(() -> { 7 System.out.println(Thread.currentThread().getName() + " 正在處理檔案"); 8 try { 9 Thread.sleep(2000); 10 } catch (InterruptedException e) { 11 e.printStackTrace(); 12 } 13 System.out.println(Thread.currentThread().getName() + " 處理完畢"); 14 latch.countDown(); 15 }).start(); 16 } 17 try { 18 latch.await(); 19 } catch (Exception e) { 20 e.printStackTrace(); 21 } 22 System.out.println("所有檔案處理完成後,統一入庫"); 23 }
執行結果:
1 Thread-0 正在處理檔案 2 Thread-2 正在處理檔案 3 Thread-1 正在處理檔案 4 Thread-0 處理完畢 5 Thread-2 處理完畢 6 Thread-1 處理完畢 7 所有檔案處理完成後,統一入庫
效果就是這樣,下面我們一起看看它是如何實現的這種功能。
二、原始碼學習
1、首先我們看看new CountDownLatch(3) 做了什麼事情
1 public CountDownLatch(int count) { 2 if (count < 0) throw new IllegalArgumentException("count < 0"); 3 this.sync = new Sync(count); 4 }
繼續追蹤可以發現,就是將count賦值給AQS中的成員變數state,表示已經有3個執行緒佔用了鎖。
2、看countDown()方法做了什麼
1 public void countDown() { 2 sync.releaseShared(1); 3 }
可以看到,countDown走的是釋放共享鎖的邏輯,從給state賦值也可以猜到用的是共享鎖-有多個執行緒且state可賦大於0的值。繼續看releaseShared邏輯:
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 }
可以看到就是讀鎖釋放的邏輯,其中doReleaseShared方法實現邏輯相同就不看了,不同的是tryReleaseShared方法,下面跟進:
1 protected boolean tryReleaseShared(int releases) { 2 // Decrement count; signal when transition to zero 3 for (;;) { 4 int c = getState(); 5 if (c == 0) 6 return false; 7 int nextc = c-1; 8 if (compareAndSetState(c, nextc)) 9 return nextc == 0; 10 } 11 }
此方法在CountDownLatch中的內部類Sync中得到實現,邏輯為將state-1,並且如果是0的話返回true。返回true後在releaseShared方法中會進入if裡面,走喚醒後續節點的邏輯doReleaseShared方法,在該方法中喚醒的main執行緒。main執行緒什麼時候被掛起的?且看下面。
3、await方法
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }
await呼叫了可響應中斷的獲取共享鎖方法,繼續檢視:
1 public final void acquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 if (Thread.interrupted()) 4 throw new InterruptedException(); 5 if (tryAcquireShared(arg) < 0) 6 doAcquireSharedInterruptibly(arg); 7 }
此方法是AQS中的公用模板方法,不同點在於各實現類的實現邏輯,在CountDownLatch中對tryAcquireShared方法進行了實現,實現邏輯如下:
1 protected int tryAcquireShared(int acquires) { 2 return (getState() == 0) ? 1 : -1; 3 }
即如果state==0則能獲取到鎖,否則獲取不到。獲取不到進入下面的doAcquireSharedInterruptibly方法,最終會將head的waitStatus設定為-1,自己掛起等待喚醒。
三、總結
CountDownLatch是基於共享鎖實現的併發控制功能,現在對總的實現邏輯做個梳理:首先在構造器初始化CountDownLatch的時候,就會給AQS中的state賦值,表示共享鎖已經被獲取了N次;然後每執行一次countDown則共享鎖釋放一次,直到釋放完;await方法是加鎖的邏輯,但加鎖條件是state==0時才會加鎖成功,否則掛起;最後,當通過countDown的呼叫將state減為0後,會喚醒處於阻塞狀態的主執行緒,讓其 獲取到鎖並執行