Java併發包提供了哪些併發工具類?
典型回答
我們通常所說的併發包也就是java.util.concurrent及其子包,集中了Java併發的各種基礎工具類,具體主要包括幾個方面:
- 提供了比synchronized更加高階的各種同步結構,包括CountDownLatch、CyclicBarrier、Semaphore等,可以實現更加豐富的多執行緒操作。比如利用Semaphore作為資源控制器,限制同時進行工作的執行緒數量。
- 各種執行緒安全的容器,比如最常見的ConcurrentHashMap、有序的ConcurrentSkipListMap,或者通過類似快照機制,實現執行緒安全的動態陣列CopyOnWriteArrayList等。
- 各種併發佇列實現,如各種BlockingQueue實現,比較典型的ArrayBlockingQueue、SynchorousQueue或針對特定場景的PriorityBlockingQueue等。
- 強大的Executor框架,可以建立各種不同型別的執行緒池,排程任務執行等。絕大部分情況下,不再需要自己從頭實現執行緒池和任務排程器。
知識擴充套件
這部分內容比較多,將分為兩講。這一講將重點介紹Semaphore,下一講將介紹CountDownLatch和CyclicBarrier。
1、經典訊號量Semaphore
Java提供了經典訊號量Semaphore的實現,它通過控制一定數量的許可(permit)的方式,來達到限制通用資源訪問的目的。你可以想象一下這個場景:在車站、機場等計程車時,當很多空出租車就位時,為防止過度擁擠,排程員指揮排隊等待坐車的隊伍一次進來5個人上車,等這5個人坐車出發,再放進去下一批。這和Semaphore的工作原理有些類似。
可以試試使用Semaphore來模擬實現這個排程過程:
import java.util.concurrent.Semaphore; public class UsualSemaphoreSample { public static void main(String[] args) throws InterruptedException { System.out.println("Action...GO!"); Semaphore semaphore = new Semaphore(5); for (int i = 0; i < 10; i++) { Thread t = new Thread(new SemaphoreWorker(semaphore)); t.start(); } } public static class SemaphoreWorker implements Runnable { private String name; private Semaphore semaphore; public SemaphoreWorker(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { log("is waiting for a permit!"); semaphore.acquire(); log("acquired a permit!"); log("executed!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { log("released a permit!"); semaphore.release(); } } private void log(String msg) { if (name == null) { name = Thread.currentThread().getName(); } System.out.println(name + " " + msg); } } }
這段程式碼是比較典型的Semaphore示例,其邏輯是,執行緒試圖獲得工作許可,得到許可則進行任務,然後釋放許可,這時等待許可的其它執行緒,就可獲得許可進入工作狀態,直到全部處理結束。編譯執行,我們就能看到Semaphore的許可機制對工作執行緒的限制。
但是,從具體節奏看來,其實並不符合我們前面場景的需求,因為本例中Semaphore的用法實際是保證,一直有5個人可以試圖乘車,如果有一個人出發了,立即就有排隊的人獲得許可,而這並不完全符合我們前面的需求。
那麼,我們再修改一下,演示個非典型的Semaphore用法。
import java.util.concurrent.Semaphore; public class AbnormalSemaphoreSample { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(0); for (int i = 0; i < 10; i++) { Thread t = new Thread(new MyWorker(semaphore)); t.start(); } System.out.println("Action...GO!"); semaphore.release(5); System.out.println("Wait for permits off"); while (semaphore.availablePermits() != 0) { Thread.sleep(100L); } System.out.println("Action...GO again!"); semaphore.release(5); } static class MyWorker implements Runnable { private Semaphore semaphore; public MyWorker(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("Executed!"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
注意,上面的程式碼,更側重的是演示Semaphore的功能以及侷限性,其實有很多執行緒程式設計中的反實踐。比如使用了sleep來協調任務執行,而且使用輪詢呼叫availiablePermits來檢測訊號量獲取情況。這都是很低效並且脆弱的,通常只是在測試或者診斷場景。
總的來說,我們可以看出Semaphore就是個計數器,其基本邏輯基於acquire/release,並沒有太複雜的同步邏輯。
如果Semaphore的數值被初始化為1,那麼一個執行緒就可以通過acquire進入互斥狀態。本質上和互斥鎖是非常相似的。但是區別也非常明顯,比如互斥鎖是有持有者的,而對於Semaphore這種計數器結構,雖然有類似功能,但其實不存在真正意義的持有者,除非我們進行擴充套件包裝。
【完】