1. 程式人生 > >Java併發多執行緒 - 併發工具類JUC

Java併發多執行緒 - 併發工具類JUC

安全共享物件策略

1.Java執行緒限制 : 一個被執行緒限制的物件,由執行緒獨佔,並且只能被佔有它的執行緒修改
2.共享只讀 : 一個共享只讀的物件,在沒有額外同步的情況下,可以被多個執行緒併發訪問,
但是任何執行緒都不能修改它
3.執行緒安全物件 : 一個執行緒安全的物件或則容器,在內部通過同步機制來保證執行緒安全,
所以其他執行緒無需額外的同步就可以通過公共介面隨意訪問它
4.被守護物件 : 被守護物件只能通過獲取特定的鎖來訪問

執行緒安全 - 同步容器

採用synchronized關鍵字同步,缺點 :

  1. 不能完成做到執行緒安全
  2. 效能差

ArrayLisy -> Vector, Stack
HashMap -> HashTable (key、value不能為null)
Collections.synchronizedXXX(List、Set、Map)

執行緒安全 - 併發容器 J.U.C

ArrayList -> CopyOnWriteArrayList
HashSet、TreeSet -> CopyOnWriteArraySet ConcurrentSkipListSet
HashMap、TreeMap -> ConcurrentHashMap ConcurrentSkipListMap

AbstractQueuedSynchronizer - AQS

  1. 使用Node實現FIFO佇列,可以用於構建鎖或則其他同步裝置的基礎框架
  2. 利用一個int型別表示狀態
  3. 使用方法是基礎
  4. 子類通過繼承並通過實現它的方法管理其狀態 { acquire 和 release} 的方法操縱狀態
  5. 可以同時實現排他鎖和共享鎖模式(獨佔、共享)

常用類

CountDownLatch
Semaphore
CyclicBarrier
ReentrantLock
Condition
FutureTask

CountDownLacth

CountDownLatch是一個同步工具類,它允許一個或多個執行緒一直等待,直到其他執行緒執行完後再執行。例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有框架服務之後執行。

CountDownLatch是通過一個計數器來實現的,計數器的初始化值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就相應得減1。當計數器到達0時,表示所有的執行緒都已完成任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。


@Self4j
public class CountDownLatchExample {

    private final  static int threadCount = 200;
    
    public static void main(String[] arg) {
    
        ExecutorService exec = Executors.newCachedThreadPool(); 
        
        final CountDownLatch lacth = new CountDownLatch(5);
        
        for (int i = 0; i < 1000; i++) {
            exec.excute( () -> {
            final int threadNum  = i;
            try {
                test(threadNum);
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                // latch遞減
                lacth.countDown();
            }
            });
        }
        // 等待latch計數器為0,則繼續往下執行
        latch.await();
        // latch的await方法也可以傳入等待時間,等到等待時間後不管有沒完成計數都往下執行
        // latch.await( 10, TimeUnit.MILLISECONDS);
        log.info("finished");
        exec.shutdown();
    }

    public static void test(int i)  throw Exception{
        log.info("thread: {}", i);
    }
}

Semaphore

Semaphore(int permits):構造方法,建立具有給定許可數的計數訊號量並設定為非公平訊號量。
Semaphore(int permits,boolean fair):構造方法,當fair等於true時,建立具有給定許可數的計數訊號量並設定為公平訊號量。
void acquire():從此訊號量獲取一個許可前執行緒將一直阻塞。
void acquire(int n):從此訊號量獲取給定數目許可,在提供這些許可前一直將執行緒阻塞。
void release():釋放一個許可,將其返回給訊號量。就如同車開走返回一個車位。
void release(int n):釋放n個許可。
int availablePermits():獲取當前可用的許可數。
boolean tryAcquire():僅在呼叫時此訊號量存在一個可用許可,才從訊號量獲取許可。
boolean tryAcquire(int permits):僅在呼叫時此訊號量中有給定數目的許可時,才從此訊號量中獲取這些許可。


boolean tryAcquire(int permits,
                          long timeout,
                          TimeUnit unit)
                   throws InterruptedException

如果在給定的等待時間內此訊號量有可用的所有許可,並且當前執行緒未被 中斷,則從此訊號量獲取給定數目的許可。


@Self4j
public class SemaphoreExample {

    private final  static int threadCount = 200;

    public static void main(String[] arg) {

        ExecutorService exec = Executors.newCachedThreadPool(); 
    
        final Semaphore semaphore = new Semaphore(3);
    
        for (int i = 0; i < threadCount; i++) {
            exec.excute( () )-> {
            final int threadNum  = i;
            try {
                // tryAcquire會嘗試去獲取一個訊號量,如果獲取不到
                // 則什麼都不會發生,走接下來的邏輯
                // if (semaphore.tryAcquire(1)) {
                //    test(i);
                //    semaphore.release();//釋放一個訊號量
                // }
                semaphore.acquire();//獲取一個訊號量
                test(i);
                semaphore.release();//釋放一個訊號量
            } catch (Exception e) {
                log.error("exception", e);
            } 
            });
        }
        log.info("finished");
        exec.shutdown();
    }

    public static void test(int i)  throw Exception{
        log.info("thread: {}", i);
    }
}

CyclicBarrier

一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。

CyclicBarrier(int parties, Runnable barrierAction)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的執行緒執行。

CyclicBarrier(int parties)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。

int await()
在所有 參與者都已經在此 barrier 上呼叫 await 方法之前,將一直等待。


int await(long timeout,
                 TimeUnit unit)
          throws InterruptedException,
                 BrokenBarrierException,
                 TimeoutException

在所有 參與者都已經在此屏障上呼叫 await 方法之前將一直等待,或者超出了指定的等待時間。

boolean isBroken() : 查詢此屏障是否處於損壞狀態。

void reset() :
將屏障重置為其初始狀態。如果所有參與者目前都在屏障處等待,則它們將返回,同時丟擲一個 BrokenBarrierException。注意,在由於其他原因造成損壞 之後,實行重置可能會變得很複雜;此時需要使用其他方式重新同步執行緒,並選擇其中一個執行緒來執行重置。與為後續使用建立一個新 barrier 相比,這種方法可能更好一些。

int getNumberWaiting() :返回當前在屏障處等待的參與者數目。此方法主要用於除錯和斷言。


@Self4j
public class CyclicBarrierExample {

    private final  static int threadCount = 200;
    
    private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(7, 
        () -> {
        log.info("callback is running !");
        }
    );
    
    public static void main(String[] arg) {
    
        ExecutorService exec = Executors.newCachedThreadPool(); 
        
        for (int i = 0; i < threadCount; i++) {
            exec.excute( () -> {
                final int threadNum  = i;
                try {
                    race(i);
                } catch (Exception e) {
                    log.error("exception", e);
                } 
            });
        }
        log.info("finished");
        
        exec.shutdown();
    }
    
    public static void race(int i)  throw Exception{
        log.info("thread {} is ready", i);
        cyclicBarrier.await();
        log.info("thread {} is continue", i);
    }
}

來源:https://segmentfault.com/a/1190000017864607