1. 程式人生 > 實用技巧 >併發工具類

併發工具類

CountDownLatch

CountDownLatch是一個同步工具類,它允許一個或者多個執行緒一直等待,知道其他執行緒的操作執行完畢再執行。

CountDownLatch提供了兩個方法,一個是countDown,一個是await,countDownLatch初始化的時候需要傳入一個整數,在這個整數倒數到0之前,呼叫了await方法的程式都必須要等待,然後通過countDown來倒數。

public static void main(String[] args) throws InterruptedException {

  final CountDownLatch countDownLatch = new CountDownLatch(4);

  new Thread(new Runnable() {
    @Override
    public void run() {
      System.out.println("" + Thread.currentThread().getName() + "-執行中");
      countDownLatch.countDown();
      System.out.println("" + Thread.currentThread().getName() + "-執行完畢");
    }
  }, "t1").start();

  new Thread(new Runnable() {
    @Override
    public void run() {
      System.out.println("" + Thread.currentThread().getName() + "-執行中");
      countDownLatch.countDown();
      System.out.println("" + Thread.currentThread().getName() + "-執行完畢");
    }
  }, "t2").start();

  new Thread(new Runnable() {
    @Override
    public void run() {
      System.out.println("" + Thread.currentThread().getName() + "-執行中");
      countDownLatch.countDown();
      System.out.println("" + Thread.currentThread().getName() + "-執行完畢");
    }
  }, "t3").start();

  countDownLatch.await();
  System.out.println("所有執行緒已經執行完畢");
}

從程式碼實現看,類似join的功能,但是比join更加靈活。CountDownLatch建構函式會接受一個int型別的引數作為計數器的初始值,當呼叫CountDownLatch的countDown方法時,這和計數器就會減一。

模擬高併發場景

static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) {

  for (int i = 0; i < 1000; i++) {
    new Thread(new CountDownLatchDemo()).start();
  }
  countDownLatch.countDown();
}

@Override
public void run() {
  try {
    countDownLatch.await();
  }catch (InterruptedException e){
    e.printStackTrace();
  }
  System.out.println("ThreadName:"+Thread.currentThread().getName());
}

總的來說,凡是涉及到需要指定某個任務再執行之前,要等到前置任務執行完畢之後才能執行的場景,都可以使用CountDownLatch。

CountDownLatch原始碼解析

對於countDownLatch,只要有await()方法和countDown()方法。

countDown()方法每次呼叫都會將state減1,直到state的值為0;而await是一個阻塞方法,當state減為0的時候,await方法才會返回。await可以被多個執行緒呼叫。所有呼叫了await方法的執行緒阻塞在AQS的紫色佇列來,條件滿足(state==0),將執行緒從佇列中一個個喚醒過來。

acquireSharedInterruptibly

countDownLatch也使用到AQS,在CountDownLatch內部寫了一個Sync並且繼承了AQS這個抽象類重寫了AQS中的共享鎖方法。如下程式碼,這塊程式碼只要是判斷當前執行緒是否獲取到了共享鎖;(在CountDownLatch中,使用的是共享鎖機制,因為CountDownLatch並不需要實現互斥的特性)

public final void acquireSharedInterruptibly(int arg)
  throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  //state如果不等於0,說明當前執行緒需要加入帶共享鎖佇列
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
  return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly

1.addWaiter設定為shared模式

2.tryAcquire和tryAcquireShared的返回值不同,因此會多出一個判斷過程

3.在判斷前驅結點是頭結點後,呼叫了setHeadAndPropagate方法,而不是簡單地更新了一下頭結點

private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedE
  //建立一個共享模式的節點新增到佇列中
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        //判斷嘗試獲得鎖
        int r = tryAcquireShared(arg);
        //r>=0表示獲取到了執行許可權,這個時候state!=0,所以不會執行這段程式碼
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      //阻塞執行緒
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

圖解分析

加入這個時候有3個執行緒呼叫了await方法,由於這個時候state的值還不為0,所以這三個執行緒都會加入到AQS佇列中。並且三個執行緒都處於阻塞狀態。

CountDownLatch.countDown

由於執行緒被await方法阻塞了,所以只有等到countDown方法使得state=0的時候才會被喚醒。

1.只有當state減為0的時候,tryReleaseShared才會返回true,否則只是簡單的state=state-1

2.如果state=0,則呼叫doReleaseShared喚醒處於await狀態下的執行緒

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

//用自旋的方式實現state減1
protected boolean tryReleaseShared(int releases) {
  // Decrement count; signal when transition to zero
  for (;;) {
    int c = getState();
    if (c == 0)
      return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
      return nextc == 0;
  }
}

AQS.doReleaseShared

共享鎖的釋放和獨佔鎖的釋放有一定的差別,前面喚醒鎖的邏輯和獨佔鎖是一樣的,先判斷頭結點是不是SIGNAL狀態,如果是,則修改為0,並且喚醒頭結點的而下一個節點

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;            // loop to recheck cases
        unparkSuccessor(h);
      }
      //這個CAS失敗的場景是:執行到這裡的時候,剛好有一個節點入隊,入隊會將這個ws設定為-1
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
    }
    //如果當到這裡的時候,前面喚醒的執行緒已經佔了了head,那麼再迴圈
    //通過檢查頭節點是否改變了,如果改變了就繼續迴圈
    if (h == head)                   // loop if head changed
      break;
  }
}
PROPAGATE:標識為PROPAGATE狀態的節點,是共享模式下的節點狀態,處於這個狀態下的節點,會對縣城內的喚醒進行傳播

h==head:說明頭節點還沒有被剛剛用unparkSuccessor喚醒的執行緒(這裡可以理解為ThreadB)佔有,是break退出迴圈。

h!=head:頭節點被剛剛喚醒的執行緒(這裡可以理解為ThreadB)佔有,那麼這裡重新進入下一輪玄幻,喚醒下一個節點(這裡是ThreadB)。然後後面喚醒傳遞。。

一旦ThreadA被喚醒,程式碼又回到了doAcquireSharedInterruptibly中來執行。如果當前state滿足等於0的條件,則會執行setHeadAndPropagate方法

if (p == head) {
  //判斷嘗試獲得鎖
  int r = tryAcquireShared(arg);
  if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
  }
}

setHeadAndPropagate

這個方法主要作用是把被喚醒的節點,設定成head節點。然後繼續喚醒佇列中的其他執行緒。

由於現在佇列有3個執行緒處於阻塞狀態,一旦ThreadA被喚醒,並且設定為head之後,會繼續喚醒後續的ThreadB。

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  setHead(node);
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
      doReleaseShared();
  }
}

Semaphore

​ semaphore 也就是我們常說的訊號燈,semaphore 可以控 制同時訪問的執行緒個數,通過 acquire 獲取一個許可,如 果沒有就等待,通過 release 釋放一個許可。有點類似限流 的作用。叫訊號燈的原因也和他的用處有關,比如某商場 就 5 個停車位,每個停車位只能停一輛車,如果這個時候 來了 10 輛車,必須要等前面有空的車位才能進入。

public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            new Car(i, semaphore).start();
        }
    }

    static class Car extends Thread {
        private int num;
        private Semaphore semaphore;

        public Car(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        public void run() {
            try {
                semaphore.acquire();
                System.out.println("第" + num + "佔用一個停車位");
                TimeUnit.SECONDS.sleep(2);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

使用場景

Semaphore比較常見的就是用來作限流操作。

Semaphore原始碼分析

從Semaphore的功能來看,我們可以猜測它的底層原理一定是基於AQS的共享鎖。

建立Semaphore例項的時候,需要一個引數permits,這個基本上可以確定是設定給AQS的state的,然後每個執行緒呼叫acquire的時候,執行state=state-1,release的時候執行state=state+1,當然,acquire的時候,如果state=0,說明沒有資源了,需要等待其他的執行緒release。

Semaphore分公平策略和非公平策略

FairSync

static final class FairSync extends Sync {
  private static final long serialVersionUID = 2014338818796000944L;

  FairSync(int permits) {
    super(permits);
  }

  protected int tryAcquireShared(int acquires) {
    for (;;) {
      //區別就在於是不是會先判斷是否有執行緒在排隊,然後才進行CAS鍵操作
      if (hasQueuedPredecessors())
        return -1;
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
          compareAndSetState(available, remaining))
        return remaining;
    }
  }
}

NoFairSync

通過對別發現公平鎖和非公平鎖的區別就是在於是否多了一個hasQueuedPredecessors的判斷

static final class NonfairSync extends Sync {
  private static final long serialVersionUID = -2694183684443567898L;

  NonfairSync(int permits) {
    super(permits);
  }

  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
  }
}

final int nonfairTryAcquireShared(int acquires) {
  for (;;) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||
        compareAndSetState(available, remaining))
      return remaining;
  }
}
//都是基於共享鎖來實現的

CyclicBarrier

CyclicBarrier的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。他要做的事情是,讓一組執行緒到達一個屏障(也可以佳作同步點)時被阻塞,知道最後一個執行緒到達平衡住那個是,屏障才會開門,所有被屏障攔截的執行緒才會繼續工作。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier當前執行緒已經到達了屏障,然後當前執行緒被阻塞。

使用場景

當存在需要所有的子任務都完成時,才會執行主任務,這個時候就可以選擇使用CyclicBarrier。

案例

DataImportThread

public class DataImportThread extends Thread{


    private CyclicBarrier cyclicBarrier;

    private String path;

    public DataImportThread(CyclicBarrier cyclicBarrier,String path){
        this.cyclicBarrier = cyclicBarrier;
        this.path = path;
    }

    @Override
    public void run() {
        System.out.println("開始匯入:"+path+"位置的資料");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

CyclicBarrierDemo

public class CyclicBarrierDemo extends Thread{

    @Override
    public void run() {

        System.out.println("開始進行資料分析");
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new CyclicBarrierDemo());
        new Thread(new DataImportThread(cyclicBarrier,"file1")).start();
        new Thread(new DataImportThread(cyclicBarrier,"file2")).start();
        new Thread(new DataImportThread(cyclicBarrier,"file3")).start();
    }
}

注意點:

1)對於制定計數值parties。若由於某種原因,沒有足夠的執行緒呼叫CyclicBarrier的await,則所有呼叫await的執行緒都會被阻塞。

2)同樣的CyclicBarrier也可以呼叫await(timeout, unit),設定超時時間,在設定時間內,如果沒有足夠執行緒到達,則解除阻塞狀態,繼續工作。

3)通過reset重置計數,會使得進入await的執行緒出現BrokenBarrierExecption;

4)如果採用是CyclicBarrier(int parteis, Runnable barrierAction)構造方法,執行barrierAction操作的是最後一個到達的執行緒。

實現原理

CyclicBarrier相比CountDownLatch來說,要簡單很多,原始碼實現是基於ReentrantLock和Condition的組合使用。如下圖,CyclicBarrier和CountDownLatch是不是很像,只是CyclicBarrier可以不止一個柵欄,因為他的柵欄(Barrier)可以重複使用。