1. 程式人生 > >JAVA同步屏障CyclicBarrier

JAVA同步屏障CyclicBarrier

一:簡單介紹同步屏障CyclicBarrier.

      1.1 CyclicBarrier可以讓一組執行緒到達一個屏障時被阻塞,直到最後一個執行緒到達屏障時,所有被屏障攔截的執行緒才會繼續向下執行的.使用場景用於多執行緒計算資料.計算結果完成,插入同步屏障,阻塞等待.

      1.2 CyclicBarrier位於java.util.concurrent包下.執行緒內執行CyclicBarrier例項物件的await()方法後此執行緒已經到達了同步屏障了,在等待其他執行緒也到達屏障吶.

      1.3 CyclicBarrier的構造方法如下.

      傳一個攔截執行緒數的引數.

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

     傳一個攔截執行緒數的引數,提供一個執行緒任務,當執行緒到達屏障時,優先執行BarrierAction.

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
}

 二:基礎使用演示

    2.1 兩個執行緒的時候.一個主執行緒和一個自定義的子執行緒.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * author:
 * date: 
 * time: 
 * description:CyclicBarrier
 * 一組執行緒到達一個屏障時被阻塞
 */
public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(2);

        public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread執行");
            }
        }).start();
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Main執行");
    }
}

 執行結果如下:(多次執行後,有一下兩種結果,Java多執行緒的執行順序是CPU隨機排程的,和程式碼的物理位置的先後順序無關!)

將CyclicBarrier的攔截執行緒數目設定為3的時候.(此時任然只有一個主執行緒和一個自定義的執行緒執行).只需如下修改,其他地方不變

private static CyclicBarrier cyclicBarrier=new CyclicBarrier(3);

執行結果如下:

這個時候主執行緒和自定義的執行緒都執行了await()方法,到達了屏障,但是沒有第三個執行緒執行到屏障將不能放行此時阻塞的主執行緒個自定義執行緒了,只能等待了.

     2.2 現在執行CyclicBarrier的有優先任務的執行緒任務的.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * author: 
 * date: 
 * time: 
 * description:CyclicBarrier
 * 一組執行緒到達一個屏障時被阻塞
 */
public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(2,new ImportAction());

        public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread執行");
            }
        }).start();
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Main執行");
    }
}
class ImportAction implements Runnable{
    @Override
    public void run(){
        System.out.println("執行ImportAction關鍵任務");
    }
}

執行結果如下:

(首先分析一下,應該是CPU隨機排程其中一個執行緒,執行緒在其得到的時間片執行執行緒內的業務方法,然後執行了CyclicBarrier的await()方法,然後到達屏障阻塞了,在等待另外一個執行緒也到達屏障吶,但是現在有一個優先任務了,此時要執行優先執行緒任務了.).

執行完了,才會只需執行另外一個沒有到達同步屏障的任務執行緒方法.

多次執行會有以下兩種執行結果的.

下面的測試程式檢視當前CyclicBarrier阻塞的執行緒數目.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * author: 
 * date: 
 * time: 
 * description:CyclicBarrier
 * 一組執行緒到達一個屏障時被阻塞
 */
public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(2,new ImportAction());

        public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(cyclicBarrier.getParties());
                    System.out.println("此時攔截的執行緒屏障數:"+ cyclicBarrier.getNumberWaiting());
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread執行");
            }
        }).start();
        System.out.println("此時攔截的執行緒屏障數:"+ cyclicBarrier.getNumberWaiting());
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Main執行");
        System.out.println("此時攔截的執行緒屏障數:"+ cyclicBarrier.getNumberWaiting());
    }
}
class ImportAction implements Runnable{
    @Override
    public void run(){
        System.out.println("執行ImportAction關鍵任務");
    }
}

執行結果:

三:實戰演練

     3.1 場景描述:一個Excel儲存了一個使用者的多個賬戶這一年內銀行消費情況,其中每個Sheet儲存了該使用者的一個賬戶一年的每一筆銀行交易消費流水詳細數目.現在要統計一下這個使用者這一年平均每天銀行消費賬目數目.(假如該使用者有四張不同銀行的銀行卡).

    3.2 實現思路:為了計算效率,使用多執行緒分別統計出每個工作表的日均消費水平,都執行完之後就計算出了每個工作表的日均銀行消費流水數目,然後使用BarrierAction計算出所有的賬戶日均銀行消費流水賬目,繼而計算出該使用者這一年日均銀行消費流水賬目了.

    3.3 設計可能出現的問題:

         ①:如果用單執行緒計算,由於資料計算量大,效率低.

         ②:採用多執行緒,但是不使用CyclicBarrier,這裡是需要得到4個Sheet的日均流水才能得到整個使用者一年的日均銀行流水,需要等待都完成才可以的.不使用CyclicBarrier可能計算不方便,出現計算錯誤問題.

   3.4 編碼實現.

          (使用多執行緒技術羅列:CyclicBarier,ConcurrentHashMap,FixedThreadPool,Runnable.

import java.util.Map;
import java.util.concurrent.*;
/**
 * author: 
 * date: 
 * time: 
 * description:
 */
public class BankConsumeFlow implements  Runnable {
    /** 獲取當前PC的執行緒cpu數*/
    private static final Integer CPU_NUMBER=Runtime.getRuntime().availableProcessors();

    /** 設定同步屏障數目*/
    private CyclicBarrier cyclicBarrier=new CyclicBarrier(4,this);

    /** 儲存每個工作表的銀行消費日均流水*/
    private ConcurrentHashMap<String, Integer> sheetCount=new ConcurrentHashMap<>();

    /** 核心計算方法*/
    private void flowCount(){
        Executor executor= Executors.newFixedThreadPool(CPU_NUMBER);
        for(int i=0;i<CPU_NUMBER;i++){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // 使用常數模擬一下
                    sheetCount.put(Thread.currentThread().getName(), 100);
                    // 一個執行緒計算一個sheet完成後到達同步屏障
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        // 關閉執行緒池
        ((ExecutorService) executor).shutdown();
    }
    @Override
    public void run(){
        int result=0;
        // 統計出所有的Sheet的日均銀行消費流水
        for(Map.Entry<String,Integer> sheet:sheetCount.entrySet()){
            result+=sheet.getValue();
        }
        // 得到最後計算結果
        sheetCount.put("result", result);
        System.out.println("該使用者這一年的銀行消費日均為:"+result+"元");
    }

    public static void main(String[] args) {
        BankConsumeFlow bankConsumeFlow=new BankConsumeFlow();
        bankConsumeFlow.flowCount();
    }
}

每個Sheet的計算結果為100元,4個就是400元.

四:關鍵方法總結

        3.1 getParties():返回同步屏障攔截的執行緒數目.     

public int getParties() {
    return parties;
}

        3.2 getNumberWaiting():返回CyclicBarrier此時正在阻塞的執行緒數目.(0~getParties()-1)    

private int count;
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
        lock.lock();
    try {
          return parties - count;
        } finally {
            lock.unlock();
        }
    }

  await呼叫了dowait()方法.

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
}

 使用等待特定時間後,就不會阻塞阻塞當前執行緒的.

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
}

看下面這個初始化的構造方法如下.

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

 使用了ReentrantLock鎖.返回的是阻塞執行緒總數減去執行緒計數的個數就是當前正在阻塞的.

      3.3 isBoken()方法用來了解阻塞的執行緒是否被中斷了.

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
 }

    3.4 reset()重新計算count,直接賦值為同步屏障初始化的阻塞的執行緒數.CountDownLatch的計數器只能使用一次.計算髮生錯誤可以重置計算器,執行緒重新執行一次.

private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
}
public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
 }