1. 程式人生 > >Java併發包提供了哪些併發工具類?

Java併發包提供了哪些併發工具類?

典型回答

我們通常所說的併發包也就是java.util.concurrent及其子包,集中了Java併發的各種基礎工具類,具體主要包括幾個方面:

  • 提供了比synchronized更加高階的各種同步結構,包括CountDownLatch、CyclicBarrier、Semaphore等,可以實現更加豐富的多執行緒操作。比如利用Semaphore作為資源控制器,限制同時進行工作的執行緒數量。
  • 各種執行緒安全的容器,比如最常見的ConcurrentHashMap、有序的ConcurrentSkipListMap,或者通過類似快照機制,實現執行緒安全的動態陣列CopyOnWriteArrayList等。
  • 各種併發佇列實現,如各種BlockingQueue實現,比較典型的ArrayBlockingQueue、SynchorousQueue或針對特定場景的PriorityBlockingQueue等。
  • 強大的Executor框架,可以建立各種不同型別的執行緒池,排程任務執行等。絕大部分情況下,不再需要自己從頭實現執行緒池和任務排程器。

知識擴充套件

這部分內容比較多,將分為兩講。這一講將重點介紹Semaphore,下一講將介紹CountDownLatch和CyclicBarrier。

1、經典訊號量Semaphore

Java提供了經典訊號量Semaphore的實現,它通過控制一定數量的許可(permit)的方式,來達到限制通用資源訪問的目的。你可以想象一下這個場景:在車站、機場等計程車時,當很多空出租車就位時,為防止過度擁擠,排程員指揮排隊等待坐車的隊伍一次進來5個人上車,等這5個人坐車出發,再放進去下一批。這和Semaphore的工作原理有些類似。

可以試試使用Semaphore來模擬實現這個排程過程:

import java.util.concurrent.Semaphore;
public class UsualSemaphoreSample {
  public static void main(String[] args) throws InterruptedException {
    System.out.println("Action...GO!");
    Semaphore semaphore = new Semaphore(5);
    for (int i = 0; i < 10; i++) {
      Thread t = new Thread(new SemaphoreWorker(semaphore));
      t.start();
    }
  }
  public static class SemaphoreWorker implements Runnable {
    private String name;
    private Semaphore semaphore;
    public SemaphoreWorker(Semaphore semaphore) {
      this.semaphore = semaphore;
    }
    @Override
    public void run() {
      try {
        log("is waiting for a permit!");
        semaphore.acquire();
        log("acquired a permit!");
        log("executed!");
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        log("released a permit!");
        semaphore.release();
      }
    }
    private void log(String msg) {
      if (name == null) {
        name = Thread.currentThread().getName();
      }
      System.out.println(name + " " + msg);
    }
  }
}

這段程式碼是比較典型的Semaphore示例,其邏輯是,執行緒試圖獲得工作許可,得到許可則進行任務,然後釋放許可,這時等待許可的其它執行緒,就可獲得許可進入工作狀態,直到全部處理結束。編譯執行,我們就能看到Semaphore的許可機制對工作執行緒的限制。

但是,從具體節奏看來,其實並不符合我們前面場景的需求,因為本例中Semaphore的用法實際是保證,一直有5個人可以試圖乘車,如果有一個人出發了,立即就有排隊的人獲得許可,而這並不完全符合我們前面的需求。

那麼,我們再修改一下,演示個非典型的Semaphore用法。

import java.util.concurrent.Semaphore;
public class AbnormalSemaphoreSample {
  public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(0);
    for (int i = 0; i < 10; i++) {
      Thread t = new Thread(new MyWorker(semaphore));
      t.start();
    }
    System.out.println("Action...GO!");
    semaphore.release(5);
    System.out.println("Wait for permits off");
    while (semaphore.availablePermits() != 0) {
      Thread.sleep(100L);
    }
    System.out.println("Action...GO again!");
    semaphore.release(5);
  }
  static class MyWorker implements Runnable {
    private Semaphore semaphore;
    public MyWorker(Semaphore semaphore) {
      this.semaphore = semaphore;
    }
    @Override
    public void run() {
      try {
        semaphore.acquire();
        System.out.println("Executed!");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

注意,上面的程式碼,更側重的是演示Semaphore的功能以及侷限性,其實有很多執行緒程式設計中的反實踐。比如使用了sleep來協調任務執行,而且使用輪詢呼叫availiablePermits來檢測訊號量獲取情況。這都是很低效並且脆弱的,通常只是在測試或者診斷場景。

總的來說,我們可以看出Semaphore就是個計數器,其基本邏輯基於acquire/release,並沒有太複雜的同步邏輯。

如果Semaphore的數值被初始化為1,那麼一個執行緒就可以通過acquire進入互斥狀態。本質上和互斥鎖是非常相似的。但是區別也非常明顯,比如互斥鎖是有持有者的,而對於Semaphore這種計數器結構,雖然有類似功能,但其實不存在真正意義的持有者,除非我們進行擴充套件包裝。

【完】