1. 程式人生 > 程式設計 >詳解Java迴環屏障CyclicBarrier

詳解Java迴環屏障CyclicBarrier

  上一篇說的CountDownLatch是一個計數器,類似執行緒的join方法,但是有一個缺陷,就是當計數器的值到達0之後,再呼叫CountDownLatch的await和countDown方法就會立刻返回,就沒有作用了,那麼反正是一個計數器,為什麼不能重複使用呢?於是就出現了這篇說的CyclicBarrier,它的狀態可以被重用;

一.簡單例子

  用法其實和CountDownLatch差不多,也就是一個計數器,當計數器的值變為0之後,就會把阻塞的執行緒喚醒:

package com.example.demo.study;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Study0216 {
 // 注意這裡的構造器,第一個引數表示計數器初始值
 // 第二個引數表示當計數器的值變為0的時候就觸發的任務
 static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,() -> {
  System.out.println("cyclicBarrier task ");
 });

 public static void main(String[] args) {
  // 新建兩個執行緒的執行緒池
  ExecutorService pool = Executors.newFixedThreadPool(2);
  // 執行緒1放入執行緒池中
  pool.submit(() -> {
   try {
    System.out.println("Thread1----await-begin");
    cyclicBarrier.await();
    System.out.println("Thread1----await-end");
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 執行緒2放到執行緒池中
  pool.submit(() -> {
   try {
    System.out.println("Thread2----await-begin");
    cyclicBarrier.await();
    System.out.println("Thread2----await-end");
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 關閉執行緒池,此時還在執行的任務會繼續執行
  pool.shutdown();
 }
}

詳解Java迴環屏障CyclicBarrier

 我們再看看CyclicBarrier的複用性,這裡比如有一個任務,有三部分組成,分別是A,B,C,然後建立兩個執行緒去執行這個任務,必須要等到兩個執行緒都執行完成A部分,然後才能開始執行B,只有兩個執行緒都執行完成B部分,才能執行C:

package com.example.demo.study;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Study0216 {
 // 這裡的構造器,只有一個引數,表示計數器初始值
 static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

 public static void main(String[] args) {
  // 新建兩個執行緒的執行緒池
  ExecutorService pool = Executors.newFixedThreadPool(2);
  // 執行緒1放入執行緒池中
  pool.submit(() -> {
   try {
    System.out.println("Thread1----stepA-start");
    cyclicBarrier.await();
    
    System.out.println("Thread1----stepB-start");
    cyclicBarrier.await();
    
    System.out.println("Thread1----stepC-start");
    
    
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 執行緒2放到執行緒池中
  pool.submit(() -> {
   try {
    System.out.println("Thread2----stepA-start");
    cyclicBarrier.await();
    
    System.out.println("Thread2----stepB-start");
    cyclicBarrier.await();
    
    System.out.println("Thread2----stepC-start");
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 關閉執行緒池,此時還在執行的任務會繼續執行
  pool.shutdown();
 }
}

詳解Java迴環屏障CyclicBarrier

二.基本原理

  我們看看一些重要屬性:

public class CyclicBarrier {
 //這個內部類只有一個boolean值
 private static class Generation {
  boolean broken = false;
 }

 //獨佔鎖
 private final ReentrantLock lock = new ReentrantLock();
  //條件變數
 private final Condition trip = lock.newCondition();
  //儲存執行緒的總數
 private final int parties;
 //這是一個任務,通過構造器傳遞一個任務,當計數器變為0之後,就可以執行這個任務
 private final Runnable barrierCommand;
 //這類內部之後一個boolean的值,表示屏障是否被打破
 private Generation generation = new Generation();
 //計數器
 private int count;
}

  構造器:

//我們的構造器初始值設定的是parties
public CyclicBarrier(int parties) {
 this(parties,null);
}
//注意,這裡開始的時候是count等於parties
//為什麼要有兩個變數呢?我們每次呼叫await方法的時候count減一,當count的值變為0之後,怎麼又還原成初始值呢?
//直接就把parties的值賦值給count就行了呀,簡單吧!
public CyclicBarrier(int parties,Runnable barrierAction) {
 if (parties <= 0) throw new IllegalArgumentException();
 this.parties = parties;
 this.count = parties;
 this.barrierCommand = barrierAction;
}

  然後再看看await方法:

public int await() throws InterruptedException,BrokenBarrierException {
 try {
  //呼叫的是dowait方法
  return dowait(false,0L);
 } catch (TimeoutException toe) {
  throw new Error(toe); // cannot happen
 }
}

//假設count等於3,有三個執行緒都在呼叫這個方法,預設超時時間為0,那麼首每次都只有一個執行緒可以獲取鎖,將count減一,不為0
//就會到下面的for迴圈中扔到條件佇列中掛起;直到第三個執行緒呼叫這個dowait方法,count減一等於0,那麼當前執行緒執行任務之後,
//就會喚醒條件變數中阻塞的執行緒,並重置count為初始值3
private int dowait(boolean timed,long nanos)throws InterruptedException,BrokenBarrierException,TimeoutException {
 //獲取鎖
 final ReentrantLock lock = this.lock;
 lock.lock();
 try {
  //g中只有一個boolean值
  final Generation g = generation;
  //如果g中的值為true的時候,拋錯
  if (g.broken)
   throw new BrokenBarrierException();
  //如果當前執行緒中斷,就拋錯
  if (Thread.interrupted()) {
   breakBarrier();
   throw new InterruptedException();
  }
  //count減一,再賦值給index
  int index = --count;
  //如果index等於0的時候,說明所有的執行緒已經到屏障點了,就可以
  if (index == 0) { // tripped
   boolean ranAction = false;
   try {
    //執行當前執行緒的任務
    final Runnable command = barrierCommand;
    if (command != null)
     command.run();
    ranAction = true;
    //喚醒其他因為呼叫了await方法阻塞的執行緒
    nextGeneration();
    return 0;
   } finally {
    if (!ranAction)
     breakBarrier();
   }
  }
  //能到這裡來,說明是count不等於0,也就是還有的執行緒沒有到屏障點
  for (;;) {
   try {
    //wait方法有兩種情況,一種是設定超時時間,一種是不設定超時時間
    //這裡就是對超時時間進行的一個判斷,如果設定的超時時間為0,則會在條件佇列中無限的等待下去,直到被喚醒
    //設定了超時時間,那就等待該時間
    if (!timed)
     trip.await();
    else if (nanos > 0L)
     nanos = trip.awaitNanos(nanos);
   } catch (InterruptedException ie) {
    if (g == generation && ! g.broken) {
     breakBarrier();
     throw ie;
    } else {
     Thread.currentThread().interrupt();
    }
   }

   if (g.broken)
    throw new BrokenBarrierException();

   if (g != generation)
    return index;

   if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
   }
  }
 } finally {
  //釋放鎖
  lock.unlock();
 }
}

//喚醒其他因為呼叫了await方法阻塞的執行緒
private void nextGeneration() {
 //喚醒條件變數中所有執行緒
 trip.signalAll();
 //重置count的值
 count = parties;
 generation = new Generation();
}

private void breakBarrier() {
 generation.broken = true;
 //重置count為初始值parties
 count = parties;
 //喚醒條件佇列中的所有執行緒
 trip.signalAll();
}

以上就是詳解Java迴環屏障CyclicBarrier的詳細內容,更多關於Java CyclicBarrier的資料請關注我們其它相關文章!