【Java併發工具類】CountDownLatch和CyclicBarrier
前言
下面介紹協調讓多執行緒步調一致的兩個工具類:CountDownLatch
和CyclicBarrier
。
CountDownLatch和CyclicBarrier的用途介紹
CountDownLatch
// API void await(); // 使當前執行緒在閉鎖計數器到零之前一直等待,除非執行緒被中斷。 boolean await(long timeout, TimeUnit unit); // 使當前執行緒在閉鎖計數器至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。 void countDown(); // 遞減閉鎖計數器,如果計數到達零,則釋放所有等待的執行緒。 long getCount(); // 返回當前計數。 String toString(); // 返回標識此閉鎖及其狀態的字串。
CountDownLatch
是一個同步工具類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。可以指定計數初始化CountDownLatch,當呼叫countDown()
方法後,在當前計數到達零之前,await()
方法會一直受阻塞。計數到達零之後,所有被阻塞的執行緒都會被釋放,await()
的所有後續呼叫都會立即返回。CountDownLatch的計數只能被使用一次,如果需要重複計數使用,則要考慮使用CyclicBarrier
。
CountDownLatch的用途有很多。將計數為1初始化的CountDownLatch可用作一個簡單的開/關或入口:在通過呼叫countDown()的執行緒開啟入口前,所有呼叫await()的執行緒都一直在入口出等待。而用N初始化CountDownLatch可以使一個執行緒在N個執行緒完成某項操作之前一直等待,或者使其在某項操作完成N次之前一直等待。
COuntDownLatch的記憶體一致性語義:執行緒中呼叫 countDown()
之前的操作 Happens-Before緊跟在從另一個執行緒中對應 await()
成功返回的操作。
CyclicBarrier
// API int await(); // 執行緒將一直等待直到所有參與者都在此 barrier 上呼叫 await 方法 int await(long timeout, TimeUnit unit); // 執行緒將一直等待直到所有參與者都在此 barrier 上呼叫 await 方法, 或者超出了指定的等待時間。 int getNumberWaiting(); // 返回當前在屏障處等待的參與者數目。 int getParties(); // 返回要求啟動此 barrier 的參與者數目。 boolean isBroken(); // 查詢此屏障是否處於損壞狀態。 void reset(); // 將屏障重置為其初始狀態。
CyclicBarrier
是一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點(barrier也可被翻譯為柵欄) (common barrier point)。 CyclicBarrier 適用於在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待的情況。即所有執行緒都必須到達屏障位置後,下面的程式才能繼續執行,適於在迭代演算法中使用。因為 barrier 在釋放等待執行緒後可以計數器會被重置可繼續使用,所以稱它為迴圈 的 barrier。
CyclicBarrier支援一個可選的 Runnable
命令(也就是可以傳入一個執行緒執行其他操作),在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令將只在每個 barrier point 執行一次。這對所有參與執行緒繼續執行之前更新它們的共享狀態將十分有用。
CyclicBarrier的記憶體一致性語義:執行緒中呼叫 await()
之前的操作 Happens-Before 那些是屏障操作的一部份的操作,後者依次 Happens-Before 緊跟在從另一個執行緒中對應 await()
成功返回的操作。
Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.
在對賬系統中使用CountDownLatch和CyclicBarrier
對賬系統流程圖如下:
目前對賬系統的處理流程是:先查詢訂單,然後查詢派送單,之後對比訂單和派送單,將差異寫入差異庫。對賬系統的程式碼抽象後如下:
while(存在未對賬訂單){
// 查詢未對賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
利用並行優化對賬系統
目前的對賬系統,由於訂單量和派送單量巨大,所以查詢未對賬訂單getPOrder()
和查詢派送單getDOrder()
都相對比較慢。目前對賬系統是單執行緒執行的,示意圖如下(圖來自參考[1]):
對於序列化的系統,優化效能首先想到的就是能否利用多執行緒並行處理。
如果我們能將getPOrders()和getDOrders()這兩個操作並行處理,那麼將會提升效率很多。因為這兩個操作並沒有先後順序的依賴,所以,我們可以並行處理這兩個耗時的操作。
並行後的示意圖如下(圖來自參考[1]):
對比單執行緒的執行示意圖,我們發現在同等時間裡,並行執行的吞吐量近乎單執行緒的2倍,優化效果還是相對明顯的。
優化後的程式碼如下:
while(存在未對賬訂單){
// 查詢未對賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 要等待執行緒T1和T2執行完才能執行check()和save()這兩個操作
// 通過呼叫T1.join()和T2.join()來實現等待
// 當T2和T2執行緒退出時,呼叫T1.jion()和T2.join()的主執行緒就會從阻塞態被喚醒,從而執行check()和save()
T1.join();
T2.join();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
使用CountDownLatch實現執行緒等待
上面的解決方案美中不足的地方在於:每一次while迴圈都會建立新的執行緒,而執行緒的建立是一個耗時操作。所以,最好能使創建出來的執行緒能夠迴圈使用。一個自然而然的方案便是執行緒池。
// 建立 2 個執行緒的執行緒池
Executor executor =Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何實現等待??*/
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
於是我們就建立兩個固定大小為2的執行緒池,之後在while迴圈裡重複利用。
但是問題也出來了:主執行緒如何得知getPOrders()和getDOrders()這兩個操作什麼時候執完?
前面主執行緒通過呼叫執行緒T1和T2的join()
方法來等待T1和T2退出,但是線上程池的方案裡,執行緒根本就不會退出,所以,join()方法不可取。
這時我們就可以使用CountDownLatch工具類,將其初始計數值設定為2。當執行完pos = getPOrders();
後,將計數器減一,執行完dos = getDOrders();
後也將計數器減一。當計數器為0時,被阻塞的主執行緒就可以繼續執行了。
// 建立 2 個執行緒的執行緒池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 計數器初始化為 2
CountDownLatch latch = new CountDownLatch(2);
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown(); // 實現對計數器減1
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown(); // 實現對計數器減1
});
// 等待兩個查詢操作結束
latch.await(); // 在await()返回之前,主執行緒會一直被阻塞
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
使用CyclicBarrier進一步優化對賬系統
除了getPOrders()和getDOrders()這兩個操作可以並行,這兩個查詢操作和check()
、save()
這兩個對賬操作之間也可以並行。
兩次查詢操作和對賬操作並行,對賬操作還依賴查詢操作的結果,有點像生產者-消費者的意思,兩次查詢操作是生產者,對賬操作是消費者。那麼,我們就需要一個佇列,來儲存生產者生產的資料,而消費者則從這個佇列消費資料。
不過,針對對賬系統,可以設計兩個佇列,並且這兩個佇列之間還有對應關係。訂單查詢操作將訂單查詢結果插入訂單佇列,派送單查詢操作將派送單插入派送單佇列,這兩個佇列的元素之間是有一一對應關係。這樣的好處在於:對賬操作可以每次從訂單隊列出一個元素和從派送單隊列出一個元素,然後對這兩個元素執行對賬操作,這樣資料一定不會亂掉。
如何使兩個佇列實現完全的並行?
兩個查詢操作所需時間並不相同,那麼一個簡單的想法便是,一個執行緒T1執行訂單的查詢工程,一個執行緒T2執行派送單的查詢工作,僅當執行緒T1和T2各自都生產完1條資料的時候,通知執行緒T3執行對賬操作。
先查詢完的一方需要在設定的屏障點等待另一方,直到雙方都到達屏障點,才開始繼續下一步任務。
於是我們可以使用CyclicBarrier來實現這個功能。建立一個計數器初始值為2的CyclicBarrier,同時傳入一個回撥函式,當計數器減為0的時候,便呼叫這個函式。
Vector<P> pos; // 訂單佇列
Vector<D> dos; // 派送單佇列
// 執行回撥的執行緒池
// 固定執行緒數量為1是因為只有單執行緒取獲取兩個佇列中的資料才不會出現數據匹配不一致問題
Executor executor = Executors.newFixedThreadPool(1);
// 建立CyclicBarrier的計數器為2,傳入一個執行緒另外執行對賬操作
// 當計數器為0時,會執行傳入執行緒執行對賬操作
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0); // 從訂單佇列中獲取訂單
D d = dos.remove(0); // 從派送單佇列中獲取派送單
// 執行對賬操作
diff = check(p, d);
// 差異寫入差異庫
save(diff);
}
void checkAll(){
// 迴圈查詢訂單庫
Thread T1 = new Thread(()->{
while(存在未對賬訂單){
pos.add(getPOrders()); // 查詢訂單庫
barrier.await(); // 將計數器減一併等待直到計數器為0
}
});
T1.start();
// 迴圈查詢運單庫
Thread T2 = new Thread(()->{
while(存在未對賬訂單){
dos.add(getDOrders()); // 查詢運單庫
barrier.await(); // 將計數器減一併等待直到計數器為0
}
});
T2.start();
}
執行緒T1負責查詢訂單,當查出一條時,呼叫barrier.await()
來將計數器減1,同時等待計數器變為0;執行緒T2負責查詢派送訂單,當查出一條時,也呼叫barrier.await()
來將計數器減1,同時等待計數器變為0;當T1和T2都呼叫barrier.await()時,計數器就會減到0,此時T1和T2就可以執行下一條語句了,同時會呼叫barrier的回撥函式來執行對賬操作。
CyclicBarrier的計數器有自動重置的功能,當減到0時,會自動重置你設定的初始值。於是,我們便可以重複使用CyclicBarrier。
小結
CountDownLatch
和CyclicBarrier
是Java併發包提供的兩個非常易用的執行緒同步工具類。它們的區別在於:CountDownLatch主要用來解決一個執行緒等待多個執行緒的場景(計數器一旦減到0,再有執行緒呼叫await(),該執行緒會直接通過,計數器不會被重置);CyclicBarrier是一組執行緒之間的相互等待(計數器可以重用,減到0會重置為設定的初始值),還可以傳入回撥函式,當計數器為0時,執行回撥函式。
參考:
[1] 極客時間專欄王寶令《Java併發程式設計實戰》
[2] Brian Goetz.Tim Peierls. et al.Java併發程式設計實戰[M].北京:機械工業出版社,2016
[3] Oracle Java API.https://docs.oracle.com/javase/8/docs/api/index.html?overview-summary.h